from unittest.mock import MagicMock import pandas as pd import pytest from pandas.testing import assert_frame_equal from components.undo_redo.components.UndoRedo import UndoRedo from components.workflows.components.WorkflowDesigner import COMPONENT_TYPES, WorkflowDesigner from components.workflows.components.WorkflowPlayer import WorkflowPlayer, WorkflowsPlayerError from components.workflows.constants import ProcessorTypes from components.workflows.db_management import WorkflowComponent, Connection, ComponentState, WorkflowsDesignerSettings from core.instance_manager import InstanceManager from core.settings_management import SettingsManager, MemoryDbEngine from my_mocks import tabs_manager from workflow.engine import DataProcessorError TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id" TEST_WORKFLOW_PLAYER_ID = "workflow_player_id" @pytest.fixture(autouse=True) def mock_undo_redo(session): # Create a mock UndoRedo instance undo_redo = MagicMock(spec=UndoRedo) # Store original get method original_get = InstanceManager.get def mock_get(sess, instance_id, *args, **kwargs): if instance_id == UndoRedo.create_component_id(sess): return undo_redo return original_get(sess, instance_id, *args, **kwargs) # Replace get method with our mock InstanceManager.get = mock_get yield undo_redo # Restore original get method after test InstanceManager.get = original_get @pytest.fixture def settings_manager(): return SettingsManager(MemoryDbEngine()) @pytest.fixture def designer(session, settings_manager, tabs_manager): components = [ WorkflowComponent( "comp_producer", ProcessorTypes.Producer, 10, 100, COMPONENT_TYPES[ProcessorTypes.Producer]["title"], COMPONENT_TYPES[ProcessorTypes.Producer]["description"], {"processor_name": "Repository"} ), WorkflowComponent( "comp_filter", ProcessorTypes.Filter, 40, 100, COMPONENT_TYPES[ProcessorTypes.Filter]["title"], COMPONENT_TYPES[ProcessorTypes.Filter]["description"], {"processor_name": "Default"} ), WorkflowComponent( "comp_presenter", ProcessorTypes.Presenter, 70, 100, COMPONENT_TYPES[ProcessorTypes.Presenter]["title"], COMPONENT_TYPES[ProcessorTypes.Presenter]["description"], {"processor_name": "Default"} ) ] connections = [ Connection("conn_1", "comp_producer", "comp_filter"), Connection("conn_2", "comp_filter", "comp_presenter"), ] designer = WorkflowDesigner( session, TEST_WORKFLOW_DESIGNER_ID, settings_manager, tabs_manager, "Workflow Designer", WorkflowsDesignerSettings(workflow_name="Test Workflow"), {"height": 500, "width": 800} ) designer._state.components = {c.id: c for c in components} designer._state.connections = connections return designer @pytest.fixture def player(session, settings_manager, tabs_manager, designer): """ Sets up a standard WorkflowPlayer instance with a 3-component linear workflow. A helper method 'get_dataframe' is attached for easier testing. """ return WorkflowPlayer(session=session, _id=TEST_WORKFLOW_PLAYER_ID, settings_manager=settings_manager, tabs_manager=tabs_manager, designer=designer, boundaries={"height": 500, "width": 800} ) def test_run_successful_workflow(player, mocker): """ Tests the "happy path" where the workflow runs successfully from start to finish. """ # 1. Arrange: Mock a successful engine run mock_engine = MagicMock() mock_engine.has_error = False mock_result_data = [ MagicMock(as_dict=lambda: {'col_a': 1, 'col_b': 'x'}), MagicMock(as_dict=lambda: {'col_a': 2, 'col_b': 'y'}) ] mock_engine.run_to_list.return_value = mock_result_data mocker.patch.object(player, '_get_engine', return_value=mock_engine) # 2. Act player.run() # 3. Assert: Check for success state and correct data assert not player.has_error assert player.global_error is None for component_id, state in player.runtime_states.items(): assert state.state == ComponentState.SUCCESS player._get_engine.assert_called_once() mock_engine.run_to_list.assert_called_once() expected_df = pd.DataFrame([row.as_dict() for row in mock_result_data]) assert_frame_equal(player.get_dataframe(), expected_df) def test_run_with_cyclical_dependency(player, mocker): """ Tests that a workflow with a cycle is detected and handled before execution. """ # 1. Arrange: Introduce a cycle and spy on engine creation player._designer._state.connections.append(Connection("conn_3", "comp_presenter", "comp_producer")) spy_get_engine = mocker.spy(player, '_get_engine') # 2. Act player.run() # 3. Assert: Check for the specific cycle error assert player.has_error assert "Workflow configuration error: A cycle was detected" in player.global_error spy_get_engine.assert_not_called() def test_run_with_component_initialization_failure(player, mocker): """ Tests that an error during a component's initialization is handled correctly. """ # 1. Arrange: Make the engine creation fail for a specific component failing_component_id = "comp_filter" error = ValueError("Missing a required property") mocker.patch.object(player, '_get_engine', side_effect=WorkflowsPlayerError(failing_component_id, error)) # 2. Act player.run() # 3. Assert: Check that the specific component is marked as failed assert player.has_error assert f"Failed to init component '{failing_component_id}'" in player.global_error assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE assert str(error) in player.runtime_states[failing_component_id].error_message assert player.runtime_states["comp_producer"].state == ComponentState.NOT_RUN def test_run_with_failure_in_middle_component(player, mocker): """ Tests failure in a middle component updates all component states correctly. """ # 1. Arrange: Mock an engine that fails at the filter component mock_engine = MagicMock() mock_engine.has_error = True failing_component_id = "comp_filter" error = RuntimeError("Data processing failed unexpectedly") mock_engine.errors = {failing_component_id: DataProcessorError(failing_component_id, error)} mock_engine.run_to_list.return_value = [] mocker.patch.object(player, '_get_engine', return_value=mock_engine) # 2. Act player.run() # 3. Assert: Check the state of each component in the chain assert player.has_error assert f"Error in component 'comp_filter':" in player.global_error assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE assert str(error) in player.runtime_states[failing_component_id].error_message assert player.runtime_states["comp_presenter"].state == ComponentState.NOT_RUN def test_run_with_empty_workflow(player, mocker): """ Tests that running a workflow with no components completes without errors. """ # 1. Arrange: Clear components and connections player._designer._state.components = {} player._designer._state.connections = [] spy_get_engine = mocker.spy(player, '_get_engine') # 2. Act player.run() # 3. Assert: Ensure it finishes cleanly with no data assert not player.has_error assert player.global_error == 'No connections defined.' spy_get_engine.assert_not_called() def test_run_with_global_engine_error(player, mocker): """ Tests a scenario where the engine reports a global error not tied to a specific component. """ # 1. Arrange: Mock a global engine failure mock_engine = MagicMock() mock_engine.has_error = True mock_engine.errors = {} # No specific component error mock_engine.global_error = "A simulated global engine failure" mock_engine.run_to_list.return_value = [] mocker.patch.object(player, '_get_engine', return_value=mock_engine) # 2. Act player.run() # 3. Assert: The player should report the global error from the engine assert player.has_error assert player.global_error == mock_engine.global_error