diff --git a/src/components/workflows/Readme.md b/src/components/workflows/Readme.md index 7f97e73..d13cbab 100644 --- a/src/components/workflows/Readme.md +++ b/src/components/workflows/Readme.md @@ -4,10 +4,12 @@ using `_id={WORKFLOW_DESIGNER_INSTANCE_ID}{session['user_id']}{get_unique_id()}` -| Name | value | -|---------------|------------------| -| Canvas | `c_{self._id}` | -| Designer | `d_{self._id}` | -| Error Message | `err_{self._id}` | -| Properties | `p_{self._id}` | -| Spliter | `s_{self._id}` | +| Name | value | +|-----------------|--------------------| +| Canvas | `c_{self._id}` | +| Designer | `d_{self._id}` | +| Error Message | `err_{self._id}` | +| Properties | `p_{self._id}` | +| Spliter | `s_{self._id}` | +| Top element | `t_{self._id}` | + diff --git a/src/components/workflows/WorkflowsApp.py b/src/components/workflows/WorkflowsApp.py index 701dd83..da45fb0 100644 --- a/src/components/workflows/WorkflowsApp.py +++ b/src/components/workflows/WorkflowsApp.py @@ -135,8 +135,8 @@ def post(session, _id: str, tab_boundaries: str): return instance.play_workflow(json.loads(tab_boundaries)) @rt(Routes.StopWorkflow) -def post(session, _id: str, tab_boundaries: str): +def post(session, _id: str): logger.debug( f"Entering {Routes.StopWorkflow} with args {debug_session(session)}, {_id=}") instance = InstanceManager.get(session, _id) - return instance.stop_workflow(json.loads(tab_boundaries)) \ No newline at end of file + return instance.stop_workflow() \ No newline at end of file diff --git a/src/components/workflows/commands.py b/src/components/workflows/commands.py index 4500b38..6d85a9f 100644 --- a/src/components/workflows/commands.py +++ b/src/components/workflows/commands.py @@ -81,7 +81,7 @@ class WorkflowDesignerCommandManager(BaseCommandManager): "hx_post": f"{ROUTE_ROOT}{Routes.PauseWorkflow}", "hx-target": f"#{self._owner.tabs_manager.get_id()}", "hx-swap": "outerHTML", - "hx-vals": f'js:{{"_id": "{self._id}", "tab_boundaries": getTabContentBoundaries("{self._owner.tabs_manager.get_id()}")}}', + "hx-vals": f'js:{{"_id": "{self._id}")}}', } def stop_workflow(self): @@ -89,7 +89,7 @@ class WorkflowDesignerCommandManager(BaseCommandManager): "hx_post": f"{ROUTE_ROOT}{Routes.StopWorkflow}", "hx-target": f"#{self._owner.tabs_manager.get_id()}", "hx-swap": "outerHTML", - "hx-vals": f'js:{{"_id": "{self._id}", "tab_boundaries": getTabContentBoundaries("{self._owner.tabs_manager.get_id()}")}}', + "hx-vals": f'js:{{"_id": "{self._id}")}}', } diff --git a/src/components/workflows/components/WorkflowDesigner.py b/src/components/workflows/components/WorkflowDesigner.py index 8522751..303733d 100644 --- a/src/components/workflows/components/WorkflowDesigner.py +++ b/src/components/workflows/components/WorkflowDesigner.py @@ -11,7 +11,7 @@ from components.workflows.commands import WorkflowDesignerCommandManager from components.workflows.components.WorkflowPlayer import WorkflowPlayer from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \ - Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings, WorkflowComponentRuntimeState, ComponentState + Connection, WorkflowsDesignerDbManager, ComponentState from components_helpers import apply_boundaries, mk_tooltip, mk_dialog_buttons, mk_icon from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id @@ -71,9 +71,7 @@ class WorkflowDesigner(BaseComponent): WorkflowPlayer, settings_manager=self._settings_manager, tabs_manager=self.tabs_manager, - player_settings=WorkflowsPlayerSettings(workflow_name, - list(self._state.components.values()), - self._state.connections), + designer=self, boundaries=boundaries) self._error_message = None @@ -202,10 +200,10 @@ class WorkflowDesigner(BaseComponent): # change the tab and display the results self._player.set_boundaries(boundaries) self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key) - + return self.tabs_manager.refresh() - def stop_workflow(self, boundaries): + def stop_workflow(self): self._error_message = None self._player.run() return self.tabs_manager.refresh() @@ -220,6 +218,15 @@ class WorkflowDesigner(BaseComponent): return self.refresh_properties() + def get_workflow_name(self): + return self._designer_settings.workflow_name + + def get_workflow_components(self): + return self._state.components.values() + + def get_workflow_connections(self): + return self._state.connections + def __ft__(self): return Div( H1(f"{self._designer_settings.workflow_name}", cls="text-xl font-bold"), @@ -227,7 +234,8 @@ class WorkflowDesigner(BaseComponent): Div( self._mk_media(), self._mk_error_message(), - cls="flex mb-2" + cls="flex mb-2", + id=f"t_{self._id}" ), self._mk_designer(), Div(cls="wkf-splitter", id=f"s_{self._id}"), @@ -268,21 +276,23 @@ class WorkflowDesigner(BaseComponent): """ - def _mk_component(self, component: WorkflowComponent, runtime_state: WorkflowComponentRuntimeState): + def _mk_component(self, component: WorkflowComponent): + + runtime_state = self._player.get_component_runtime_state(component.id) + info = COMPONENT_TYPES[component.type] is_selected = self._state.selected_component_id == component.id + tooltip_content = None + tooltip_class = "" + if runtime_state.state == ComponentState.FAILURE: state_class = 'error' # To be styled with a red highlight + tooltip_content = runtime_state.error_message + tooltip_class = "mmt-tooltip" elif runtime_state.state == ComponentState.NOT_RUN: state_class = 'not-run' # To be styled as greyed-out else: state_class = '' - if runtime_state.state == ComponentState.FAILURE: - tooltip_content = runtime_state.error_message - tooltip_class = "mmt-tooltip" - else: - tooltip_content = None - tooltip_class = "" return Div( # Input connection point @@ -315,8 +325,7 @@ class WorkflowDesigner(BaseComponent): *[NotStr(self._mk_connection_svg(conn)) for conn in self._state.connections], # Render components - *[self._mk_component(comp, state) for comp, state in zip(self._state.components.values(), - self._player.runtime_states.values())], + *[self._mk_component(comp) for comp in self._state.components.values()], ) def _mk_canvas(self, oob=False): diff --git a/src/components/workflows/components/WorkflowPlayer.py b/src/components/workflows/components/WorkflowPlayer.py index 01e6cec..ac19a12 100644 --- a/src/components/workflows/components/WorkflowPlayer.py +++ b/src/components/workflows/components/WorkflowPlayer.py @@ -9,7 +9,7 @@ from components.datagrid_new.components.DataGrid import DataGrid from components.datagrid_new.settings import DataGridSettings from components.workflows.commands import WorkflowPlayerCommandManager from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes -from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, \ +from components.workflows.db_management import WorkflowComponentRuntimeState, \ WorkflowComponent, ComponentState from core.instance_manager import InstanceManager from core.utils import get_unique_id, make_safe_id @@ -34,13 +34,13 @@ class WorkflowPlayer(BaseComponent): _id=None, settings_manager=None, tabs_manager=None, - player_settings: WorkflowsPlayerSettings = None, + designer=None, boundaries: dict = None): super().__init__(session, _id) self._settings_manager = settings_manager self.tabs_manager = tabs_manager - self.key = f"__WorkflowPlayer_{player_settings.workflow_name}" - self._player_settings: WorkflowsPlayerSettings = player_settings + self._designer = designer + self.key = f"__WorkflowPlayer_{designer.get_workflow_name()}" self._boundaries = boundaries self.commands = WorkflowPlayerCommandManager(self) self._datagrid = InstanceManager.get(self._session, @@ -49,18 +49,26 @@ class WorkflowPlayer(BaseComponent): key=self.key, grid_settings=grid_settings, boundaries=boundaries) - self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id) - for component in player_settings.components} + self.runtime_states = {} self.global_error = None self.has_error = False def set_boundaries(self, boundaries: dict): self._datagrid.set_boundaries(boundaries) + def get_component_runtime_state(self, component_id: str): + # return a default value if the player hasn't been played yet + return self.runtime_states.get(component_id, WorkflowComponentRuntimeState(component_id)) + def run(self): - self._reset_state() + # at least one connection is required to play + if len(self._designer.get_workflow_connections()) == 0: + self.global_error = "No connections defined." + return - components_by_id = {c.id: c for c in self._player_settings.components} + self._init_state() + + components_by_id = {c.id: c for c in self._designer.get_workflow_components()} try: sorted_components = self._get_sorted_components() @@ -122,8 +130,8 @@ class WorkflowPlayer(BaseComponent): self._datagrid.init_from_dataframe(df) def stop(self): - self._reset_state() - + self._init_state() + def get_dataframe(self): return self._datagrid.get_dataframe() @@ -144,11 +152,11 @@ class WorkflowPlayer(BaseComponent): :return: A list of sorted WorkflowComponent objects. """ - components_by_id = {c.id: c for c in self._player_settings.components} + components_by_id = {c.id: c for c in self._designer.get_workflow_components()} # Get all component IDs involved in connections involved_ids = set() - for conn in self._player_settings.connections: + for conn in self._designer.get_workflow_connections(): involved_ids.add(conn.from_id) involved_ids.add(conn.to_id) @@ -161,7 +169,7 @@ class WorkflowPlayer(BaseComponent): adj = {cid: [] for cid in involved_ids} in_degree = {cid: 0 for cid in involved_ids} - for conn in self._player_settings.connections: + for conn in self._designer.get_workflow_connections(): # from_id -> to_id adj[conn.from_id].append(conn.to_id) in_degree[conn.to_id] += 1 @@ -210,12 +218,11 @@ class WorkflowPlayer(BaseComponent): return engine - def _reset_state(self, state: ComponentState = ComponentState.SUCCESS): + def _init_state(self): self.global_error = None self.has_error = False - for runtime_state in self.runtime_states.values(): - runtime_state.state = state - runtime_state.error_message = None + self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id) + for component in self._designer.get_workflow_components()} @staticmethod def create_component_id(session, suffix=None): diff --git a/src/components/workflows/db_management.py b/src/components/workflows/db_management.py index eafdee5..164bdb2 100644 --- a/src/components/workflows/db_management.py +++ b/src/components/workflows/db_management.py @@ -61,13 +61,6 @@ class WorkflowsDesignerState: selected_component_id = None -@dataclass -class WorkflowsPlayerSettings: - workflow_name: str - components: list[WorkflowComponent] - connections: list[Connection] - - @dataclass class WorkflowsSettings: workflows: list[str] = field(default_factory=list) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 6b59633..4bae48e 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -11,7 +11,7 @@ def sample_structure(): """ A pytest fixture to provide a sample tree structure for testing. """ - return Html( + return Div( Header(cls="first-class"), Body( "hello world", @@ -26,13 +26,13 @@ def sample_structure(): @pytest.mark.parametrize("value, expected, expected_error", [ (Div(), "value", - "The types are different: != \nactual=div((),{})\nexpected=value."), + "The types are different: != \nactual=
\nexpected=value."), (Div(), A(), "The elements are different: 'div' != 'a'."), (Div(Div()), Div(A()), "Path 'div':\n\tThe elements are different: 'div' != 'a'."), (Div(A(Span())), Div(A("element")), - "Path 'div.a':\n\tThe types are different: != \nactual=span((),{})\nexpected=element."), + "Path 'div.a':\n\tThe types are different: != \nactual=\nexpected=element."), (Div(attr="one"), Div(attr="two"), "Path 'div':\n\tThe values are different for 'attr' : 'one' != 'two'."), (Div(A(attr="alpha")), Div(A(attr="beta")), diff --git a/tests/test_workflow_designer.py b/tests/test_workflow_designer.py index 29b57ac..fecd94b 100644 --- a/tests/test_workflow_designer.py +++ b/tests/test_workflow_designer.py @@ -5,18 +5,19 @@ from fasthtml.xtend import Script from components.workflows.components.WorkflowDesigner import WorkflowDesigner, COMPONENT_TYPES from components.workflows.constants import ProcessorTypes -from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, Connection, \ - WorkflowComponentRuntimeState +from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, Connection from core.settings_management import SettingsManager, MemoryDbEngine from helpers import matches, Contains +from my_mocks import tabs_manager TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id" @pytest.fixture -def designer(session): +def designer(session, tabs_manager): return WorkflowDesigner(session=session, _id=TEST_WORKFLOW_DESIGNER_ID, settings_manager=SettingsManager(engine=MemoryDbEngine()), + tabs_manager=tabs_manager, key=TEST_WORKFLOW_DESIGNER_ID, designer_settings=WorkflowsDesignerSettings("Workflow Name"), boundaries={"height": 500, "width": 800} @@ -72,6 +73,7 @@ def test_i_can_render_no_component(designer): expected = Div( H1("Workflow Name"), P("Drag components from the toolbox to the canvas to create your workflow."), + Div(id=f"t_{designer.get_id()}"), # media + error message Div(id=f"d_{designer.get_id()}"), # designer container Div(cls="wkf-splitter"), Div(id=f"p_{designer.get_id()}"), # properties panel @@ -84,8 +86,7 @@ def test_i_can_render_no_component(designer): def test_i_can_render_a_producer(designer, producer_component): component = producer_component - component_state = WorkflowComponentRuntimeState(component.id) - actual = designer._mk_component(component, component_state) + actual = designer._mk_component(component) expected = Div( # input connection point Div(cls="wkf-connection-point wkf-input-point", diff --git a/tests/test_workflow_player.py b/tests/test_workflow_player.py index 596b489..95f0210 100644 --- a/tests/test_workflow_player.py +++ b/tests/test_workflow_player.py @@ -4,189 +4,212 @@ import pandas as pd import pytest from pandas.testing import assert_frame_equal -from components.workflows.components.WorkflowDesigner import COMPONENT_TYPES +from components.workflows.components.WorkflowDesigner import COMPONENT_TYPES, WorkflowDesigner from components.workflows.components.WorkflowPlayer import WorkflowPlayer, WorkflowsPlayerError from components.workflows.constants import ProcessorTypes -from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponent, Connection, ComponentState +from components.workflows.db_management import WorkflowComponent, Connection, ComponentState, WorkflowsDesignerSettings from core.settings_management import SettingsManager, MemoryDbEngine from my_mocks import tabs_manager +TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id" TEST_WORKFLOW_PLAYER_ID = "workflow_player_id" @pytest.fixture -def player(session, tabs_manager): - """ - Sets up a standard WorkflowPlayer instance with a 3-component linear workflow. - A helper method 'get_dataframe' is attached for easier testing. - """ - components = [ - WorkflowComponent( - "comp_producer", - ProcessorTypes.Producer, - 10, 100, - COMPONENT_TYPES[ProcessorTypes.Producer]["title"], - COMPONENT_TYPES[ProcessorTypes.Producer]["description"], - {"processor_name": "Repository"} - ), - WorkflowComponent( - "comp_filter", - ProcessorTypes.Filter, - 40, 100, - COMPONENT_TYPES[ProcessorTypes.Filter]["title"], - COMPONENT_TYPES[ProcessorTypes.Filter]["description"], - {"processor_name": "Default"} - ), - WorkflowComponent( - "comp_presenter", - ProcessorTypes.Presenter, - 70, 100, - COMPONENT_TYPES[ProcessorTypes.Presenter]["title"], - COMPONENT_TYPES[ProcessorTypes.Presenter]["description"], - {"processor_name": "Default"} - ) - ] - connections = [ - Connection("conn_1", "comp_producer", "comp_filter"), - Connection("conn_2", "comp_filter", "comp_presenter"), - ] - return WorkflowPlayer(session=session, - _id=TEST_WORKFLOW_PLAYER_ID, - settings_manager=SettingsManager(engine=MemoryDbEngine()), - tabs_manager=tabs_manager, - player_settings=WorkflowsPlayerSettings("Workflow Name", components, connections), - boundaries={"height": 500, "width": 800} +def settings_manager(): + return SettingsManager(MemoryDbEngine()) + + +@pytest.fixture +def designer(session, settings_manager, tabs_manager): + components = [ + WorkflowComponent( + "comp_producer", + ProcessorTypes.Producer, + 10, 100, + COMPONENT_TYPES[ProcessorTypes.Producer]["title"], + COMPONENT_TYPES[ProcessorTypes.Producer]["description"], + {"processor_name": "Repository"} + ), + WorkflowComponent( + "comp_filter", + ProcessorTypes.Filter, + 40, 100, + COMPONENT_TYPES[ProcessorTypes.Filter]["title"], + COMPONENT_TYPES[ProcessorTypes.Filter]["description"], + {"processor_name": "Default"} + ), + WorkflowComponent( + "comp_presenter", + ProcessorTypes.Presenter, + 70, 100, + COMPONENT_TYPES[ProcessorTypes.Presenter]["title"], + COMPONENT_TYPES[ProcessorTypes.Presenter]["description"], + {"processor_name": "Default"} ) + ] + connections = [ + Connection("conn_1", "comp_producer", "comp_filter"), + Connection("conn_2", "comp_filter", "comp_presenter"), + ] + + designer = WorkflowDesigner( + session, + TEST_WORKFLOW_DESIGNER_ID, + settings_manager, + tabs_manager, + "Workflow Designer", + WorkflowsDesignerSettings(workflow_name="Test Workflow"), + {"height": 500, "width": 800} + ) + + return designer + + +@pytest.fixture +def player(session, settings_manager, tabs_manager, designer): + """ + Sets up a standard WorkflowPlayer instance with a 3-component linear workflow. + A helper method 'get_dataframe' is attached for easier testing. + """ + + return WorkflowPlayer(session=session, + _id=TEST_WORKFLOW_PLAYER_ID, + settings_manager=settings_manager, + tabs_manager=tabs_manager, + designer=designer, + boundaries={"height": 500, "width": 800} + ) def test_run_successful_workflow(player, mocker): - """ - Tests the "happy path" where the workflow runs successfully from start to finish. - """ - # 1. Arrange: Mock a successful engine run - mock_engine = MagicMock() - mock_engine.has_error = False - mock_result_data = [ - MagicMock(as_dict=lambda: {'col_a': 1, 'col_b': 'x'}), - MagicMock(as_dict=lambda: {'col_a': 2, 'col_b': 'y'}) - ] - mock_engine.run_to_list.return_value = mock_result_data - mocker.patch.object(player, '_get_engine', return_value=mock_engine) - - # 2. Act - player.run() - - # 3. Assert: Check for success state and correct data - assert not player.has_error - assert player.global_error is None - for component_id, state in player.runtime_states.items(): - assert state.state == ComponentState.SUCCESS - - player._get_engine.assert_called_once() - mock_engine.run_to_list.assert_called_once() - - expected_df = pd.DataFrame([row.as_dict() for row in mock_result_data]) - assert_frame_equal(player.get_dataframe(), expected_df) + """ + Tests the "happy path" where the workflow runs successfully from start to finish. + """ + # 1. Arrange: Mock a successful engine run + mock_engine = MagicMock() + mock_engine.has_error = False + mock_result_data = [ + MagicMock(as_dict=lambda: {'col_a': 1, 'col_b': 'x'}), + MagicMock(as_dict=lambda: {'col_a': 2, 'col_b': 'y'}) + ] + mock_engine.run_to_list.return_value = mock_result_data + mocker.patch.object(player, '_get_engine', return_value=mock_engine) + + # 2. Act + player.run() + + # 3. Assert: Check for success state and correct data + assert not player.has_error + assert player.global_error is None + for component_id, state in player.runtime_states.items(): + assert state.state == ComponentState.SUCCESS + + player._get_engine.assert_called_once() + mock_engine.run_to_list.assert_called_once() + + expected_df = pd.DataFrame([row.as_dict() for row in mock_result_data]) + assert_frame_equal(player.get_dataframe(), expected_df) def test_run_with_cyclical_dependency(player, mocker): - """ - Tests that a workflow with a cycle is detected and handled before execution. - """ - # 1. Arrange: Introduce a cycle and spy on engine creation - player._player_settings.connections.append(Connection("conn_3", "comp_presenter", "comp_producer")) - spy_get_engine = mocker.spy(player, '_get_engine') - - # 2. Act - player.run() - - # 3. Assert: Check for the specific cycle error - assert player.has_error - assert "Workflow configuration error: A cycle was detected" in player.global_error - assert player.get_dataframe().empty - spy_get_engine.assert_not_called() + """ + Tests that a workflow with a cycle is detected and handled before execution. + """ + # 1. Arrange: Introduce a cycle and spy on engine creation + player._player_settings.connections.append(Connection("conn_3", "comp_presenter", "comp_producer")) + spy_get_engine = mocker.spy(player, '_get_engine') + + # 2. Act + player.run() + + # 3. Assert: Check for the specific cycle error + assert player.has_error + assert "Workflow configuration error: A cycle was detected" in player.global_error + assert player.get_dataframe().empty + spy_get_engine.assert_not_called() def test_run_with_component_initialization_failure(player, mocker): - """ - Tests that an error during a component's initialization is handled correctly. - """ - # 1. Arrange: Make the engine creation fail for a specific component - failing_component_id = "comp_filter" - error = ValueError("Missing a required property") - mocker.patch.object(player, '_get_engine', side_effect=WorkflowsPlayerError(failing_component_id, error)) - - # 2. Act - player.run() - - # 3. Assert: Check that the specific component is marked as failed - assert player.has_error - assert f"Failed to init component '{failing_component_id}'" in player.global_error - assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE - assert str(error) in player.runtime_states[failing_component_id].error_message - assert player.runtime_states["comp_producer"].state == ComponentState.NOT_RUN + """ + Tests that an error during a component's initialization is handled correctly. + """ + # 1. Arrange: Make the engine creation fail for a specific component + failing_component_id = "comp_filter" + error = ValueError("Missing a required property") + mocker.patch.object(player, '_get_engine', side_effect=WorkflowsPlayerError(failing_component_id, error)) + + # 2. Act + player.run() + + # 3. Assert: Check that the specific component is marked as failed + assert player.has_error + assert f"Failed to init component '{failing_component_id}'" in player.global_error + assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE + assert str(error) in player.runtime_states[failing_component_id].error_message + assert player.runtime_states["comp_producer"].state == ComponentState.NOT_RUN def test_run_with_failure_in_middle_component(player, mocker): - """ - Tests failure in a middle component updates all component states correctly. - """ - # 1. Arrange: Mock an engine that fails at the filter component - mock_engine = MagicMock() - mock_engine.has_error = True - failing_component_id = "comp_filter" - error = RuntimeError("Data processing failed unexpectedly") - mock_engine.errors = {failing_component_id: error} - mock_engine.run_to_list.return_value = [] - mocker.patch.object(player, '_get_engine', return_value=mock_engine) - - # 2. Act - player.run() - - # 3. Assert: Check the state of each component in the chain - assert player.has_error - assert f"Error in component 'Default': {error}" in player.global_error - assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS - assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE - assert str(error) in player.runtime_states[failing_component_id].error_message - assert player.runtime_states["comp_presenter"].state == ComponentState.NOT_RUN + """ + Tests failure in a middle component updates all component states correctly. + """ + # 1. Arrange: Mock an engine that fails at the filter component + mock_engine = MagicMock() + mock_engine.has_error = True + failing_component_id = "comp_filter" + error = RuntimeError("Data processing failed unexpectedly") + mock_engine.errors = {failing_component_id: error} + mock_engine.run_to_list.return_value = [] + mocker.patch.object(player, '_get_engine', return_value=mock_engine) + + # 2. Act + player.run() + + # 3. Assert: Check the state of each component in the chain + assert player.has_error + assert f"Error in component 'Default': {error}" in player.global_error + assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS + assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE + assert str(error) in player.runtime_states[failing_component_id].error_message + assert player.runtime_states["comp_presenter"].state == ComponentState.NOT_RUN def test_run_with_empty_workflow(player, mocker): - """ - Tests that running a workflow with no components completes without errors. - """ - # 1. Arrange: Clear components and connections - player._player_settings.components = [] - player._player_settings.connections = [] - player.runtime_states = {} - spy_get_engine = mocker.spy(player, '_get_engine') - - # 2. Act - player.run() - - # 3. Assert: Ensure it finishes cleanly with no data - assert not player.has_error - assert player.global_error is None - assert player.get_dataframe().empty - spy_get_engine.assert_called_once() + """ + Tests that running a workflow with no components completes without errors. + """ + # 1. Arrange: Clear components and connections + player._player_settings.components = [] + player._player_settings.connections = [] + player.runtime_states = {} + spy_get_engine = mocker.spy(player, '_get_engine') + + # 2. Act + player.run() + + # 3. Assert: Ensure it finishes cleanly with no data + assert not player.has_error + assert player.global_error is None + assert player.get_dataframe().empty + spy_get_engine.assert_called_once() def test_run_with_global_engine_error(player, mocker): - """ - Tests a scenario where the engine reports a global error not tied to a specific component. - """ - # 1. Arrange: Mock a global engine failure - mock_engine = MagicMock() - mock_engine.has_error = True - mock_engine.errors = {} # No specific component error - mock_engine.global_error = "A simulated global engine failure" - mock_engine.run_to_list.return_value = [] - mocker.patch.object(player, '_get_engine', return_value=mock_engine) - - # 2. Act - player.run() - - # 3. Assert: The player should report the global error from the engine - assert player.has_error - assert player.global_error == mock_engine.global_error \ No newline at end of file + """ + Tests a scenario where the engine reports a global error not tied to a specific component. + """ + # 1. Arrange: Mock a global engine failure + mock_engine = MagicMock() + mock_engine.has_error = True + mock_engine.errors = {} # No specific component error + mock_engine.global_error = "A simulated global engine failure" + mock_engine.run_to_list.return_value = [] + mocker.patch.object(player, '_get_engine', return_value=mock_engine) + + # 2. Act + player.run() + + # 3. Assert: The player should report the global error from the engine + assert player.has_error + assert player.global_error == mock_engine.global_error