Added Simple Workflow Engine
src/workflow/engine.py (new file, 96 lines)
@@ -0,0 +1,96 @@
from abc import ABC, abstractmethod
from typing import Any, Generator


class DataProcessor(ABC):
    """Base class for all data processing components."""

    @abstractmethod
    def process(self, data: Any) -> Generator[Any, None, None]:
        pass


class DataProducer(DataProcessor):
    """Base class for data producers that emit data using generators."""

    @abstractmethod
    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        """Emit data items one by one using yield. Can augment input data."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        yield from self.emit(data)


class DataFilter(DataProcessor):
    """Base class for data filters that process data items."""

    @abstractmethod
    def filter(self, data: Any) -> bool:
        """Filter data items. Return True to keep the item, False to discard it."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        if self.filter(data):
            yield data


class DataPresenter(DataProcessor):
    """Base class for data presenters that transform data items."""

    @abstractmethod
    def present(self, data: Any) -> Any:
        """Present/transform data items."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        yield self.present(data)


class WorkflowEngine:
    """Orchestrates the data processing pipeline using generators."""

    def __init__(self):
        self.processors: list[DataProcessor] = []

    def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
        """Add a data processor to the pipeline."""
        self.processors.append(processor)
        return self

    def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
        """Process a single item through the remaining processors."""
        if processor_index >= len(self.processors):
            yield item
            return

        processor = self.processors[processor_index]

        # Process the item through the current processor
        for processed_item in processor.process(item):
            # Recursively process through remaining processors
            yield from self._process_single_item(processed_item, processor_index + 1)

    def run(self) -> Generator[Any, None, None]:
        """
        Run the workflow pipeline and yield results one by one.
        The first processor must be a DataProducer.
        """
        if not self.processors:
            raise ValueError("No processors in the pipeline")

        first_processor = self.processors[0]

        if not isinstance(first_processor, DataProducer):
            raise ValueError("First processor must be a DataProducer")

        for item in first_processor.emit():
            yield from self._process_single_item(item, 1)

    def run_to_list(self) -> list[Any]:
        """
        Run the workflow and return all results as a list.
        Use this method when you need all results at once.
        """
        return list(self.run())
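For reference, a minimal usage sketch of the engine added in this commit. The RangeProducer, EvenFilter, and SquarePresenter subclasses below are hypothetical illustrations, not part of the committed code:

# Hypothetical example subclasses; only WorkflowEngine and the base classes above are in this commit.
class RangeProducer(DataProducer):
    def __init__(self, n: int):
        self.n = n

    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        # Produce the integers 0 .. n-1 lazily
        yield from range(self.n)


class EvenFilter(DataFilter):
    def filter(self, data: Any) -> bool:
        # Keep only even numbers
        return data % 2 == 0


class SquarePresenter(DataPresenter):
    def present(self, data: Any) -> Any:
        # Transform each item into its square
        return data * data


engine = WorkflowEngine()
engine.add_processor(RangeProducer(5)).add_processor(EvenFilter()).add_processor(SquarePresenter())
print(engine.run_to_list())  # [0, 4, 16]

Because run() is itself a generator, results can also be consumed one at a time with a plain for loop instead of run_to_list().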