From ed793995fba032e788402706bf59026cc856f190 Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Sun, 13 Jul 2025 18:11:17 +0200 Subject: [PATCH] Fixed unit tests --- .../workflows/components/WorkflowPlayer.py | 71 +++++++------------ src/components/workflows/db_management.py | 2 +- tests/test_workflow_player.py | 21 +++--- 3 files changed, 39 insertions(+), 55 deletions(-) diff --git a/src/components/workflows/components/WorkflowPlayer.py b/src/components/workflows/components/WorkflowPlayer.py index ac19a12..2db6f2f 100644 --- a/src/components/workflows/components/WorkflowPlayer.py +++ b/src/components/workflows/components/WorkflowPlayer.py @@ -66,64 +66,48 @@ class WorkflowPlayer(BaseComponent): self.global_error = "No connections defined." return - self._init_state() + self._init_state(ComponentState.NOT_RUN) components_by_id = {c.id: c for c in self._designer.get_workflow_components()} try: sorted_components = self._get_sorted_components() + engine = self._get_engine(sorted_components) + except ValueError as e: # Handle workflow structure errors (e.g., cycles) + self.has_error = True self.global_error = f"Workflow configuration error: {e}" - self._datagrid.init_from_dataframe(pd.DataFrame([])) return - - try: - engine = self._get_engine() except WorkflowsPlayerError as ex: + self.has_error = True + self.global_error = f"Failed to init component '{ex.component_id}': {ex.error}" if ex.component_id in self.runtime_states: self.runtime_states[ex.component_id].state = ComponentState.FAILURE self.runtime_states[ex.component_id].error_message = str(ex.error) - self.global_error = f"Failed to init component '{ex.component_id}': {ex.error}" return res = engine.run_to_list() - if engine.has_error: + if engine.has_error and not engine.errors: self.has_error = True - - if not engine.errors: - self.global_error = engine.global_error - - else: - # Determine component states by simulating a "stop-on-fail" execution - first_failure_found = False - for component in sorted_components: - runtime_state = self.runtime_states.get(component.id) - if not runtime_state: - continue - - if first_failure_found: - # After a failure, all subsequent components are marked as NOT_RUN - runtime_state.state = ComponentState.NOT_RUN - continue - - if component.id in engine.errors: - # This is the first component that failed - first_failure_found = True - error = engine.errors[component.id] - runtime_state.state = ComponentState.FAILURE - runtime_state.error_message = str(error) - - # As requested, display the component error in the global error area - component_props = components_by_id[component.id].properties - component_name = component_props.get("processor_name", f"ID: {component.id}") - self.global_error = f"Error in component '{component_name}': {str(error)}" - else: - # This component ran successfully - runtime_state.state = ComponentState.SUCCESS - else: - self.has_error = False + self.global_error = engine.global_error + + else: # loop through the components and update the runtime states + for component in sorted_components: + runtime_state = self.runtime_states.get(component.id) + + if component.id not in engine.errors: + runtime_state.state = ComponentState.SUCCESS + continue + + # the component failed + error = engine.errors[component.id] + runtime_state.state = ComponentState.FAILURE + runtime_state.error_message = str(error) + self.global_error = f"Error in component '{error.component_id}': {error.error}" # update global error as well + self.has_error = True + break # the remaining components will remain as NOT_RUN data = [row.as_dict() for row in res] df = pd.DataFrame(data) @@ -194,9 +178,8 @@ class WorkflowPlayer(BaseComponent): # Return sorted components return [components_by_id[cid] for cid in sorted_order] - def _get_engine(self): + def _get_engine(self, sorted_components): # first reorder the component, according to the connection definitions - sorted_components = self._get_sorted_components() engine = WorkflowEngine() for component in sorted_components: try: @@ -218,10 +201,10 @@ class WorkflowPlayer(BaseComponent): return engine - def _init_state(self): + def _init_state(self, state: ComponentState = ComponentState.SUCCESS): self.global_error = None self.has_error = False - self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id) + self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id, state) for component in self._designer.get_workflow_components()} @staticmethod diff --git a/src/components/workflows/db_management.py b/src/components/workflows/db_management.py index 164bdb2..ab918e3 100644 --- a/src/components/workflows/db_management.py +++ b/src/components/workflows/db_management.py @@ -43,7 +43,7 @@ class WorkflowComponentRuntimeState: Represents the runtime state of a single workflow component. """ id: str - state: ComponentState = ComponentState.NOT_RUN + state: ComponentState = ComponentState.SUCCESS error_message: str | None = None diff --git a/tests/test_workflow_player.py b/tests/test_workflow_player.py index 95f0210..1fc5e77 100644 --- a/tests/test_workflow_player.py +++ b/tests/test_workflow_player.py @@ -10,6 +10,7 @@ from components.workflows.constants import ProcessorTypes from components.workflows.db_management import WorkflowComponent, Connection, ComponentState, WorkflowsDesignerSettings from core.settings_management import SettingsManager, MemoryDbEngine from my_mocks import tabs_manager +from workflow.engine import DataProcessorError TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id" TEST_WORKFLOW_PLAYER_ID = "workflow_player_id" @@ -63,6 +64,9 @@ def designer(session, settings_manager, tabs_manager): {"height": 500, "width": 800} ) + designer._state.components = {c.id: c for c in components} + designer._state.connections = connections + return designer @@ -117,7 +121,7 @@ def test_run_with_cyclical_dependency(player, mocker): Tests that a workflow with a cycle is detected and handled before execution. """ # 1. Arrange: Introduce a cycle and spy on engine creation - player._player_settings.connections.append(Connection("conn_3", "comp_presenter", "comp_producer")) + player._designer._state.connections.append(Connection("conn_3", "comp_presenter", "comp_producer")) spy_get_engine = mocker.spy(player, '_get_engine') # 2. Act @@ -126,7 +130,6 @@ def test_run_with_cyclical_dependency(player, mocker): # 3. Assert: Check for the specific cycle error assert player.has_error assert "Workflow configuration error: A cycle was detected" in player.global_error - assert player.get_dataframe().empty spy_get_engine.assert_not_called() @@ -159,7 +162,7 @@ def test_run_with_failure_in_middle_component(player, mocker): mock_engine.has_error = True failing_component_id = "comp_filter" error = RuntimeError("Data processing failed unexpectedly") - mock_engine.errors = {failing_component_id: error} + mock_engine.errors = {failing_component_id: DataProcessorError(failing_component_id, error)} mock_engine.run_to_list.return_value = [] mocker.patch.object(player, '_get_engine', return_value=mock_engine) @@ -168,7 +171,7 @@ def test_run_with_failure_in_middle_component(player, mocker): # 3. Assert: Check the state of each component in the chain assert player.has_error - assert f"Error in component 'Default': {error}" in player.global_error + assert f"Error in component 'comp_filter':" in player.global_error assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE assert str(error) in player.runtime_states[failing_component_id].error_message @@ -180,9 +183,8 @@ def test_run_with_empty_workflow(player, mocker): Tests that running a workflow with no components completes without errors. """ # 1. Arrange: Clear components and connections - player._player_settings.components = [] - player._player_settings.connections = [] - player.runtime_states = {} + player._designer._state.components = {} + player._designer._state.connections = [] spy_get_engine = mocker.spy(player, '_get_engine') # 2. Act @@ -190,9 +192,8 @@ def test_run_with_empty_workflow(player, mocker): # 3. Assert: Ensure it finishes cleanly with no data assert not player.has_error - assert player.global_error is None - assert player.get_dataframe().empty - spy_get_engine.assert_called_once() + assert player.global_error == 'No connections defined.' + spy_get_engine.assert_not_called() def test_run_with_global_engine_error(player, mocker):