"""Simple generator-based workflow engine, together with its pytest suite.

NOTE(review): this file was recovered from a ``git format-patch`` email whose
payload had been flattened onto a few physical lines (the patch text was no
longer applyable).  The patch added two source files --
``src/workflow/engine.py`` (the classes below) and
``tests/test_workflow_engine.py`` (the test functions below) -- plus an empty
``src/workflow/__init__.py``.  Both payloads are reproduced here in a single
module with behavior unchanged; the only adaptation is that the tests no
longer need ``from workflow.engine import ...``.
"""

from abc import ABC, abstractmethod
from typing import Any, Generator
from unittest.mock import MagicMock

import pytest


class DataProcessor(ABC):
    """Base class for all data processing components.

    A processor consumes one item and lazily yields zero or more items,
    which lets the engine stream data without materializing lists.
    """

    @abstractmethod
    def process(self, data: Any) -> Generator[Any, None, None]:
        """Yield zero or more output items for the given input item."""
        pass


class DataProducer(DataProcessor):
    """Base class for data producers that emit data using generators.

    A producer is the only legal head of a pipeline (see
    :meth:`WorkflowEngine.run`); mid-pipeline it may also fan one input
    item out into several outputs.
    """

    @abstractmethod
    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        """Emit data items one by one using yield. Can augment input data."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        # A producer's processing step is simply its emission.
        yield from self.emit(data)


class DataFilter(DataProcessor):
    """Base class for data filters that process data items."""

    @abstractmethod
    def filter(self, data: Any) -> bool:
        """Filter data items. Return True to keep the item, False to discard it."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        # Yield the item unchanged when kept; discarded items yield nothing,
        # which silently prunes that branch of the pipeline.
        if self.filter(data):
            yield data


class DataPresenter(DataProcessor):
    """Base class for data presenters that transform data items."""

    @abstractmethod
    def present(self, data: Any) -> Any:
        """Present/transform data items."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        # Exactly one output per input: a presenter is a 1:1 mapping stage.
        yield self.present(data)


class WorkflowEngine:
    """Orchestrates the data processing pipeline using generators.

    Processors are applied in insertion order.  Because every stage is a
    generator, results are produced lazily, one at a time.
    """

    def __init__(self):
        # Ordered pipeline stages; the first must be a DataProducer at run().
        self.processors: list[DataProcessor] = []

    def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
        """Add a data processor to the pipeline.

        Returns self so calls can be chained (fluent style).
        """
        self.processors.append(processor)
        return self

    def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
        """Process a single item through the remaining processors.

        Recurses with ``processor_index + 1`` for every item a stage yields,
        so a stage that fans out (yields several items) forks the rest of the
        pipeline per item.  Recursion depth is bounded by the pipeline length.
        """
        if processor_index >= len(self.processors):
            # Past the last stage: the item is a finished result.
            yield item
            return

        processor = self.processors[processor_index]

        # Process the item through the current processor, then push each of
        # its outputs through the remaining stages.
        for processed_item in processor.process(item):
            yield from self._process_single_item(processed_item, processor_index + 1)

    def run(self) -> Generator[Any, None, None]:
        """
        Run the workflow pipeline and yield results one by one.
        The first processor must be a DataProducer.

        Raises:
            ValueError: if the pipeline is empty or its head is not a
                DataProducer.
        """
        if not self.processors:
            raise ValueError("No processors in the pipeline")

        first_processor = self.processors[0]

        if not isinstance(first_processor, DataProducer):
            raise ValueError("First processor must be a DataProducer")

        # The head producer is called with no input; each emitted item then
        # flows through the remaining stages (index 1 onward).
        for item in first_processor.emit():
            yield from self._process_single_item(item, 1)

    def run_to_list(self) -> list[Any]:
        """
        Run the workflow and return all results as a list.
        Use this method when you need all results at once.
        """
        return list(self.run())


# --------------------------------------------------------------------------
# Test suite (originally tests/test_workflow_engine.py in the same patch).
# --------------------------------------------------------------------------


@pytest.fixture
def engine():
    """Fixture that provides a fresh WorkflowEngine instance for each test."""
    return WorkflowEngine()


def test_empty_workflow_initialization(engine):
    """Test that a new WorkflowEngine has no processors."""
    assert len(engine.processors) == 0


def test_add_processor(engine):
    """Test adding processors to the workflow."""
    mock_processor = MagicMock(spec=DataProcessor)
    engine.add_processor(mock_processor)
    assert len(engine.processors) == 1
    assert engine.processors[0] is mock_processor


def test_run_empty_workflow(engine):
    """Test that running an empty workflow raises ValueError."""
    with pytest.raises(ValueError, match="No processors in the pipeline"):
        list(engine.run())


def test_run_without_producer_first(engine):
    """Test that running a workflow without a DataProducer first raises ValueError."""
    mock_filter = MagicMock(spec=DataFilter)
    engine.add_processor(mock_filter)

    with pytest.raises(ValueError, match="First processor must be a DataProducer"):
        list(engine.run())


def test_run_simple_workflow(engine):
    """Test running a workflow with just a producer."""

    class SimpleProducer(DataProducer):
        def emit(self, data=None):
            yield 1
            yield 2
            yield 3

    engine.add_processor(SimpleProducer())
    result = list(engine.run())
    assert result == [1, 2, 3]


def test_process_single_item(engine):
    """Test the internal _process_single_item method."""
    mock_processor = MagicMock(spec=DataProcessor)
    mock_processor.process.return_value = iter([42])

    engine.add_processor(mock_processor)
    result = list(engine._process_single_item(10, 0))  # 10 is a dummy value for the first item

    mock_processor.process.assert_called_once_with(10)
    assert result == [42]


def test_run_to_list(engine):
    """Test run_to_list returns all results as a list."""

    class SimpleProducer(DataProducer):
        def emit(self, data=None):
            yield 1
            yield 2

    engine.add_processor(SimpleProducer())
    result = engine.run_to_list()
    assert result == [1, 2]
    assert isinstance(result, list)


def test_complex_workflow():
    """Test a complex workflow with multiple processors."""

    # Define test processors
    class NumberProducer(DataProducer):
        def emit(self, data=None):
            for i in range(1, 6):  # 1 to 5
                yield i

    class EvenFilter(DataFilter):
        def filter(self, data):
            return data % 2 == 0  # Keep even numbers

    class Doubler(DataPresenter):
        def present(self, data):
            return data * 2

    # Create and run workflow
    workflow = WorkflowEngine()
    workflow.add_processor(NumberProducer())
    workflow.add_processor(EvenFilter())
    workflow.add_processor(Doubler())

    result = workflow.run_to_list()
    assert result == [4, 8]  # Even numbers (2, 4) doubled


def test_branching_workflow(engine):
    """Test a workflow with branching outputs."""

    class BranchingProcessor(DataProducer):
        def emit(self, data=None):
            yield data
            yield data * 10

    class SimpleProducer(DataProducer):
        def emit(self, data=None):
            yield 1
            yield 2

    engine.add_processor(SimpleProducer())
    engine.add_processor(BranchingProcessor())

    result = engine.run_to_list()
    assert result == [1, 10, 2, 20]