Fixed unit tests
This commit is contained in:
@@ -66,64 +66,48 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
self.global_error = "No connections defined."
|
self.global_error = "No connections defined."
|
||||||
return
|
return
|
||||||
|
|
||||||
self._init_state()
|
self._init_state(ComponentState.NOT_RUN)
|
||||||
|
|
||||||
components_by_id = {c.id: c for c in self._designer.get_workflow_components()}
|
components_by_id = {c.id: c for c in self._designer.get_workflow_components()}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sorted_components = self._get_sorted_components()
|
sorted_components = self._get_sorted_components()
|
||||||
|
engine = self._get_engine(sorted_components)
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
# Handle workflow structure errors (e.g., cycles)
|
# Handle workflow structure errors (e.g., cycles)
|
||||||
|
self.has_error = True
|
||||||
self.global_error = f"Workflow configuration error: {e}"
|
self.global_error = f"Workflow configuration error: {e}"
|
||||||
self._datagrid.init_from_dataframe(pd.DataFrame([]))
|
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
|
||||||
engine = self._get_engine()
|
|
||||||
except WorkflowsPlayerError as ex:
|
except WorkflowsPlayerError as ex:
|
||||||
|
self.has_error = True
|
||||||
|
self.global_error = f"Failed to init component '{ex.component_id}': {ex.error}"
|
||||||
if ex.component_id in self.runtime_states:
|
if ex.component_id in self.runtime_states:
|
||||||
self.runtime_states[ex.component_id].state = ComponentState.FAILURE
|
self.runtime_states[ex.component_id].state = ComponentState.FAILURE
|
||||||
self.runtime_states[ex.component_id].error_message = str(ex.error)
|
self.runtime_states[ex.component_id].error_message = str(ex.error)
|
||||||
self.global_error = f"Failed to init component '{ex.component_id}': {ex.error}"
|
|
||||||
return
|
return
|
||||||
|
|
||||||
res = engine.run_to_list()
|
res = engine.run_to_list()
|
||||||
|
|
||||||
if engine.has_error:
|
if engine.has_error and not engine.errors:
|
||||||
self.has_error = True
|
self.has_error = True
|
||||||
|
self.global_error = engine.global_error
|
||||||
if not engine.errors:
|
|
||||||
self.global_error = engine.global_error
|
else: # loop through the components and update the runtime states
|
||||||
|
for component in sorted_components:
|
||||||
else:
|
runtime_state = self.runtime_states.get(component.id)
|
||||||
# Determine component states by simulating a "stop-on-fail" execution
|
|
||||||
first_failure_found = False
|
if component.id not in engine.errors:
|
||||||
for component in sorted_components:
|
runtime_state.state = ComponentState.SUCCESS
|
||||||
runtime_state = self.runtime_states.get(component.id)
|
continue
|
||||||
if not runtime_state:
|
|
||||||
continue
|
# the component failed
|
||||||
|
error = engine.errors[component.id]
|
||||||
if first_failure_found:
|
runtime_state.state = ComponentState.FAILURE
|
||||||
# After a failure, all subsequent components are marked as NOT_RUN
|
runtime_state.error_message = str(error)
|
||||||
runtime_state.state = ComponentState.NOT_RUN
|
self.global_error = f"Error in component '{error.component_id}': {error.error}" # update global error as well
|
||||||
continue
|
self.has_error = True
|
||||||
|
break # the remaining components will remain as NOT_RUN
|
||||||
if component.id in engine.errors:
|
|
||||||
# This is the first component that failed
|
|
||||||
first_failure_found = True
|
|
||||||
error = engine.errors[component.id]
|
|
||||||
runtime_state.state = ComponentState.FAILURE
|
|
||||||
runtime_state.error_message = str(error)
|
|
||||||
|
|
||||||
# As requested, display the component error in the global error area
|
|
||||||
component_props = components_by_id[component.id].properties
|
|
||||||
component_name = component_props.get("processor_name", f"ID: {component.id}")
|
|
||||||
self.global_error = f"Error in component '{component_name}': {str(error)}"
|
|
||||||
else:
|
|
||||||
# This component ran successfully
|
|
||||||
runtime_state.state = ComponentState.SUCCESS
|
|
||||||
else:
|
|
||||||
self.has_error = False
|
|
||||||
|
|
||||||
data = [row.as_dict() for row in res]
|
data = [row.as_dict() for row in res]
|
||||||
df = pd.DataFrame(data)
|
df = pd.DataFrame(data)
|
||||||
@@ -194,9 +178,8 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
# Return sorted components
|
# Return sorted components
|
||||||
return [components_by_id[cid] for cid in sorted_order]
|
return [components_by_id[cid] for cid in sorted_order]
|
||||||
|
|
||||||
def _get_engine(self):
|
def _get_engine(self, sorted_components):
|
||||||
# first reorder the component, according to the connection definitions
|
# first reorder the component, according to the connection definitions
|
||||||
sorted_components = self._get_sorted_components()
|
|
||||||
engine = WorkflowEngine()
|
engine = WorkflowEngine()
|
||||||
for component in sorted_components:
|
for component in sorted_components:
|
||||||
try:
|
try:
|
||||||
@@ -218,10 +201,10 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
|
|
||||||
return engine
|
return engine
|
||||||
|
|
||||||
def _init_state(self):
|
def _init_state(self, state: ComponentState = ComponentState.SUCCESS):
|
||||||
self.global_error = None
|
self.global_error = None
|
||||||
self.has_error = False
|
self.has_error = False
|
||||||
self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id)
|
self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id, state)
|
||||||
for component in self._designer.get_workflow_components()}
|
for component in self._designer.get_workflow_components()}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ class WorkflowComponentRuntimeState:
|
|||||||
Represents the runtime state of a single workflow component.
|
Represents the runtime state of a single workflow component.
|
||||||
"""
|
"""
|
||||||
id: str
|
id: str
|
||||||
state: ComponentState = ComponentState.NOT_RUN
|
state: ComponentState = ComponentState.SUCCESS
|
||||||
error_message: str | None = None
|
error_message: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from components.workflows.constants import ProcessorTypes
|
|||||||
from components.workflows.db_management import WorkflowComponent, Connection, ComponentState, WorkflowsDesignerSettings
|
from components.workflows.db_management import WorkflowComponent, Connection, ComponentState, WorkflowsDesignerSettings
|
||||||
from core.settings_management import SettingsManager, MemoryDbEngine
|
from core.settings_management import SettingsManager, MemoryDbEngine
|
||||||
from my_mocks import tabs_manager
|
from my_mocks import tabs_manager
|
||||||
|
from workflow.engine import DataProcessorError
|
||||||
|
|
||||||
TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id"
|
TEST_WORKFLOW_DESIGNER_ID = "workflow_designer_id"
|
||||||
TEST_WORKFLOW_PLAYER_ID = "workflow_player_id"
|
TEST_WORKFLOW_PLAYER_ID = "workflow_player_id"
|
||||||
@@ -63,6 +64,9 @@ def designer(session, settings_manager, tabs_manager):
|
|||||||
{"height": 500, "width": 800}
|
{"height": 500, "width": 800}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
designer._state.components = {c.id: c for c in components}
|
||||||
|
designer._state.connections = connections
|
||||||
|
|
||||||
return designer
|
return designer
|
||||||
|
|
||||||
|
|
||||||
@@ -117,7 +121,7 @@ def test_run_with_cyclical_dependency(player, mocker):
|
|||||||
Tests that a workflow with a cycle is detected and handled before execution.
|
Tests that a workflow with a cycle is detected and handled before execution.
|
||||||
"""
|
"""
|
||||||
# 1. Arrange: Introduce a cycle and spy on engine creation
|
# 1. Arrange: Introduce a cycle and spy on engine creation
|
||||||
player._player_settings.connections.append(Connection("conn_3", "comp_presenter", "comp_producer"))
|
player._designer._state.connections.append(Connection("conn_3", "comp_presenter", "comp_producer"))
|
||||||
spy_get_engine = mocker.spy(player, '_get_engine')
|
spy_get_engine = mocker.spy(player, '_get_engine')
|
||||||
|
|
||||||
# 2. Act
|
# 2. Act
|
||||||
@@ -126,7 +130,6 @@ def test_run_with_cyclical_dependency(player, mocker):
|
|||||||
# 3. Assert: Check for the specific cycle error
|
# 3. Assert: Check for the specific cycle error
|
||||||
assert player.has_error
|
assert player.has_error
|
||||||
assert "Workflow configuration error: A cycle was detected" in player.global_error
|
assert "Workflow configuration error: A cycle was detected" in player.global_error
|
||||||
assert player.get_dataframe().empty
|
|
||||||
spy_get_engine.assert_not_called()
|
spy_get_engine.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
@@ -159,7 +162,7 @@ def test_run_with_failure_in_middle_component(player, mocker):
|
|||||||
mock_engine.has_error = True
|
mock_engine.has_error = True
|
||||||
failing_component_id = "comp_filter"
|
failing_component_id = "comp_filter"
|
||||||
error = RuntimeError("Data processing failed unexpectedly")
|
error = RuntimeError("Data processing failed unexpectedly")
|
||||||
mock_engine.errors = {failing_component_id: error}
|
mock_engine.errors = {failing_component_id: DataProcessorError(failing_component_id, error)}
|
||||||
mock_engine.run_to_list.return_value = []
|
mock_engine.run_to_list.return_value = []
|
||||||
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
|
mocker.patch.object(player, '_get_engine', return_value=mock_engine)
|
||||||
|
|
||||||
@@ -168,7 +171,7 @@ def test_run_with_failure_in_middle_component(player, mocker):
|
|||||||
|
|
||||||
# 3. Assert: Check the state of each component in the chain
|
# 3. Assert: Check the state of each component in the chain
|
||||||
assert player.has_error
|
assert player.has_error
|
||||||
assert f"Error in component 'Default': {error}" in player.global_error
|
assert f"Error in component 'comp_filter':" in player.global_error
|
||||||
assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS
|
assert player.runtime_states["comp_producer"].state == ComponentState.SUCCESS
|
||||||
assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE
|
assert player.runtime_states[failing_component_id].state == ComponentState.FAILURE
|
||||||
assert str(error) in player.runtime_states[failing_component_id].error_message
|
assert str(error) in player.runtime_states[failing_component_id].error_message
|
||||||
@@ -180,9 +183,8 @@ def test_run_with_empty_workflow(player, mocker):
|
|||||||
Tests that running a workflow with no components completes without errors.
|
Tests that running a workflow with no components completes without errors.
|
||||||
"""
|
"""
|
||||||
# 1. Arrange: Clear components and connections
|
# 1. Arrange: Clear components and connections
|
||||||
player._player_settings.components = []
|
player._designer._state.components = {}
|
||||||
player._player_settings.connections = []
|
player._designer._state.connections = []
|
||||||
player.runtime_states = {}
|
|
||||||
spy_get_engine = mocker.spy(player, '_get_engine')
|
spy_get_engine = mocker.spy(player, '_get_engine')
|
||||||
|
|
||||||
# 2. Act
|
# 2. Act
|
||||||
@@ -190,9 +192,8 @@ def test_run_with_empty_workflow(player, mocker):
|
|||||||
|
|
||||||
# 3. Assert: Ensure it finishes cleanly with no data
|
# 3. Assert: Ensure it finishes cleanly with no data
|
||||||
assert not player.has_error
|
assert not player.has_error
|
||||||
assert player.global_error is None
|
assert player.global_error == 'No connections defined.'
|
||||||
assert player.get_dataframe().empty
|
spy_get_engine.assert_not_called()
|
||||||
spy_get_engine.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_run_with_global_engine_error(player, mocker):
|
def test_run_with_global_engine_error(player, mocker):
|
||||||
|
|||||||
Reference in New Issue
Block a user