Adding workflow management
I
This commit is contained in:
99
src/components/workflows/db_management.py
Normal file
99
src/components/workflows/db_management.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from components.workflows.constants import WORKFLOWS_SETTINGS_ENTRY
|
||||
from core.settings_management import SettingsManager
|
||||
|
||||
logger = logging.getLogger("WorkflowsSettings")
|
||||
|
||||
|
||||
@dataclass
|
||||
class WorkflowsSettings:
|
||||
workflows: list[str] = field(default_factory=list)
|
||||
selected_workflow: str = None
|
||||
|
||||
|
||||
class WorkflowsDbManager:
|
||||
def __init__(self, session: dict, settings_manager: SettingsManager):
|
||||
self.session = session
|
||||
self.settings_manager = settings_manager
|
||||
|
||||
def add_workflow(self, workflow_name: str):
|
||||
settings = self._get_settings()
|
||||
|
||||
if not workflow_name:
|
||||
raise ValueError("Workflow name cannot be empty.")
|
||||
|
||||
if workflow_name in settings.workflows:
|
||||
raise ValueError(f"Workflow '{workflow_name}' already exists.")
|
||||
|
||||
settings.workflows.append(workflow_name)
|
||||
self.settings_manager.save(self.session, WORKFLOWS_SETTINGS_ENTRY, settings)
|
||||
return True
|
||||
|
||||
def get_workflow(self, workflow_name: str):
|
||||
if not workflow_name:
|
||||
raise ValueError("Workflow name cannot be empty.")
|
||||
|
||||
settings = self._get_settings()
|
||||
if workflow_name not in settings.workflows:
|
||||
raise ValueError(f"Workflow '{workflow_name}' does not exist.")
|
||||
|
||||
return next(filter(lambda r: r == workflow_name, settings.workflows))
|
||||
|
||||
# def modify_workflow(self, old_workflow_name, new_workflow_name: str, tables: list[str]):
|
||||
# if not old_workflow_name or not new_workflow_name:
|
||||
# raise ValueError("Workflow name cannot be empty.")
|
||||
#
|
||||
# settings = self._get_settings()
|
||||
# for workflow in settings.workflows:
|
||||
# if workflow == old_workflow_name:
|
||||
# workflow.name = new_workflow_name
|
||||
# workflow.tables = tables
|
||||
#
|
||||
# self.settings_manager.save(self.session, workflows_SETTINGS_ENTRY, settings)
|
||||
# return workflow
|
||||
#
|
||||
# else:
|
||||
# raise ValueError(f"workflow '{old_workflow_name}' not found.")
|
||||
|
||||
def remove_workflow(self, workflow_name):
|
||||
if not workflow_name:
|
||||
raise ValueError("Workflow name cannot be empty.")
|
||||
|
||||
settings = self._get_settings()
|
||||
if workflow_name not in settings.workflows:
|
||||
raise ValueError(f"workflow '{workflow_name}' does not exist.")
|
||||
|
||||
settings.workflows.remove(workflow_name)
|
||||
self.settings_manager.save(self.session, WORKFLOWS_SETTINGS_ENTRY, settings)
|
||||
return True
|
||||
|
||||
def exists_workflow(self, workflow_name):
|
||||
if not workflow_name:
|
||||
raise ValueError("workflow name cannot be empty.")
|
||||
|
||||
settings = self._get_settings()
|
||||
return workflow_name in settings.workflows
|
||||
|
||||
def get_workflows(self):
|
||||
return self._get_settings().workflows
|
||||
|
||||
def select_workflow(self, workflow_name: str):
|
||||
"""
|
||||
Select and save the specified workflow name in the current session's settings.
|
||||
|
||||
:param workflow_name: The name of the workflow to be selected and stored.
|
||||
:type workflow_name: str
|
||||
:return: None
|
||||
"""
|
||||
settings = self._get_settings()
|
||||
settings.selected_workflow = workflow_name
|
||||
self.settings_manager.save(self.session, WORKFLOWS_SETTINGS_ENTRY, settings)
|
||||
|
||||
def get_selected_workflow(self):
|
||||
settings = self._get_settings()
|
||||
return settings.selected_workflow
|
||||
|
||||
def _get_settings(self):
|
||||
return self.settings_manager.load(self.session, WORKFLOWS_SETTINGS_ENTRY, default=WorkflowsSettings())
|
||||
Reference in New Issue
Block a user