"""Unit tests for ``workflow.engine.WorkflowEngine`` and its processor types."""

from unittest.mock import MagicMock

import pytest

from workflow.engine import (
    DataFilter,
    DataPresenter,
    DataProcessor,
    DataProducer,
    WorkflowEngine,
)


@pytest.fixture
def engine():
    """Provide a fresh WorkflowEngine instance for each test."""
    return WorkflowEngine()


def test_empty_workflow_initialization(engine):
    """A newly created WorkflowEngine has no processors."""
    assert len(engine.processors) == 0


def test_add_processor(engine):
    """Adding a processor stores that exact instance in ``engine.processors``."""
    mock_processor = MagicMock(spec=DataProcessor)

    engine.add_processor(mock_processor)

    assert len(engine.processors) == 1
    assert engine.processors[0] is mock_processor


def test_run_empty_workflow(engine):
    """Running a workflow with no processors raises ValueError."""
    with pytest.raises(ValueError, match="No processors in the pipeline"):
        # Drain the result so the error surfaces inside the ``raises`` block
        # even if run() is implemented lazily.
        list(engine.run())


def test_run_without_producer_first(engine):
    """Running a workflow whose first processor is not a DataProducer raises ValueError."""
    # spec= makes the mock pass isinstance() checks against DataFilter.
    mock_filter = MagicMock(spec=DataFilter)
    engine.add_processor(mock_filter)

    with pytest.raises(ValueError, match="First processor must be a DataProducer"):
        list(engine.run())


def test_run_simple_workflow(engine):
    """A workflow consisting of only a producer yields exactly what it emits."""

    class SimpleProducer(DataProducer):
        def emit(self, data=None):
            yield 1
            yield 2
            yield 3

    engine.add_processor(SimpleProducer())

    result = list(engine.run())

    assert result == [1, 2, 3]


def test_process_single_item(engine):
    """_process_single_item forwards the item to the processor and yields its output."""
    mock_processor = MagicMock(spec=DataProcessor)
    mock_processor.process.return_value = iter([42])
    engine.add_processor(mock_processor)

    # 10 is an arbitrary input item. The 0 presumably selects the processor at
    # index 0 (the only one registered); confirm against the engine's
    # _process_single_item signature.
    result = list(engine._process_single_item(10, 0))

    mock_processor.process.assert_called_once_with(10)
    assert result == [42]


def test_run_to_list(engine):
    """run_to_list returns all results as a concrete list."""

    class SimpleProducer(DataProducer):
        def emit(self, data=None):
            yield 1
            yield 2

    engine.add_processor(SimpleProducer())

    result = engine.run_to_list()

    assert result == [1, 2]
    assert isinstance(result, list)


def test_complex_workflow(engine):
    """Producer -> filter -> presenter: emit 1..5, keep the evens, double them."""

    class NumberProducer(DataProducer):
        def emit(self, data=None):
            for i in range(1, 6):  # 1 to 5
                yield i

    class EvenFilter(DataFilter):
        def filter(self, data):
            return data % 2 == 0  # keep even numbers

    class Doubler(DataPresenter):
        def present(self, data):
            return data * 2

    engine.add_processor(NumberProducer())
    engine.add_processor(EvenFilter())
    engine.add_processor(Doubler())

    result = engine.run_to_list()

    assert result == [4, 8]  # even numbers (2, 4), doubled


def test_branching_workflow(engine):
    """A downstream processor can fan one input item out into several outputs."""

    class BranchingProcessor(DataProducer):
        def emit(self, data=None):
            yield data
            yield data * 10

    class SimpleProducer(DataProducer):
        def emit(self, data=None):
            yield 1
            yield 2

    engine.add_processor(SimpleProducer())
    engine.add_processor(BranchingProcessor())

    result = engine.run_to_list()

    # Expected order: both outputs derived from 1, then both derived from 2.
    assert result == [1, 10, 2, 20]