From d0f7536fa092a1a0961d526830d503a0d79d384d Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Fri, 11 Jul 2025 19:03:08 +0200 Subject: [PATCH] Adding error management --- .../workflows/components/WorkflowDesigner.py | 3 +- .../workflows/components/WorkflowPlayer.py | 99 +++++++++++++++---- src/components/workflows/db_management.py | 5 +- src/workflow/engine.py | 26 ++++- 4 files changed, 107 insertions(+), 26 deletions(-) diff --git a/src/components/workflows/components/WorkflowDesigner.py b/src/components/workflows/components/WorkflowDesigner.py index 4959992..18ec2dc 100644 --- a/src/components/workflows/components/WorkflowDesigner.py +++ b/src/components/workflows/components/WorkflowDesigner.py @@ -72,7 +72,8 @@ class WorkflowDesigner(BaseComponent): settings_manager=self._settings_manager, tabs_manager=self.tabs_manager, player_settings=WorkflowsPlayerSettings(workflow_name, - list(self._state.components.values())), + list(self._state.components.values()), + self._state.connections), boundaries=boundaries) self._error_message = None diff --git a/src/components/workflows/components/WorkflowPlayer.py b/src/components/workflows/components/WorkflowPlayer.py index d70abba..0db0526 100644 --- a/src/components/workflows/components/WorkflowPlayer.py +++ b/src/components/workflows/components/WorkflowPlayer.py @@ -1,12 +1,13 @@ import pandas as pd from fasthtml.components import * +from collections import deque from components.BaseComponent import BaseComponent from components.datagrid_new.components.DataGrid import DataGrid from components.datagrid_new.settings import DataGridSettings from components.workflows.commands import WorkflowPlayerCommandManager from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes -from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState +from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, WorkflowComponent from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id from workflow.engine import WorkflowEngine, TableDataProducer, DefaultDataPresenter, DefaultDataFilter @@ -19,6 +20,8 @@ grid_settings = DataGridSettings( open_settings_visible=False) + + class WorkflowPlayer(BaseComponent): def __init__(self, session, _id=None, @@ -30,7 +33,7 @@ class WorkflowPlayer(BaseComponent): self._settings_manager = settings_manager self.tabs_manager = tabs_manager self.key = f"__WorkflowPlayer_{player_settings.workflow_name}" - self._player_settings = player_settings + self._player_settings : WorkflowsPlayerSettings = player_settings self._boundaries = boundaries self.commands = WorkflowPlayerCommandManager(self) self._datagrid = InstanceManager.get(self._session, @@ -43,22 +46,7 @@ class WorkflowPlayer(BaseComponent): self.global_error = False def run(self): - engine = WorkflowEngine() - for component in self._player_settings.components: - if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository": - engine.add_processor( - TableDataProducer(self._session, - self._settings_manager, - component.id, - component.properties["repository"], - component.properties["table"])) - - elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default": - engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"])) - - elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default": - engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"])) - + engine = self._get_engine() res = engine.run_to_list() if engine.has_error: @@ -80,6 +68,79 @@ class WorkflowPlayer(BaseComponent): self._datagrid, id=self._id, ) + + def _get_sorted_components(self) -> list[WorkflowComponent]: + """ + Sorts the workflow components based on their connections using topological sort. + + - A connection from component A to B means A must come before B. + - Raises a ValueError if a cycle is detected. + - Raises a ValueError if a connection references a non-existent component. + - Ignores components that are not part of any connection. + + :return: A list of sorted WorkflowComponent objects. + """ + components_by_id = {c.id: c for c in self._player_settings.components} + + # Get all component IDs involved in connections + involved_ids = set() + for conn in self._player_settings.connections: + involved_ids.add(conn.from_id) + involved_ids.add(conn.to_id) + + # Check if all involved components exist + for component_id in involved_ids: + if component_id not in components_by_id: + raise ValueError(f"Component with ID '{component_id}' referenced in connections but does not exist.") + + # Build the graph (adjacency list and in-degrees) for involved components + adj = {cid: [] for cid in involved_ids} + in_degree = {cid: 0 for cid in involved_ids} + + for conn in self._player_settings.connections: + # from_id -> to_id + adj[conn.from_id].append(conn.to_id) + in_degree[conn.to_id] += 1 + + # Find all sources (nodes with in-degree 0) + queue = deque([cid for cid in involved_ids if in_degree[cid] == 0]) + + sorted_order = [] + while queue: + u = queue.popleft() + sorted_order.append(u) + + for v in adj.get(u, []): + in_degree[v] -= 1 + if in_degree[v] == 0: + queue.append(v) + + # Check for cycles + if len(sorted_order) != len(involved_ids): + raise ValueError("A cycle was detected in the workflow connections.") + + # Return sorted components + return [components_by_id[cid] for cid in sorted_order] + + def _get_engine(self): + # first reorder the component, according to the connection definitions + sorted_components = self._get_sorted_components() + engine = WorkflowEngine() + for component in sorted_components: + if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository": + engine.add_processor( + TableDataProducer(self._session, + self._settings_manager, + component.id, + component.properties["repository"], + component.properties["table"])) + + elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default": + engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"])) + + elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default": + engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"])) + return engine @staticmethod def create_component_id(session, suffix=None): @@ -87,4 +148,4 @@ class WorkflowPlayer(BaseComponent): if suffix is None: suffix = get_unique_id() - return make_safe_id(f"{prefix}{suffix}") + return make_safe_id(f"{prefix}{suffix}") \ No newline at end of file diff --git a/src/components/workflows/db_management.py b/src/components/workflows/db_management.py index 84844b8..15f020f 100644 --- a/src/components/workflows/db_management.py +++ b/src/components/workflows/db_management.py @@ -49,8 +49,9 @@ class WorkflowsDesignerState: @dataclass class WorkflowsPlayerSettings: - workflow_name: str = "No Name" - components: list[WorkflowComponent] = None + workflow_name: str + components: list[WorkflowComponent] + connections: list[Connection] @dataclass diff --git a/src/workflow/engine.py b/src/workflow/engine.py index f184b76..ba0fd14 100644 --- a/src/workflow/engine.py +++ b/src/workflow/engine.py @@ -7,6 +7,12 @@ from core.utils import UnreferencedNamesVisitor from utils.Datahelper import DataHelper +class DataProcessorError(Exception): + def __init__(self, component_id, error): + self.component_id = component_id + self.error = error + + class DataProcessor(ABC): """Base class for all data processing components.""" @@ -27,7 +33,11 @@ class DataProducer(DataProcessor): pass def process(self, data: Any) -> Generator[Any, None, None]: - yield from self.emit(data) + try: + yield from self.emit(data) + + except Exception as e: + raise DataProcessorError(self.component_id, e) class DataFilter(DataProcessor): @@ -39,8 +49,12 @@ class DataFilter(DataProcessor): pass def process(self, data: Any) -> Generator[Any, None, None]: - if self.filter(data): - yield data + try: + if self.filter(data): + yield data + + except Exception as e: + raise DataProcessorError(self.component_id, e) class DataPresenter(DataProcessor): @@ -52,7 +66,11 @@ class DataPresenter(DataProcessor): pass def process(self, data: Any) -> Generator[Any, None, None]: - yield self.present(data) + try: + yield self.present(data) + + except Exception as e: + raise DataProcessorError(self.component_id, e) class TableDataProducer(DataProducer):