Adding error management
This commit is contained in:
@@ -10,6 +10,9 @@ from utils.Datahelper import DataHelper
|
||||
class DataProcessor(ABC):
|
||||
"""Base class for all data processing components."""
|
||||
|
||||
def __init__(self, component_id: str = None):
|
||||
self.component_id = component_id
|
||||
|
||||
@abstractmethod
|
||||
def process(self, data: Any) -> Generator[Any, None, None]:
|
||||
pass
|
||||
@@ -55,7 +58,8 @@ class DataPresenter(DataProcessor):
|
||||
class TableDataProducer(DataProducer):
|
||||
"""Base class for data producers that emit data from a repository."""
|
||||
|
||||
def __init__(self, session, settings_manager, repository_name, table_name):
|
||||
def __init__(self, session, settings_manager, component_id, repository_name, table_name):
|
||||
super().__init__(component_id)
|
||||
self._session = session
|
||||
self.settings_manager = settings_manager
|
||||
self.repository_name = repository_name
|
||||
@@ -68,8 +72,8 @@ class TableDataProducer(DataProducer):
|
||||
class DefaultDataPresenter(DataPresenter):
|
||||
"""Default data presenter that returns the input data unchanged."""
|
||||
|
||||
def __init__(self, columns_as_str: str):
|
||||
super().__init__()
|
||||
def __init__(self, component_id: str, columns_as_str: str):
|
||||
super().__init__(component_id)
|
||||
if not columns_as_str or columns_as_str == "*":
|
||||
self.mappings = None
|
||||
|
||||
@@ -92,8 +96,8 @@ class DefaultDataPresenter(DataPresenter):
|
||||
|
||||
|
||||
class DefaultDataFilter(DataFilter):
|
||||
def __init__(self, filter_expression: str):
|
||||
super().__init__()
|
||||
def __init__(self, component_id: str, filter_expression: str):
|
||||
super().__init__(component_id)
|
||||
self.filter_expression = filter_expression
|
||||
self._ast_tree = ast.parse(filter_expression, "<user input>", 'eval')
|
||||
self._compiled = compile(self._ast_tree, "<string>", "eval")
|
||||
@@ -112,6 +116,9 @@ class WorkflowEngine:
|
||||
|
||||
def __init__(self):
|
||||
self.processors: list[DataProcessor] = []
|
||||
self.has_error = False
|
||||
self.global_error = None
|
||||
self.errors = {}
|
||||
|
||||
def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
|
||||
"""Add a data processor to the pipeline."""
|
||||
@@ -137,12 +144,16 @@ class WorkflowEngine:
|
||||
The first processor must be a DataProducer.
|
||||
"""
|
||||
if not self.processors:
|
||||
raise ValueError("No processors in the pipeline")
|
||||
self.has_error = False
|
||||
self.global_error = "No processors in the pipeline"
|
||||
raise ValueError(self.global_error)
|
||||
|
||||
first_processor = self.processors[0]
|
||||
|
||||
if not isinstance(first_processor, DataProducer):
|
||||
raise ValueError("First processor must be a DataProducer")
|
||||
self.has_error = False
|
||||
self.global_error = "First processor must be a DataProducer"
|
||||
raise ValueError(self.global_error)
|
||||
|
||||
for item in first_processor.emit():
|
||||
yield from self._process_single_item(item, 1)
|
||||
|
||||
Reference in New Issue
Block a user