# Files
# MyManagingTools/tests/test_workflow_engine.py
#
# 127 lines
# 3.3 KiB
# Python

from unittest.mock import MagicMock
import pytest
from workflow.engine import WorkflowEngine, DataProcessor, DataProducer, DataFilter, DataPresenter
@pytest.fixture
def engine():
    """Fixture that provides a fresh WorkflowEngine instance for each test."""
    return WorkflowEngine()
def test_empty_workflow_initialization(engine):
    """Test that a new WorkflowEngine has no processors."""
    assert len(engine.processors) == 0
def test_add_processor(engine):
    """Test that add_processor stores the given processor in the workflow.

    After one call, ``engine.processors`` must hold exactly one entry and
    that entry must be the very object that was added (identity, not a copy).
    """
    # spec= restricts the mock to the attributes DataProcessor defines.
    mock_processor = MagicMock(spec=DataProcessor)

    engine.add_processor(mock_processor)

    assert len(engine.processors) == 1
    assert engine.processors[0] is mock_processor
def test_run_empty_workflow(engine):
    """Test that running an empty workflow raises ValueError."""
    with pytest.raises(ValueError, match="No processors in the pipeline"):
        # list() consumes whatever run() returns, so the error surfaces even
        # if run() is lazy.
        list(engine.run())
def test_run_without_producer_first(engine):
    """Test that running a workflow without a DataProducer first raises ValueError."""
    # A DataFilter-shaped mock is the only (and therefore first) processor.
    mock_filter = MagicMock(spec=DataFilter)
    engine.add_processor(mock_filter)

    with pytest.raises(ValueError, match="First processor must be a DataProducer"):
        list(engine.run())
def test_run_simple_workflow(engine):
    """Test running a workflow with just a producer.

    With a single producer and nothing downstream, the values it emits are
    expected to come out of ``run()`` unchanged and in order.
    """

    class SimpleProducer(DataProducer):
        """Producer that emits the fixed sequence 1, 2, 3."""

        def emit(self, data=None):
            yield 1
            yield 2
            yield 3

    engine.add_processor(SimpleProducer())

    result = list(engine.run())

    assert result == [1, 2, 3]
def test_process_single_item(engine):
    """Test the internal _process_single_item method.

    The processor's ``process`` must be called exactly once with the item,
    and the values it returns must be yielded back to the caller.
    """
    mock_processor = MagicMock(spec=DataProcessor)
    mock_processor.process.return_value = iter([42])
    engine.add_processor(mock_processor)

    # 10 is an arbitrary input item; 0 is presumably the index of the
    # processor to start from -- confirm against WorkflowEngine.
    result = list(engine._process_single_item(10, 0))

    mock_processor.process.assert_called_once_with(10)
    assert result == [42]
def test_run_to_list(engine):
    """Test run_to_list returns all results as a list."""

    class SimpleProducer(DataProducer):
        """Producer that emits the fixed sequence 1, 2."""

        def emit(self, data=None):
            yield 1
            yield 2

    engine.add_processor(SimpleProducer())

    result = engine.run_to_list()

    assert result == [1, 2]
    # The return value must be a real list, not a generator or other iterable.
    assert isinstance(result, list)
def test_complex_workflow():
    """Test a complex workflow with multiple processors.

    Pipeline under test: produce 1..5 -> keep even numbers -> double them.
    """

    # Define test processors
    class NumberProducer(DataProducer):
        """Producer that emits the integers 1 through 5."""

        def emit(self, data=None):
            for i in range(1, 6):  # 1 to 5
                yield i

    class EvenFilter(DataFilter):
        """Filter whose predicate is true for even numbers."""

        def filter(self, data):
            return data % 2 == 0  # Keep even numbers

    class Doubler(DataPresenter):
        """Presenter that returns twice its input."""

        def present(self, data):
            return data * 2

    # Create and run workflow
    workflow = WorkflowEngine()
    workflow.add_processor(NumberProducer())
    workflow.add_processor(EvenFilter())
    workflow.add_processor(Doubler())

    result = workflow.run_to_list()

    assert result == [4, 8]  # Even numbers (2, 4) doubled
def test_branching_workflow(engine):
    """Test a workflow with branching outputs.

    A second producer placed after the first yields two values for every
    item it receives. The expected result keeps both outputs of one upstream
    item together before moving on to the next item: ``[1, 10, 2, 20]``.
    """

    class BranchingProcessor(DataProducer):
        """Producer that yields its input and then the input times ten."""

        def emit(self, data=None):
            yield data
            yield data * 10

    class SimpleProducer(DataProducer):
        """Producer that emits the fixed sequence 1, 2."""

        def emit(self, data=None):
            yield 1
            yield 2

    engine.add_processor(SimpleProducer())
    engine.add_processor(BranchingProcessor())

    result = engine.run_to_list()

    assert result == [1, 10, 2, 20]