Added traceability

2025-08-25 23:20:10 +02:00
parent 957a92f903
commit 63058ef4a9
6 changed files with 67 additions and 9 deletions


@@ -1,6 +1,7 @@
 import ast
 import logging
 from abc import ABC, abstractmethod
+from dataclasses import dataclass
 from typing import Any, Generator
 from components.admin.admin_db_manager import AdminDbManager
@@ -11,6 +12,14 @@ from core.utils import UnreferencedNamesVisitor
 from utils.Datahelper import DataHelper
+
+@dataclass
+class WorkflowPayload:
+    processor_name: str
+    component_id: str
+    item_linkage_id: int
+    item: Any
+
 class DataProcessorError(Exception):
     def __init__(self, component_id, error):
         self.component_id = component_id
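
Aside (not part of this commit): because WorkflowPayload is a plain dataclass, each captured record can be flattened with the standard-library dataclasses.asdict helper, e.g. for structured logging. A minimal sketch; the processor name, component id, and item below are invented sample values:

    from dataclasses import asdict
    import json

    # Hypothetical record, mirroring what the engine appends to self.debug.
    payload = WorkflowPayload(
        processor_name="CsvProducer",   # invented sample value
        component_id="producer-1",      # invented sample value
        item_linkage_id=0,
        item={"row": 42},
    )

    # asdict() recursively converts the dataclass (and the dict held in
    # `item`) to plain Python containers, ready for json.dumps.
    print(json.dumps(asdict(payload)))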
@@ -146,35 +155,56 @@ class WorkflowEngine:
         self.has_error = False
         self.global_error = None
         self.errors = {}
+        self.debug = {}
+        self.item_count = -1

     def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
         """Add a data processor to the pipeline."""
         self.processors.append(processor)
         return self

-    def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
+    def _process_single_item(self, item_linkage_id, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
         """Process a single item through the remaining processors."""
         if processor_index >= len(self.processors):
             yield item
             return

         processor = self.processors[processor_index]
+        if processor.component_id not in self.debug:
+            self.debug[processor.component_id] = {"input": [], "output": []}
+        self.debug[processor.component_id]["input"].append(WorkflowPayload(
+            processor_name=processor.__class__.__name__,
+            component_id=processor.component_id,
+            item_linkage_id=item_linkage_id,
+            item=item))

         # Process the item through the current processor
         for processed_item in processor.process(item):
+            self.debug[processor.component_id]["output"].append(WorkflowPayload(
+                processor_name=processor.__class__.__name__,
+                component_id=processor.component_id,
+                item_linkage_id=item_linkage_id,
+                item=processed_item))
             # Recursively process through remaining processors
-            yield from self._process_single_item(processed_item, processor_index + 1)
+            yield from self._process_single_item(item_linkage_id, processed_item, processor_index + 1)

     def run(self) -> Generator[Any, None, None]:
         """
         Run the workflow pipeline and yield results one by one.
         The first processor must be a DataProducer.
         """
+        self.debug.clear()
         if not self.processors:
             self.has_error = False
             self.global_error = "No processors in the pipeline"
+            self.item_count = -1
             raise ValueError(self.global_error)

+        self.item_count = 0
         first_processor = self.processors[0]
         if not isinstance(first_processor, DataProducer):
@@ -182,8 +212,16 @@ class WorkflowEngine:
             self.global_error = "First processor must be a DataProducer"
             raise ValueError(self.global_error)

-        for item in first_processor.process(None):
-            yield from self._process_single_item(item, 1)
+        self.debug[first_processor.component_id] = {"input": [], "output": []}
+        for item_linkage_id, item in enumerate(first_processor.process(None)):
+            self.item_count += 1
+            self.debug[first_processor.component_id]["output"].append(WorkflowPayload(
+                processor_name=first_processor.__class__.__name__,
+                component_id=first_processor.component_id,
+                item_linkage_id=item_linkage_id,
+                item=item))
+            yield from self._process_single_item(item_linkage_id, item, 1)

     def run_to_list(self) -> list[Any]:
         """