Files
MyManagingTools/src/components/repositories/db_management.py
2025-07-01 22:07:12 +02:00

223 lines
8.0 KiB
Python

import dataclasses
import logging
from core.settings_management import SettingsManager
REPOSITORIES_SETTINGS_ENTRY = "Repositories"
logger = logging.getLogger("RepositoriesSettings")
@dataclasses.dataclass
class Repository:
name: str
tables: list[str]
@dataclasses.dataclass
class RepositoriesSettings:
repositories: list[Repository] = dataclasses.field(default_factory=list)
selected_repository_name: str = None
@staticmethod
def use_refs():
return {"repositories"}
class RepositoriesDbManager:
def __init__(self, session: dict, settings_manager: SettingsManager):
self.session = session
self.settings_manager = settings_manager
def add_repository(self, repository_name: str, tables: list[str] = None):
"""
Adds a new repository to the list of repositories. The repository is identified
by its name and can optionally include a list of associated tables.
:param repository_name: The name of the repository to be added.
:param tables: A list of Table objects to be associated with the repository,
defaulting to an empty list if not provided.
:type tables: list[Table], optional
:return: None
"""
settings = self._get_settings()
if repository_name is None or repository_name == "":
raise ValueError("Repository name cannot be empty.")
if repository_name in [repo.name for repo in settings.repositories]:
raise ValueError(f"Repository '{repository_name}' already exists.")
existing_repositories = [r.name for r in settings.repositories]
logger.info(f"Existing repositories:{existing_repositories}")
repository = Repository(repository_name, tables or [])
settings.repositories.append(repository)
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
return repository
def get_repository(self, repository_name: str):
if repository_name is None or repository_name == "":
raise ValueError("Repository name cannot be empty.")
settings = self._get_settings()
if repository_name not in [repo.name for repo in settings.repositories]:
raise ValueError(f"Repository '{repository_name}' does not exist.")
return next(filter(lambda r: r.name == repository_name, settings.repositories))
def modify_repository(self, old_repository_name, new_repository_name: str, tables: list[str]):
if not old_repository_name or not new_repository_name:
raise ValueError("Repository name cannot be empty.")
settings = self._get_settings()
for repository in settings.repositories:
if repository.name == old_repository_name:
repository.name = new_repository_name
repository.tables = tables
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
return repository
else:
raise ValueError(f"Repository '{old_repository_name}' not found.")
def remove_repository(self, repository_name):
if not repository_name:
raise ValueError("Repository name cannot be empty.")
settings = self._get_settings()
repository = next(filter(lambda r: r.name == repository_name, settings.repositories), None)
if repository is None:
raise ValueError(f"Repository '{repository_name}' does not exist.")
settings.repositories.remove(repository)
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
return repository
def exists_repository(self, repository_name):
if repository_name is None or repository_name == "":
raise ValueError("Repository name cannot be empty.")
settings = self._get_settings()
return repository_name in [repo.name for repo in settings.repositories]
def get_repositories(self):
return self._get_settings().repositories
def add_table(self, repository_name: str, table_name: str):
"""
Adds a table to the specified repository
:param repository_name: The name of the target repository.
:param table_name: The name of the table to add.
:return: None
"""
settings, repository = self._get_settings_and_repo(repository_name, table_name, t1_must_exists=False)
repository.tables.append(table_name)
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
def modify_table(self, repository_name: str, old_table_name: str, new_table_name: str):
"""
Modifies the name of a table in the specified repository.
:param repository_name: The name of the repository containing the table.
:param old_table_name: The current name of the table to be modified.
:param new_table_name: The new name for the table.
:return: None
"""
settings, repository = self._get_settings_and_repo(repository_name, old_table_name, new_table_name)
table_index = repository.tables.index(old_table_name)
repository.tables[table_index] = new_table_name
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
def remove_table(self, repository_name: str, table_name: str):
"""
Removes a table from the specified repository.
:param repository_name: The name of the repository containing the table.
:param table_name: The name of the table to be removed.
:return: None
"""
settings, repository = self._get_settings_and_repo(repository_name, table_name)
repository.tables.remove(table_name)
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
def exists_table(self, repository_name: str, table_name: str):
if repository_name is None or repository_name == "":
raise ValueError("Repository name cannot be empty.")
if table_name is None or table_name == "":
raise ValueError("Table name cannot be empty.")
settings = self._get_settings()
repository = next(filter(lambda r: r.name == repository_name, settings.repositories), None)
if repository is None:
return False
return table_name in repository.tables
def ensure_exists(self, repository_name: str, table_name: str):
"""
:param repository_name:
:param table_name:
:return:
"""
try:
if not self.exists_table(repository_name, table_name):
self.add_table(repository_name, table_name)
except NameError:
self.add_repository(repository_name, [table_name])
return repository_name, table_name
def select_repository(self, repository_name: str):
"""
Select and save the specified repository name in the current session's settings.
:param repository_name: The name of the repository to be selected and stored.
:type repository_name: str
:return: None
"""
settings = self._get_settings()
settings.selected_repository_name = repository_name
self.settings_manager.save(self.session, REPOSITORIES_SETTINGS_ENTRY, settings)
def get_selected_repository(self):
settings = self._get_settings()
return settings.selected_repository_name
def _get_settings(self):
return self.settings_manager.load(self.session, REPOSITORIES_SETTINGS_ENTRY, default=RepositoriesSettings())
def _get_settings_and_repo(self, repository_name, *tables_names, t1_must_exists=True, t2_must_exists=False):
if not repository_name:
raise ValueError("Repository name cannot be empty.")
for table_name in tables_names:
if not table_name:
raise ValueError("Table name cannot be empty.")
settings = self._get_settings()
repository = next(filter(lambda r: r.name == repository_name, settings.repositories), None)
if repository is None:
raise NameError(f"Repository '{repository_name}' does not exist.")
for table_name, must_exist in zip(tables_names, [t1_must_exists, t2_must_exists]):
if must_exist:
if table_name not in repository.tables:
raise NameError(f"Table '{table_name}' does not exist in repository '{repository_name}'.")
else:
if table_name in repository.tables:
raise ValueError(f"Table '{table_name}' already exists in repository '{repository_name}'.")
return settings, repository