diff --git a/src/components/workflows/assets/Workflows.css b/src/components/workflows/assets/Workflows.css index 122f384..2d8e39b 100644 --- a/src/components/workflows/assets/Workflows.css +++ b/src/components/workflows/assets/Workflows.css @@ -51,12 +51,17 @@ .wkf-canvas { position: relative; + box-sizing: border-box; background-image: linear-gradient(rgba(0,0,0,.1) 1px, transparent 1px), linear-gradient(90deg, rgba(0,0,0,.1) 1px, transparent 1px); background-size: 20px 20px; } +.wkf-canvas-error { + border: 3px solid var(--color-error); +} + .wkf-toolbox { min-height: 230px; width: 8rem; /* w-32 (32 * 0.25rem = 8rem) */ @@ -89,6 +94,10 @@ transition: none; } +.wkf-workflow-component.error { + background: var(--color-error); +} + .wkf-component-content { padding: 0.75rem; /* p-3 in Tailwind */ border-radius: 0.5rem; /* rounded-lg in Tailwind */ diff --git a/src/components/workflows/components/WorkflowDesigner.py b/src/components/workflows/components/WorkflowDesigner.py index ed05784..4959992 100644 --- a/src/components/workflows/components/WorkflowDesigner.py +++ b/src/components/workflows/components/WorkflowDesigner.py @@ -11,7 +11,7 @@ from components.workflows.commands import WorkflowDesignerCommandManager from components.workflows.components.WorkflowPlayer import WorkflowPlayer from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \ - Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings + Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings, WorkflowComponentRuntimeState from components_helpers import apply_boundaries, mk_tooltip, mk_dialog_buttons, mk_icon from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id @@ -64,6 +64,17 @@ class WorkflowDesigner(BaseComponent): self._state = self._db.load_state(key) self._boundaries = boundaries self.commands = WorkflowDesignerCommandManager(self) + + workflow_name = self._designer_settings.workflow_name + self._player = InstanceManager.get(self._session, + WorkflowPlayer.create_component_id(self._session, workflow_name), + WorkflowPlayer, + settings_manager=self._settings_manager, + tabs_manager=self.tabs_manager, + player_settings=WorkflowsPlayerSettings(workflow_name, + list(self._state.components.values())), + boundaries=boundaries) + self._error_message = None def set_boundaries(self, boundaries: dict): @@ -178,24 +189,17 @@ class WorkflowDesigner(BaseComponent): def play_workflow(self, boundaries: dict): if self._state.selected_component_id is None: - return self.error_message("No component selected") - - workflow_name = self._designer_settings.workflow_name - player = InstanceManager.get(self._session, - WorkflowPlayer.create_component_id(self._session, workflow_name), - WorkflowPlayer, - settings_manager=self._settings_manager, - tabs_manager=self.tabs_manager, - player_settings=WorkflowsPlayerSettings(workflow_name, - list(self._state.components.values())), - boundaries=boundaries) - try: - player.run() - self.tabs_manager.add_tab(f"Workflow {workflow_name}", player, player.key) + self._error_message = "No component selected" return self.tabs_manager.refresh() + try: + self._player.run() + self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key) + except Exception as e: - return self.error_message(str(e)) + self._error_message = str(e) + + return self.tabs_manager.refresh() def on_processor_details_event(self, component_id: str, event_name: str, details: dict): if component_id in self._state.components: @@ -207,10 +211,6 @@ class WorkflowDesigner(BaseComponent): return self.refresh_properties() - def error_message(self, message: str): - self._error_message = message - return self.tabs_manager.refresh() - def __ft__(self): return Div( H1(f"{self._designer_settings.workflow_name}", cls="text-xl font-bold"), @@ -265,13 +265,14 @@ class WorkflowDesigner(BaseComponent): *[NotStr(self._mk_connection_svg(conn)) for conn in self._state.connections], # Render components - *[self._mk_workflow_component(comp) for comp in self._state.components.values()], + *[self._mk_workflow_component(comp, state) for comp, state in zip(self._state.components.values(), + self._player.runtime_states)], ) def _mk_canvas(self, oob=False): return Div( self._mk_elements(), - cls="wkf-canvas flex-1 rounded-lg border flex-1", + cls=f"wkf-canvas flex-1 rounded-lg border flex-1 {'wkf-canvas-error' if self._error_message else ''}", id=f"c_{self._id}", hx_swap_oob='true' if oob else None, ), @@ -503,7 +504,7 @@ class WorkflowDesigner(BaseComponent): ) @staticmethod - def _mk_workflow_component(component: WorkflowComponent): + def _mk_workflow_component(component: WorkflowComponent, component_state: WorkflowComponentRuntimeState): info = COMPONENT_TYPES[component.type] return Div( # Input connection point @@ -515,7 +516,7 @@ class WorkflowDesigner(BaseComponent): Div( Span(info["icon"], cls="text-xl mb-1"), H4(component.title, cls="font-semibold text-xs"), - cls=f"wkf-component-content {info['color']}" + cls=f"wkf-component-content {info['color']} {'error' if component_state.has_error else ''}" ), # Output connection point diff --git a/src/components/workflows/components/WorkflowPlayer.py b/src/components/workflows/components/WorkflowPlayer.py index 2298c9a..d70abba 100644 --- a/src/components/workflows/components/WorkflowPlayer.py +++ b/src/components/workflows/components/WorkflowPlayer.py @@ -6,7 +6,7 @@ from components.datagrid_new.components.DataGrid import DataGrid from components.datagrid_new.settings import DataGridSettings from components.workflows.commands import WorkflowPlayerCommandManager from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes -from components.workflows.db_management import WorkflowsPlayerSettings +from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id from workflow.engine import WorkflowEngine, TableDataProducer, DefaultDataPresenter, DefaultDataFilter @@ -39,21 +39,38 @@ class WorkflowPlayer(BaseComponent): key=self.key, grid_settings=grid_settings, boundaries=boundaries) + self.runtime_states = [WorkflowComponentRuntimeState(component.id) for component in player_settings.components] + self.global_error = False def run(self): engine = WorkflowEngine() for component in self._player_settings.components: if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository": engine.add_processor( - TableDataProducer(self._session, self._settings_manager, component.properties["repository"], + TableDataProducer(self._session, + self._settings_manager, + component.id, + component.properties["repository"], component.properties["table"])) + elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default": - engine.add_processor(DefaultDataFilter(component.properties["filter"])) + engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"])) + elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default": - engine.add_processor(DefaultDataPresenter(component.properties["columns"])) + engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"])) res = engine.run_to_list() + if engine.has_error: + self.global_error = engine.global_error + for runtime_state in self.runtime_states: + if runtime_state.id in engine.errors: + runtime_state.has_error = True + runtime_state.error_message = engine.errors[runtime_state.id].error_message + else: + runtime_state.has_error = False + runtime_state.error_message = "" + data = [row.as_dict() for row in res] df = pd.DataFrame(data) self._datagrid.init_from_dataframe(df) diff --git a/src/components/workflows/db_management.py b/src/components/workflows/db_management.py index c4b9ca5..84844b8 100644 --- a/src/components/workflows/db_management.py +++ b/src/components/workflows/db_management.py @@ -27,6 +27,12 @@ class Connection: to_id: str +@dataclass +class WorkflowComponentRuntimeState: + id: str + has_error: bool = False + error_message: str = "" + @dataclass class WorkflowsDesignerSettings: workflow_name: str = "No Name" diff --git a/src/workflow/engine.py b/src/workflow/engine.py index a1704df..f184b76 100644 --- a/src/workflow/engine.py +++ b/src/workflow/engine.py @@ -10,6 +10,9 @@ from utils.Datahelper import DataHelper class DataProcessor(ABC): """Base class for all data processing components.""" + def __init__(self, component_id: str = None): + self.component_id = component_id + @abstractmethod def process(self, data: Any) -> Generator[Any, None, None]: pass @@ -55,7 +58,8 @@ class DataPresenter(DataProcessor): class TableDataProducer(DataProducer): """Base class for data producers that emit data from a repository.""" - def __init__(self, session, settings_manager, repository_name, table_name): + def __init__(self, session, settings_manager, component_id, repository_name, table_name): + super().__init__(component_id) self._session = session self.settings_manager = settings_manager self.repository_name = repository_name @@ -68,8 +72,8 @@ class TableDataProducer(DataProducer): class DefaultDataPresenter(DataPresenter): """Default data presenter that returns the input data unchanged.""" - def __init__(self, columns_as_str: str): - super().__init__() + def __init__(self, component_id: str, columns_as_str: str): + super().__init__(component_id) if not columns_as_str or columns_as_str == "*": self.mappings = None @@ -92,8 +96,8 @@ class DefaultDataPresenter(DataPresenter): class DefaultDataFilter(DataFilter): - def __init__(self, filter_expression: str): - super().__init__() + def __init__(self, component_id: str, filter_expression: str): + super().__init__(component_id) self.filter_expression = filter_expression self._ast_tree = ast.parse(filter_expression, "", 'eval') self._compiled = compile(self._ast_tree, "", "eval") @@ -112,6 +116,9 @@ class WorkflowEngine: def __init__(self): self.processors: list[DataProcessor] = [] + self.has_error = False + self.global_error = None + self.errors = {} def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine': """Add a data processor to the pipeline.""" @@ -137,12 +144,16 @@ class WorkflowEngine: The first processor must be a DataProducer. """ if not self.processors: - raise ValueError("No processors in the pipeline") + self.has_error = False + self.global_error = "No processors in the pipeline" + raise ValueError(self.global_error) first_processor = self.processors[0] if not isinstance(first_processor, DataProducer): - raise ValueError("First processor must be a DataProducer") + self.has_error = False + self.global_error = "First processor must be a DataProducer" + raise ValueError(self.global_error) for item in first_processor.emit(): yield from self._process_single_item(item, 1) diff --git a/tests/test_workflow_designer.py b/tests/test_workflow_designer.py index 57724e4..7f009a4 100644 --- a/tests/test_workflow_designer.py +++ b/tests/test_workflow_designer.py @@ -5,7 +5,8 @@ from fasthtml.xtend import Script from components.workflows.components.WorkflowDesigner import WorkflowDesigner, COMPONENT_TYPES from components.workflows.constants import ProcessorTypes -from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, Connection +from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, Connection, \ + WorkflowComponentRuntimeState from core.settings_management import SettingsManager, MemoryDbEngine from helpers import matches, Contains @@ -83,7 +84,8 @@ def test_i_can_render_no_component(designer): def test_i_can_render_a_producer(designer, producer_component): component = producer_component - actual = designer._mk_workflow_component(component) + component_state = WorkflowComponentRuntimeState(component.id) + actual = designer._mk_workflow_component(component, component_state) expected = Div( # input connection point Div(cls="wkf-connection-point wkf-input-point",