Adding unit tests to WorkflowPlayer.py

This commit is contained in:
2025-07-13 12:23:25 +02:00
parent fdf05edec3
commit f3deeaefd1
9 changed files with 252 additions and 217 deletions

View File

@@ -11,7 +11,7 @@ def sample_structure():
"""
A pytest fixture to provide a sample tree structure for testing.
"""
return Html(
return Div(
Header(cls="first-class"),
Body(
"hello world",
@@ -26,13 +26,13 @@ def sample_structure():
@pytest.mark.parametrize("value, expected, expected_error", [
(Div(), "value",
"The types are different: <class 'fastcore.xml.FT'> != <class 'str'>\nactual=div((),{})\nexpected=value."),
"The types are different: <class 'fastcore.xml.FT'> != <class 'str'>\nactual=<div></div>\nexpected=value."),
(Div(), A(),
"The elements are different: 'div' != 'a'."),
(Div(Div()), Div(A()),
"Path 'div':\n\tThe elements are different: 'div' != 'a'."),
(Div(A(Span())), Div(A("element")),
"Path 'div.a':\n\tThe types are different: <class 'fastcore.xml.FT'> != <class 'str'>\nactual=span((),{})\nexpected=element."),
"Path 'div.a':\n\tThe types are different: <class 'fastcore.xml.FT'> != <class 'str'>\nactual=<span></span>\nexpected=element."),
(Div(attr="one"), Div(attr="two"),
"Path 'div':\n\tThe values are different for 'attr' : 'one' != 'two'."),
(Div(A(attr="alpha")), Div(A(attr="beta")),

View File

@@ -5,18 +5,19 @@ from fasthtml.xtend import Script
from components.workflows.components.WorkflowDesigner import WorkflowDesigner, COMPONENT_TYPES
from components.workflows.constants import ProcessorTypes
from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, Connection, \
WorkflowComponentRuntimeState
from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, Connection
from core.settings_management import SettingsManager, MemoryDbEngine
from helpers import matches, Contains
from my_mocks import tabs_manager
TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id"
@pytest.fixture
def designer(session):
def designer(session, tabs_manager):
return WorkflowDesigner(session=session, _id=TEST_WORKFLOW_DESIGNER_ID,
settings_manager=SettingsManager(engine=MemoryDbEngine()),
tabs_manager=tabs_manager,
key=TEST_WORKFLOW_DESIGNER_ID,
designer_settings=WorkflowsDesignerSettings("Workflow Name"),
boundaries={"height": 500, "width": 800}
@@ -72,6 +73,7 @@ def test_i_can_render_no_component(designer):
expected = Div(
H1("Workflow Name"),
P("Drag components from the toolbox to the canvas to create your workflow."),
Div(id=f"t_{designer.get_id()}"), # media + error message
Div(id=f"d_{designer.get_id()}"), # designer container
Div(cls="wkf-splitter"),
Div(id=f"p_{designer.get_id()}"), # properties panel
@@ -84,8 +86,7 @@ def test_i_can_render_no_component(designer):
def test_i_can_render_a_producer(designer, producer_component):
component = producer_component
component_state = WorkflowComponentRuntimeState(component.id)
actual = designer._mk_component(component, component_state)
actual = designer._mk_component(component)
expected = Div(
# input connection point
Div(cls="wkf-connection-point wkf-input-point",

View File

@@ -4,189 +4,212 @@ import pandas as pd
import pytest
from pandas.testing import assert_frame_equal
from components.workflows.components.WorkflowDesigner import COMPONENT_TYPES
from components.workflows.components.WorkflowDesigner import COMPONENT_TYPES, WorkflowDesigner
from components.workflows.components.WorkflowPlayer import WorkflowPlayer, WorkflowsPlayerError
from components.workflows.constants import ProcessorTypes
from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponent, Connection, ComponentState
from components.workflows.db_management import WorkflowComponent, Connection, ComponentState, WorkflowsDesignerSettings
from core.settings_management import SettingsManager, MemoryDbEngine
from my_mocks import tabs_manager
TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id"
TEST_WORKFLOW_PLAYER_ID = "workflow_player_id"
@pytest.fixture
def player(session, tabs_manager):
"""
Sets up a standard WorkflowPlayer instance with a 3-component linear workflow.
A helper method 'get_dataframe' is attached for easier testing.
"""
components = [
WorkflowComponent(
"comp_producer",
ProcessorTypes.Producer,
10, 100,
COMPONENT_TYPES[ProcessorTypes.Producer]["title"],
COMPONENT_TYPES[ProcessorTypes.Producer]["description"],
{"processor_name": "Repository"}
),
WorkflowComponent(
"comp_filter",
ProcessorTypes.Filter,
40, 100,
COMPONENT_TYPES[ProcessorTypes.Filter]["title"],
COMPONENT_TYPES[ProcessorTypes.Filter]["description"],
{"processor_name": "Default"}
),
WorkflowComponent(
"comp_presenter",
ProcessorTypes.Presenter,
70, 100,
COMPONENT_TYPES[ProcessorTypes.Presenter]["title"],
COMPONENT_TYPES[ProcessorTypes.Presenter]["description"],
{"processor_name": "Default"}
)
]
connections = [
Connection("conn_1", "comp_producer", "comp_filter"),
Connection("conn_2", "comp_filter", "comp_presenter"),
]
return WorkflowPlayer(session=session,
_id=TEST_WORKFLOW_PLAYER_ID,
settings_manager=SettingsManager(engine=MemoryDbEngine()),
tabs_manager=tabs_manager,
player_settings=WorkflowsPlayerSettings("Workflow Name", components, connections),
boundaries={"height": 500, "width": 800}
def settings_manager():
return SettingsManager(MemoryDbEngine())
@pytest.fixture
def designer(session, settings_manager, tabs_manager):
components = [
WorkflowComponent(
"comp_producer",
ProcessorTypes.Producer,
10, 100,
COMPONENT_TYPES[ProcessorTypes.Producer]["title"],
COMPONENT_TYPES[ProcessorTypes.Producer]["description"],
{"processor_name": "Repository"}
),
WorkflowComponent(
"comp_filter",
ProcessorTypes.Filter,
40, 100,
COMPONENT_TYPES[ProcessorTypes.Filter]["title"],
COMPONENT_TYPES[ProcessorTypes.Filter]["description"],
{"processor_name": "Default"}
),
WorkflowComponent(
"comp_presenter",
ProcessorTypes.Presenter,
70, 100,
COMPONENT_TYPES[ProcessorTypes.Presenter]["title"],
COMPONENT_TYPES[ProcessorTypes.Presenter]["description"],
{"processor_name": "Default"}
)
]
connections = [
Connection("conn_1", "comp_producer", "comp_filter"),
Connection("conn_2", "comp_filter", "comp_presenter"),
]
designer = WorkflowDesigner(
session,
TEST_WORKFLOW_DESIGNER_ID,
settings_manager,
tabs_manager,
"Workflow Designer",
WorkflowsDesignerSettings(workflow_name="Test Workflow"),
{"height": 500, "width": 800}
)
return designer
@pytest.fixture
def player(session, settings_manager, tabs_manager, designer):
"""
Sets up a standard WorkflowPlayer instance with a 3-component linear workflow.
A helper method 'get_dataframe' is attached for easier testing.
"""
return WorkflowPlayer(session=session,
_id=TEST_WORKFLOW_PLAYER_ID,
settings_manager=settings_manager,
tabs_manager=tabs_manager,
designer=designer,
boundaries={"height": 500, "width": 800}
)
def test_run_successful_workflow(player, mocker):
"""
Tests the "happy path" where the workflow runs successfully from start to finish.
"""
# 1. Arrange: Mock a successful engine run
mock_engine = MagicMock()
mock_engine.has_error = False
mock_result_data = [
MagicMock(as_dict=lambda: {'col_a': 1, 'col_b': 'x'}),
MagicMock(as_dict=lambda: {'col_a': 2, 'col_b': 'y'})
]
mock_engine.run_to_list.return_value = mock_result_data
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
# 2. Act
player.run()
# 3. Assert: Check for success state and correct data
assert not player.has_error
assert player.global_error is None
for component_id, state in player.runtime_states.items():
assert state.state == ComponentState.SUCCESS
player._get_engine.assert_called_once()
mock_engine.run_to_list.assert_called_once()
expected_df = pd.DataFrame([row.as_dict() for row in mock_result_data])
assert_frame_equal(player.get_dataframe(), expected_df)
"""
Tests the "happy path" where the workflow runs successfully from start to finish.
"""
# 1. Arrange: Mock a successful engine run
mock_engine = MagicMock()
mock_engine.has_error = False
mock_result_data = [
MagicMock(as_dict=lambda: {'col_a': 1, 'col_b': 'x'}),
MagicMock(as_dict=lambda: {'col_a': 2, 'col_b': 'y'})
]
mock_engine.run_to_list.return_value = mock_result_data
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
# 2. Act
player.run()
# 3. Assert: Check for success state and correct data
assert not player.has_error
assert player.global_error is None
for component_id, state in player.runtime_states.items():
assert state.state == ComponentState.SUCCESS
player._get_engine.assert_called_once()
mock_engine.run_to_list.assert_called_once()
expected_df = pd.DataFrame([row.as_dict() for row in mock_result_data])
assert_frame_equal(player.get_dataframe(), expected_df)
def test_run_with_cyclical_dependency(player, mocker):
"""
Tests that a workflow with a cycle is detected and handled before execution.
"""
# 1. Arrange: Introduce a cycle and spy on engine creation
player._player_settings.connections.append(Connection("conn_3", "comp_presenter", "comp_producer"))
spy_get_engine = mocker.spy(player, '_get_engine')
# 2. Act
player.run()
# 3. Assert: Check for the specific cycle error
assert player.has_error
assert "Workflow configuration error: A cycle was detected" in player.global_error
assert player.get_dataframe().empty
spy_get_engine.assert_not_called()
"""
Tests that a workflow with a cycle is detected and handled before execution.
"""
# 1. Arrange: Introduce a cycle and spy on engine creation
player._player_settings.connections.append(Connection("conn_3", "comp_presenter", "comp_producer"))
spy_get_engine = mocker.spy(player, '_get_engine')
# 2. Act
player.run()
# 3. Assert: Check for the specific cycle error
assert player.has_error
assert "Workflow configuration error: A cycle was detected" in player.global_error
assert player.get_dataframe().empty
spy_get_engine.assert_not_called()
def test_run_with_component_initialization_failure(player, mocker):
"""
Tests that an error during a component's initialization is handled correctly.
"""
# 1. Arrange: Make the engine creation fail for a specific component
failing_component_id = "comp_filter"
error = ValueError("Missing a required property")
mocker.patch.object(player, '_get_engine', side_effect=WorkflowsPlayerError(failing_component_id, error))
# 2. Act
player.run()
# 3. Assert: Check that the specific component is marked as failed
assert player.has_error
assert f"Failed to init component '{failing_component_id}'" in player.global_error
assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE
assert str(error) in player.runtime_states[failing_component_id].error_message
assert player.runtime_states["comp_producer"].state == ComponentState.NOT_RUN
"""
Tests that an error during a component's initialization is handled correctly.
"""
# 1. Arrange: Make the engine creation fail for a specific component
failing_component_id = "comp_filter"
error = ValueError("Missing a required property")
mocker.patch.object(player, '_get_engine', side_effect=WorkflowsPlayerError(failing_component_id, error))
# 2. Act
player.run()
# 3. Assert: Check that the specific component is marked as failed
assert player.has_error
assert f"Failed to init component '{failing_component_id}'" in player.global_error
assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE
assert str(error) in player.runtime_states[failing_component_id].error_message
assert player.runtime_states["comp_producer"].state == ComponentState.NOT_RUN
def test_run_with_failure_in_middle_component(player, mocker):
"""
Tests failure in a middle component updates all component states correctly.
"""
# 1. Arrange: Mock an engine that fails at the filter component
mock_engine = MagicMock()
mock_engine.has_error = True
failing_component_id = "comp_filter"
error = RuntimeError("Data processing failed unexpectedly")
mock_engine.errors = {failing_component_id: error}
mock_engine.run_to_list.return_value = []
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
# 2. Act
player.run()
# 3. Assert: Check the state of each component in the chain
assert player.has_error
assert f"Error in component 'Default': {error}" in player.global_error
assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS
assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE
assert str(error) in player.runtime_states[failing_component_id].error_message
assert player.runtime_states["comp_presenter"].state == ComponentState.NOT_RUN
"""
Tests failure in a middle component updates all component states correctly.
"""
# 1. Arrange: Mock an engine that fails at the filter component
mock_engine = MagicMock()
mock_engine.has_error = True
failing_component_id = "comp_filter"
error = RuntimeError("Data processing failed unexpectedly")
mock_engine.errors = {failing_component_id: error}
mock_engine.run_to_list.return_value = []
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
# 2. Act
player.run()
# 3. Assert: Check the state of each component in the chain
assert player.has_error
assert f"Error in component 'Default': {error}" in player.global_error
assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS
assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE
assert str(error) in player.runtime_states[failing_component_id].error_message
assert player.runtime_states["comp_presenter"].state == ComponentState.NOT_RUN
def test_run_with_empty_workflow(player, mocker):
"""
Tests that running a workflow with no components completes without errors.
"""
# 1. Arrange: Clear components and connections
player._player_settings.components = []
player._player_settings.connections = []
player.runtime_states = {}
spy_get_engine = mocker.spy(player, '_get_engine')
# 2. Act
player.run()
# 3. Assert: Ensure it finishes cleanly with no data
assert not player.has_error
assert player.global_error is None
assert player.get_dataframe().empty
spy_get_engine.assert_called_once()
"""
Tests that running a workflow with no components completes without errors.
"""
# 1. Arrange: Clear components and connections
player._player_settings.components = []
player._player_settings.connections = []
player.runtime_states = {}
spy_get_engine = mocker.spy(player, '_get_engine')
# 2. Act
player.run()
# 3. Assert: Ensure it finishes cleanly with no data
assert not player.has_error
assert player.global_error is None
assert player.get_dataframe().empty
spy_get_engine.assert_called_once()
def test_run_with_global_engine_error(player, mocker):
"""
Tests a scenario where the engine reports a global error not tied to a specific component.
"""
# 1. Arrange: Mock a global engine failure
mock_engine = MagicMock()
mock_engine.has_error = True
mock_engine.errors = {} # No specific component error
mock_engine.global_error = "A simulated global engine failure"
mock_engine.run_to_list.return_value = []
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
# 2. Act
player.run()
# 3. Assert: The player should report the global error from the engine
assert player.has_error
assert player.global_error == mock_engine.global_error
"""
Tests a scenario where the engine reports a global error not tied to a specific component.
"""
# 1. Arrange: Mock a global engine failure
mock_engine = MagicMock()
mock_engine.has_error = True
mock_engine.errors = {} # No specific component error
mock_engine.global_error = "A simulated global engine failure"
mock_engine.run_to_list.return_value = []
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
# 2. Act
player.run()
# 3. Assert: The player should report the global error from the engine
assert player.has_error
assert player.global_error == mock_engine.global_error