I can show WorkflowDesigner tab
I
This commit is contained in:
@@ -42,6 +42,10 @@
|
||||
transition: opacity 0.2s ease;
|
||||
}
|
||||
|
||||
.mmt-selected {
|
||||
background-color: var(--color-base-300);
|
||||
border-radius: .25rem;
|
||||
}
|
||||
|
||||
.icon-32 {
|
||||
width: 32px;
|
||||
|
||||
@@ -149,6 +149,9 @@ class MyTabs(BaseComponent):
|
||||
if active is not None:
|
||||
to_modify.active = active
|
||||
|
||||
def get_tab_content_by_key(self, key):
|
||||
return self.tabs_by_key[key].content if key in self.tabs_by_key else None
|
||||
|
||||
def refresh(self):
|
||||
return self.render(oob=True)
|
||||
|
||||
|
||||
@@ -24,3 +24,11 @@ def post(session, _id: str, tab_id: str, form_id: str, name: str, tab_boundaries
|
||||
f"Entering {Routes.AddWorkflow} with args {debug_session(session)}, {_id=}, {tab_id=}, {form_id=}, {name=}, {tab_boundaries=}")
|
||||
instance = InstanceManager.get(session, _id)
|
||||
return instance.add_new_workflow(tab_id, form_id, name, json.loads(tab_boundaries))
|
||||
|
||||
|
||||
@rt(Routes.ShowWorkflow)
|
||||
def post(session, _id: str, name: str, tab_boundaries: str):
|
||||
logger.debug(
|
||||
f"Entering {Routes.AddWorkflow} with args {debug_session(session)}, {_id=}, {name=}, {tab_boundaries=}")
|
||||
instance = InstanceManager.get(session, _id)
|
||||
return instance.show_workflow(name, json.loads(tab_boundaries))
|
||||
|
||||
@@ -20,3 +20,11 @@ class WorkflowsCommandManager(BaseCommandManager):
|
||||
"hx-target": f"#w_{self._id}",
|
||||
"hx-vals": f'js:{{"_id": "{self._id}", "tab_id": "{tab_id}", "tab_boundaries": getTabContentBoundaries("{self._owner.tabs_manager.get_id()}")}}',
|
||||
}
|
||||
|
||||
def show_workflow(self, workflow_name):
|
||||
return {
|
||||
"hx_post": f"{ROUTE_ROOT}{Routes.ShowWorkflow}",
|
||||
"hx-target": f"#{self._owner.tabs_manager.get_id()}",
|
||||
"hx-swap": "outerHTML",
|
||||
"hx-vals": f'js:{{"_id": "{self._id}", "name": "{workflow_name}", "tab_boundaries": getTabContentBoundaries("{self._owner.tabs_manager.get_id()}")}}',
|
||||
}
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
from fasthtml.components import *
|
||||
|
||||
from components.BaseComponent import BaseComponent
|
||||
from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID
|
||||
from components.workflows.db_management import WorkflowsDesignerSettings
|
||||
from core.utils import get_unique_id
|
||||
|
||||
|
||||
class WorkflowDesigner(BaseComponent):
|
||||
def __init__(self, session,
|
||||
_id=None,
|
||||
settings_manager=None,
|
||||
designer_settings: WorkflowsDesignerSettings = None,
|
||||
boundaries: dict = None):
|
||||
super().__init__(session, _id)
|
||||
self._settings_manager = settings_manager
|
||||
self._designer_settings = designer_settings
|
||||
self.boundaries = boundaries
|
||||
|
||||
def set_boundaries(self, boundaries: dict):
|
||||
self.boundaries = boundaries
|
||||
|
||||
def __ft__(self):
|
||||
return Div(f"Workflow Designer - {self._designer_settings.workflow_name}")
|
||||
|
||||
@staticmethod
|
||||
def create_component_id(session, suffix=None):
|
||||
prefix = f"{WORKFLOW_DESIGNER_INSTANCE_ID}{session['user_id']}"
|
||||
if suffix is None:
|
||||
suffix = get_unique_id()
|
||||
|
||||
return f"{prefix}{suffix}"
|
||||
|
||||
@@ -6,8 +6,9 @@ from assets.icons import icon_add_regular
|
||||
from components.BaseComponent import BaseComponentSingleton
|
||||
from components.form.components.MyForm import MyForm, FormField
|
||||
from components.workflows.commands import WorkflowsCommandManager
|
||||
from components.workflows.components.WorkflowDesigner import WorkflowDesigner
|
||||
from components.workflows.constants import WORKFLOWS_INSTANCE_ID
|
||||
from components.workflows.db_management import WorkflowsDbManager
|
||||
from components.workflows.db_management import WorkflowsDbManager, WorkflowsDesignerSettings
|
||||
from components_helpers import mk_ellipsis, mk_icon
|
||||
from core.instance_manager import InstanceManager
|
||||
|
||||
@@ -50,7 +51,7 @@ class Workflows(BaseComponentSingleton):
|
||||
self.tabs_manager.set_tab_content(tab_id,
|
||||
self._get_workflow_designer(workflow_name, tab_boundaries),
|
||||
title=workflow_name,
|
||||
key=workflow_name,
|
||||
key=f"{self._id}_{workflow_name}",
|
||||
active=True)
|
||||
|
||||
return self._mk_workflows(), self.tabs_manager.refresh()
|
||||
@@ -62,6 +63,23 @@ class Workflows(BaseComponentSingleton):
|
||||
|
||||
return self.tabs_manager.refresh()
|
||||
|
||||
def show_workflow(self, workflow_name: str, tab_boundaries: dict):
|
||||
tab_key = f"{self._id}_{workflow_name}"
|
||||
if tab_key not in self.tabs_manager.tabs:
|
||||
self.tabs_manager.add_tab(workflow_name,
|
||||
self._get_workflow_designer(workflow_name, tab_boundaries),
|
||||
key=tab_key)
|
||||
else:
|
||||
workflow_designer = self.tabs_manager.get_tab_content_by_key(tab_key)
|
||||
workflow_designer.set_boundaries(tab_boundaries)
|
||||
|
||||
self.tabs_manager.select_tab_by_key(tab_key)
|
||||
self.db.select_workflow(workflow_name)
|
||||
return self.tabs_manager.refresh(), self.refresh()
|
||||
|
||||
def refresh(self):
|
||||
return self._mk_workflows(True)
|
||||
|
||||
def __ft__(self):
|
||||
return Div(
|
||||
Div(cls="divider"),
|
||||
@@ -79,7 +97,12 @@ class Workflows(BaseComponentSingleton):
|
||||
)
|
||||
|
||||
def _get_workflow_designer(self, workflow_name: str, tab_boundaries: dict):
|
||||
return Div(f"Workflow Designer for {workflow_name}")
|
||||
return InstanceManager.get(self._session,
|
||||
WorkflowDesigner.create_component_id(self._session, workflow_name),
|
||||
WorkflowDesigner,
|
||||
settings_manager=self._settings_manager,
|
||||
designer_settings=WorkflowsDesignerSettings(workflow_name=workflow_name),
|
||||
boundaries=tab_boundaries)
|
||||
|
||||
def _mk_add_workflow_form(self, tab_id: str):
|
||||
return InstanceManager.get(self._session, MyForm.create_component_id(self._session), MyForm,
|
||||
@@ -89,7 +112,14 @@ class Workflows(BaseComponentSingleton):
|
||||
)
|
||||
|
||||
def _mk_workflow(self, workflow_name: str, selected: bool):
|
||||
return mk_ellipsis(workflow_name, cls="text-sm")
|
||||
elt = mk_ellipsis(workflow_name, cls="text-sm", **self.commands.show_workflow(workflow_name))
|
||||
if selected:
|
||||
return Div(
|
||||
elt,
|
||||
cls="items-center mmt-selected"
|
||||
)
|
||||
else:
|
||||
return elt
|
||||
|
||||
def _mk_workflows(self, oob=False):
|
||||
return Div(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
WORKFLOWS_INSTANCE_ID = "__Workflows__"
|
||||
WORKFLOW_DESIGNER_INSTANCE_ID = "__WorkflowDesigner__"
|
||||
WORKFLOWS_SETTINGS_ENTRY = "Workflows"
|
||||
ROUTE_ROOT = "/workflows"
|
||||
|
||||
@@ -6,3 +7,4 @@ ROUTE_ROOT = "/workflows"
|
||||
class Routes:
|
||||
AddWorkflow = "/add-workflow"
|
||||
SelectWorkflow = "/select-workflow"
|
||||
ShowWorkflow = "/show-workflow"
|
||||
|
||||
@@ -6,6 +6,11 @@ from core.settings_management import SettingsManager
|
||||
|
||||
logger = logging.getLogger("WorkflowsSettings")
|
||||
|
||||
@dataclass
|
||||
class WorkflowsDesignerSettings:
|
||||
workflow_name: str
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class WorkflowsSettings:
|
||||
|
||||
152
tests/tests_workflows_db_manager.py
Normal file
152
tests/tests_workflows_db_manager.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from components.workflows.constants import WORKFLOWS_SETTINGS_ENTRY
|
||||
from components.workflows.db_management import WorkflowsDbManager, WorkflowsSettings
|
||||
from core.settings_management import SettingsManager, MemoryDbEngine
|
||||
|
||||
USER_EMAIL = "test@mail.com"
|
||||
USER_ID = "test_user"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session():
|
||||
return {"user_id": USER_ID, "user_email": USER_EMAIL}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def settings_manager():
|
||||
return SettingsManager(engine=MemoryDbEngine())
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def workflows_db_manager(session, settings_manager):
|
||||
return WorkflowsDbManager(session=session, settings_manager=settings_manager)
|
||||
|
||||
|
||||
def test_add_workflow(workflows_db_manager):
|
||||
# Test adding a new workflow
|
||||
assert workflows_db_manager.add_workflow("workflow1") is True
|
||||
|
||||
# Verify workflow was added
|
||||
workflows = workflows_db_manager.get_workflows()
|
||||
assert "workflow1" in workflows
|
||||
assert len(workflows) == 1
|
||||
|
||||
|
||||
def test_add_workflow_empty_name(workflows_db_manager):
|
||||
# Test adding a workflow with empty name raises ValueError
|
||||
with pytest.raises(ValueError, match="Workflow name cannot be empty."):
|
||||
workflows_db_manager.add_workflow("")
|
||||
|
||||
|
||||
def test_add_workflow_duplicate(workflows_db_manager):
|
||||
# Add a workflow
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
|
||||
# Test adding duplicate workflow raises ValueError
|
||||
with pytest.raises(ValueError, match="Workflow 'workflow1' already exists."):
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
|
||||
|
||||
def test_get_workflow(workflows_db_manager):
|
||||
# Add a workflow
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
|
||||
# Test getting the workflow
|
||||
workflow = workflows_db_manager.get_workflow("workflow1")
|
||||
assert workflow == "workflow1"
|
||||
|
||||
|
||||
def test_get_workflow_empty_name(workflows_db_manager):
|
||||
# Test getting a workflow with empty name raises ValueError
|
||||
with pytest.raises(ValueError, match="Workflow name cannot be empty."):
|
||||
workflows_db_manager.get_workflow("")
|
||||
|
||||
|
||||
def test_get_workflow_nonexistent(workflows_db_manager):
|
||||
# Test getting a non-existent workflow raises ValueError
|
||||
with pytest.raises(ValueError, match="Workflow 'nonexistent' does not exist."):
|
||||
workflows_db_manager.get_workflow("nonexistent")
|
||||
|
||||
|
||||
def test_remove_workflow(workflows_db_manager):
|
||||
# Add a workflow
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
|
||||
# Test removing the workflow
|
||||
assert workflows_db_manager.remove_workflow("workflow1") is True
|
||||
|
||||
# Verify workflow was removed
|
||||
assert len(workflows_db_manager.get_workflows()) == 0
|
||||
|
||||
|
||||
def test_remove_workflow_empty_name(workflows_db_manager):
|
||||
# Test removing a workflow with empty name raises ValueError
|
||||
with pytest.raises(ValueError, match="Workflow name cannot be empty."):
|
||||
workflows_db_manager.remove_workflow("")
|
||||
|
||||
|
||||
def test_remove_workflow_nonexistent(workflows_db_manager):
|
||||
# Test removing a non-existent workflow raises ValueError
|
||||
with pytest.raises(ValueError, match="workflow 'nonexistent' does not exist."):
|
||||
workflows_db_manager.remove_workflow("nonexistent")
|
||||
|
||||
|
||||
def test_exists_workflow(workflows_db_manager):
|
||||
# Add a workflow
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
|
||||
# Test workflow exists
|
||||
assert workflows_db_manager.exists_workflow("workflow1") is True
|
||||
|
||||
# Test non-existent workflow
|
||||
assert workflows_db_manager.exists_workflow("nonexistent") is False
|
||||
|
||||
|
||||
def test_exists_workflow_empty_name(workflows_db_manager):
|
||||
# Test checking existence of workflow with empty name raises ValueError
|
||||
with pytest.raises(ValueError, match="workflow name cannot be empty."):
|
||||
workflows_db_manager.exists_workflow("")
|
||||
|
||||
|
||||
def test_get_workflows(workflows_db_manager):
|
||||
# Initially, no workflows
|
||||
assert len(workflows_db_manager.get_workflows()) == 0
|
||||
|
||||
# Add workflows
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
workflows_db_manager.add_workflow("workflow2")
|
||||
|
||||
# Test getting all workflows
|
||||
workflows = workflows_db_manager.get_workflows()
|
||||
assert "workflow1" in workflows
|
||||
assert "workflow2" in workflows
|
||||
assert len(workflows) == 2
|
||||
|
||||
|
||||
def test_select_workflow(workflows_db_manager):
|
||||
# Add a workflow
|
||||
workflows_db_manager.add_workflow("workflow1")
|
||||
|
||||
# Select the workflow
|
||||
workflows_db_manager.select_workflow("workflow1")
|
||||
|
||||
# Verify workflow was selected
|
||||
assert workflows_db_manager.get_selected_workflow() == "workflow1"
|
||||
|
||||
|
||||
def test_get_selected_workflow_none(workflows_db_manager):
|
||||
# Initially, no selected workflow
|
||||
assert workflows_db_manager.get_selected_workflow() is None
|
||||
|
||||
|
||||
def test_get_settings_default(workflows_db_manager, session, settings_manager):
|
||||
# Test _get_settings returns default settings when none exist
|
||||
with patch.object(settings_manager, 'load', return_value=WorkflowsSettings()) as mock_load:
|
||||
settings = workflows_db_manager._get_settings()
|
||||
mock_load.assert_called_once_with(session, WORKFLOWS_SETTINGS_ENTRY, default=WorkflowsSettings())
|
||||
assert isinstance(settings, WorkflowsSettings)
|
||||
assert settings.workflows == []
|
||||
assert settings.selected_workflow is None
|
||||
Reference in New Issue
Block a user