From 27543121414b5d584f2b708026edd102c849ce4c Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Sat, 12 Jul 2025 09:52:56 +0200 Subject: [PATCH] Adding visual return when error --- src/components/page_layout_new.py | 1 + src/components/workflows/assets/Workflows.css | 8 ++ .../workflows/components/WorkflowDesigner.py | 98 +++++++++-------- .../workflows/components/WorkflowPlayer.py | 101 +++++++++++++----- src/components/workflows/db_management.py | 18 +++- src/workflow/engine.py | 11 +- tests/test_workflow_designer.py | 2 +- 7 files changed, 163 insertions(+), 76 deletions(-) diff --git a/src/components/page_layout_new.py b/src/components/page_layout_new.py index aa5ff8a..4baddc3 100644 --- a/src/components/page_layout_new.py +++ b/src/components/page_layout_new.py @@ -1,3 +1,4 @@ +from fasthtml.components import Html from fasthtml.components import * from fasthtml.xtend import Script diff --git a/src/components/workflows/assets/Workflows.css b/src/components/workflows/assets/Workflows.css index 2d8e39b..a1e909f 100644 --- a/src/components/workflows/assets/Workflows.css +++ b/src/components/workflows/assets/Workflows.css @@ -98,6 +98,7 @@ background: var(--color-error); } + .wkf-component-content { padding: 0.75rem; /* p-3 in Tailwind */ border-radius: 0.5rem; /* rounded-lg in Tailwind */ @@ -108,6 +109,13 @@ align-items: center; /* items-center in Tailwind */ } +.wkf-component-content.error { + background: var(--color-error); +} + +.wkf-component-content.not-run { + background: var(--color-neutral); +} .wkf-connection-line { position: absolute; diff --git a/src/components/workflows/components/WorkflowDesigner.py b/src/components/workflows/components/WorkflowDesigner.py index 18ec2dc..80ce9b6 100644 --- a/src/components/workflows/components/WorkflowDesigner.py +++ b/src/components/workflows/components/WorkflowDesigner.py @@ -11,7 +11,7 @@ from components.workflows.commands import WorkflowDesignerCommandManager from components.workflows.components.WorkflowPlayer import WorkflowPlayer from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \ - Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings, WorkflowComponentRuntimeState + Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings, WorkflowComponentRuntimeState, ComponentState from components_helpers import apply_boundaries, mk_tooltip, mk_dialog_buttons, mk_icon from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id @@ -84,8 +84,8 @@ class WorkflowDesigner(BaseComponent): def refresh_designer(self): return self._mk_elements() - def refresh_properties(self): - return self._mk_properties() + def refresh_properties(self, oob=False): + return self._mk_properties(oob) def add_component(self, component_type, x, y): self._state.component_counter += 1 @@ -109,11 +109,12 @@ class WorkflowDesigner(BaseComponent): def move_component(self, component_id, x, y): if component_id in self._state.components: + self._state.selected_component_id = component_id self._state.components[component_id].x = int(x) self._state.components[component_id].y = int(y) self._db.save_state(self._key, self._state) # update db - return self.refresh_designer() + return self.refresh_designer(), self.refresh_properties(True) def delete_component(self, component_id): # Remove component @@ -189,17 +190,18 @@ class WorkflowDesigner(BaseComponent): return self.refresh_properties() def play_workflow(self, boundaries: dict): - if self._state.selected_component_id is None: - self._error_message = "No component selected" - return self.tabs_manager.refresh() + self._error_message = None - try: - self._player.run() + self._player.run() + if self._player.global_error: + # Show the error message in the same tab + self._error_message = self._player.global_error + + else: + + # change the tab and display the results self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key) - - except Exception as e: - self._error_message = str(e) - + return self.tabs_manager.refresh() def on_processor_details_event(self, component_id: str, event_name: str, details: dict): @@ -260,14 +262,48 @@ class WorkflowDesigner(BaseComponent): """ + def _mk_component(self, component: WorkflowComponent, runtime_state: WorkflowComponentRuntimeState): + info = COMPONENT_TYPES[component.type] + is_selected = self._state.selected_component_id == component.id + if runtime_state.state == ComponentState.FAILURE: + state_class = 'error' # To be styled with a red highlight + elif runtime_state.state == ComponentState.NOT_RUN: + state_class = 'not-run' # To be styled as greyed-out + else: + state_class = '' + + return Div( + # Input connection point + Div(cls="wkf-connection-point wkf-input-point", + data_component_id=component.id, + data_point_type="input"), + + # Component content + Div( + Span(info["icon"], cls="text-xl mb-1"), + H4(component.title, cls="font-semibold text-xs"), + cls=f"wkf-component-content {info['color']} {state_class}" + ), + + # Output connection point + Div(cls="wkf-connection-point wkf-output-point", + data_component_id=component.id, + data_point_type="output"), + + cls=f"wkf-workflow-component w-32 {'selected' if is_selected else ''}", + style=f"left: {component.x}px; top: {component.y}px;", + data_component_id=component.id, + draggable="true" + ) + def _mk_elements(self): return Div( # Render connections *[NotStr(self._mk_connection_svg(conn)) for conn in self._state.connections], # Render components - *[self._mk_workflow_component(comp, state) for comp, state in zip(self._state.components.values(), - self._player.runtime_states)], + *[self._mk_component(comp, state) for comp, state in zip(self._state.components.values(), + self._player.runtime_states)], ) def _mk_canvas(self, oob=False): @@ -293,7 +329,7 @@ class WorkflowDesigner(BaseComponent): self._mk_toolbox(), # (Left side) self._mk_canvas(), # (Right side) - cls="wkf-designer flex gap-4", + cls="wkf-designer flex gap-1", id=f"d_{self._id}", style=f"height:{self._state.designer_height}px;" ) @@ -374,11 +410,12 @@ class WorkflowDesigner(BaseComponent): Script(f"bindFormData('f_{self._id}_{component_id}');") ) - def _mk_properties(self): + def _mk_properties(self, oob=False): return Div( self._mk_properties_details(self._state.selected_component_id), cls="p-2 bg-base-100 rounded-lg border", style=f"height:{self._get_properties_height()}px;", + hx_swap_oob='true' if oob else None, id=f"p_{self._id}", ) @@ -503,30 +540,3 @@ class WorkflowDesigner(BaseComponent): draggable="true", data_type=component_type ) - - @staticmethod - def _mk_workflow_component(component: WorkflowComponent, component_state: WorkflowComponentRuntimeState): - info = COMPONENT_TYPES[component.type] - return Div( - # Input connection point - Div(cls="wkf-connection-point wkf-input-point", - data_component_id=component.id, - data_point_type="input"), - - # Component content - Div( - Span(info["icon"], cls="text-xl mb-1"), - H4(component.title, cls="font-semibold text-xs"), - cls=f"wkf-component-content {info['color']} {'error' if component_state.has_error else ''}" - ), - - # Output connection point - Div(cls="wkf-connection-point wkf-output-point", - data_component_id=component.id, - data_point_type="output"), - - cls="wkf-workflow-component w-32", - style=f"left: {component.x}px; top: {component.y}px;", - data_component_id=component.id, - draggable="true" - ) diff --git a/src/components/workflows/components/WorkflowPlayer.py b/src/components/workflows/components/WorkflowPlayer.py index 0db0526..cdc25bc 100644 --- a/src/components/workflows/components/WorkflowPlayer.py +++ b/src/components/workflows/components/WorkflowPlayer.py @@ -1,13 +1,15 @@ +from collections import deque + import pandas as pd from fasthtml.components import * -from collections import deque from components.BaseComponent import BaseComponent from components.datagrid_new.components.DataGrid import DataGrid from components.datagrid_new.settings import DataGridSettings from components.workflows.commands import WorkflowPlayerCommandManager from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes -from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, WorkflowComponent +from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, \ + WorkflowComponent, ComponentState from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id from workflow.engine import WorkflowEngine, TableDataProducer, DefaultDataPresenter, DefaultDataFilter @@ -20,8 +22,6 @@ grid_settings = DataGridSettings( open_settings_visible=False) - - class WorkflowPlayer(BaseComponent): def __init__(self, session, _id=None, @@ -33,7 +33,7 @@ class WorkflowPlayer(BaseComponent): self._settings_manager = settings_manager self.tabs_manager = tabs_manager self.key = f"__WorkflowPlayer_{player_settings.workflow_name}" - self._player_settings : WorkflowsPlayerSettings = player_settings + self._player_settings: WorkflowsPlayerSettings = player_settings self._boundaries = boundaries self.commands = WorkflowPlayerCommandManager(self) self._datagrid = InstanceManager.get(self._session, @@ -43,21 +43,66 @@ class WorkflowPlayer(BaseComponent): grid_settings=grid_settings, boundaries=boundaries) self.runtime_states = [WorkflowComponentRuntimeState(component.id) for component in player_settings.components] - self.global_error = False + self.global_error = None + self.has_error = False def run(self): + # Reset all component states to NOT_RUN before execution + for state in self.runtime_states: + state.state = ComponentState.NOT_RUN + state.error_message = None + self.global_error = None + + components_by_id = {c.id: c for c in self._player_settings.components} + + try: + sorted_components = self._get_sorted_components() + except ValueError as e: + # Handle workflow structure errors (e.g., cycles) + self.global_error = f"Workflow configuration error: {e}" + self._datagrid.init_from_dataframe(pd.DataFrame([])) + return + engine = self._get_engine() res = engine.run_to_list() + runtime_states_by_id = {rs.id: rs for rs in self.runtime_states} + if engine.has_error: - self.global_error = engine.global_error - for runtime_state in self.runtime_states: - if runtime_state.id in engine.errors: - runtime_state.has_error = True - runtime_state.error_message = engine.errors[runtime_state.id].error_message - else: - runtime_state.has_error = False - runtime_state.error_message = "" + self.has_error = True + + if not engine.errors: + self.global_error = engine.global_error + + else: + # Determine component states by simulating a "stop-on-fail" execution + first_failure_found = False + for component in sorted_components: + runtime_state = runtime_states_by_id.get(component.id) + if not runtime_state: + continue + + if first_failure_found: + # After a failure, all subsequent components are marked as NOT_RUN + runtime_state.state = ComponentState.NOT_RUN + continue + + if component.id in engine.errors: + # This is the first component that failed + first_failure_found = True + error = engine.errors[component.id] + runtime_state.state = ComponentState.FAILURE + runtime_state.error_message = str(error) + + # As requested, display the component error in the global error area + component_props = components_by_id[component.id].properties + component_name = component_props.get("processor_name", f"ID: {component.id}") + self.global_error = f"Error in component '{component_name}': {str(error)}" + else: + # This component ran successfully + runtime_state.state = ComponentState.SUCCESS + else: + self.has_error = False data = [row.as_dict() for row in res] df = pd.DataFrame(data) @@ -68,7 +113,7 @@ class WorkflowPlayer(BaseComponent): self._datagrid, id=self._id, ) - + def _get_sorted_components(self) -> list[WorkflowComponent]: """ Sorts the workflow components based on their connections using topological sort. @@ -81,47 +126,47 @@ class WorkflowPlayer(BaseComponent): :return: A list of sorted WorkflowComponent objects. """ components_by_id = {c.id: c for c in self._player_settings.components} - + # Get all component IDs involved in connections involved_ids = set() for conn in self._player_settings.connections: involved_ids.add(conn.from_id) involved_ids.add(conn.to_id) - + # Check if all involved components exist for component_id in involved_ids: if component_id not in components_by_id: raise ValueError(f"Component with ID '{component_id}' referenced in connections but does not exist.") - + # Build the graph (adjacency list and in-degrees) for involved components adj = {cid: [] for cid in involved_ids} in_degree = {cid: 0 for cid in involved_ids} - + for conn in self._player_settings.connections: # from_id -> to_id adj[conn.from_id].append(conn.to_id) in_degree[conn.to_id] += 1 - + # Find all sources (nodes with in-degree 0) queue = deque([cid for cid in involved_ids if in_degree[cid] == 0]) - + sorted_order = [] while queue: u = queue.popleft() sorted_order.append(u) - + for v in adj.get(u, []): in_degree[v] -= 1 if in_degree[v] == 0: queue.append(v) - + # Check for cycles if len(sorted_order) != len(involved_ids): raise ValueError("A cycle was detected in the workflow connections.") - + # Return sorted components return [components_by_id[cid] for cid in sorted_order] - + def _get_engine(self): # first reorder the component, according to the connection definitions sorted_components = self._get_sorted_components() @@ -134,10 +179,10 @@ class WorkflowPlayer(BaseComponent): component.id, component.properties["repository"], component.properties["table"])) - + elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default": engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"])) - + elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default": engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"])) return engine @@ -148,4 +193,4 @@ class WorkflowPlayer(BaseComponent): if suffix is None: suffix = get_unique_id() - return make_safe_id(f"{prefix}{suffix}") \ No newline at end of file + return make_safe_id(f"{prefix}{suffix}") diff --git a/src/components/workflows/db_management.py b/src/components/workflows/db_management.py index 15f020f..eafdee5 100644 --- a/src/components/workflows/db_management.py +++ b/src/components/workflows/db_management.py @@ -1,3 +1,4 @@ +import enum import logging from dataclasses import dataclass, field @@ -8,6 +9,15 @@ from core.settings_management import SettingsManager logger = logging.getLogger("WorkflowsSettings") +class ComponentState(enum.Enum): + """ + Represents the execution state of a workflow component. + """ + SUCCESS = "success" + FAILURE = "failure" + NOT_RUN = "not_run" + + # Data structures @dataclass class WorkflowComponent: @@ -29,9 +39,13 @@ class Connection: @dataclass class WorkflowComponentRuntimeState: + """ + Represents the runtime state of a single workflow component. + """ id: str - has_error: bool = False - error_message: str = "" + state: ComponentState = ComponentState.NOT_RUN + error_message: str | None = None + @dataclass class WorkflowsDesignerSettings: diff --git a/src/workflow/engine.py b/src/workflow/engine.py index ba0fd14..0d57005 100644 --- a/src/workflow/engine.py +++ b/src/workflow/engine.py @@ -181,4 +181,13 @@ class WorkflowEngine: Run the workflow and return all results as a list. Use this method when you need all results at once. """ - return list(self.run()) + try: + return list(self.run()) + except DataProcessorError as err: + self.has_error = True + self.errors[err.component_id] = err.error + return [] + except Exception as err: + self.has_error = True + self.global_error = str(err) + return [] diff --git a/tests/test_workflow_designer.py b/tests/test_workflow_designer.py index 7f009a4..29b57ac 100644 --- a/tests/test_workflow_designer.py +++ b/tests/test_workflow_designer.py @@ -85,7 +85,7 @@ def test_i_can_render_no_component(designer): def test_i_can_render_a_producer(designer, producer_component): component = producer_component component_state = WorkflowComponentRuntimeState(component.id) - actual = designer._mk_workflow_component(component, component_state) + actual = designer._mk_component(component, component_state) expected = Div( # input connection point Div(cls="wkf-connection-point wkf-input-point",