Files
MyManagingTools/src/components/workflows/db_management.py

126 lines
3.7 KiB
Python

import logging
from dataclasses import dataclass, field
from components.workflows.constants import WORKFLOWS_SETTINGS_ENTRY
from core.settings_management import SettingsManager
logger = logging.getLogger("WorkflowsSettings")
# Data structures
@dataclass
class WorkflowComponent:
id: str
type: str
x: int
y: int
title: str
description: str
@dataclass
class Connection:
id: str
from_id: str
to_id: str
@dataclass
class WorkflowsDesignerSettings:
workflow_name: str
@dataclass
class WorkflowsDesignerState:
components: dict[str, WorkflowComponent] = field(default_factory=dict)
connections: list[Connection] = field(default_factory=list)
component_counter = 0
@dataclass
class WorkflowsSettings:
workflows: list[str] = field(default_factory=list)
selected_workflow: str = None
class WorkflowsDbManager:
def __init__(self, session: dict, settings_manager: SettingsManager):
self.session = session
self.settings_manager = settings_manager
def add_workflow(self, workflow_name: str):
settings = self._get_settings()
if not workflow_name:
raise ValueError("Workflow name cannot be empty.")
if workflow_name in settings.workflows:
raise ValueError(f"Workflow '{workflow_name}' already exists.")
settings.workflows.append(workflow_name)
self.settings_manager.save(self.session, WORKFLOWS_SETTINGS_ENTRY, settings)
return True
def get_workflow(self, workflow_name: str):
if not workflow_name:
raise ValueError("Workflow name cannot be empty.")
settings = self._get_settings()
if workflow_name not in settings.workflows:
raise ValueError(f"Workflow '{workflow_name}' does not exist.")
return next(filter(lambda r: r == workflow_name, settings.workflows))
# def modify_workflow(self, old_workflow_name, new_workflow_name: str, tables: list[str]):
# if not old_workflow_name or not new_workflow_name:
# raise ValueError("Workflow name cannot be empty.")
#
# settings = self._get_settings()
# for workflow in settings.workflows:
# if workflow == old_workflow_name:
# workflow.name = new_workflow_name
# workflow.tables = tables
#
# self.settings_manager.save(self.session, workflows_SETTINGS_ENTRY, settings)
# return workflow
#
# else:
# raise ValueError(f"workflow '{old_workflow_name}' not found.")
def remove_workflow(self, workflow_name):
if not workflow_name:
raise ValueError("Workflow name cannot be empty.")
settings = self._get_settings()
if workflow_name not in settings.workflows:
raise ValueError(f"workflow '{workflow_name}' does not exist.")
settings.workflows.remove(workflow_name)
self.settings_manager.save(self.session, WORKFLOWS_SETTINGS_ENTRY, settings)
return True
def exists_workflow(self, workflow_name):
if not workflow_name:
raise ValueError("workflow name cannot be empty.")
settings = self._get_settings()
return workflow_name in settings.workflows
def get_workflows(self):
return self._get_settings().workflows
def select_workflow(self, workflow_name: str):
"""
Select and save the specified workflow name in the current session's settings.
:param workflow_name: The name of the workflow to be selected and stored.
:type workflow_name: str
:return: None
"""
settings = self._get_settings()
settings.selected_workflow = workflow_name
self.settings_manager.save(self.session, WORKFLOWS_SETTINGS_ENTRY, settings)
def get_selected_workflow(self):
settings = self._get_settings()
return settings.selected_workflow
def _get_settings(self):
return self.settings_manager.load(self.session, WORKFLOWS_SETTINGS_ENTRY, default=WorkflowsSettings())