import ast
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generator, Optional

from components.admin.admin_db_manager import AdminDbManager
from core.Expando import Expando
from core.jira import Jira, JiraRequestTypes
from core.preprocessor import PlainTextPreprocessor
from core.utils import UnreferencedNamesVisitor
from utils.Datahelper import DataHelper


@dataclass
class WorkflowPayload:
    processor_name: str
    component_id: str
    item_linkage_id: int
    item: Any


class DataProcessorError(Exception):
    """Wraps an error raised by a processor, keeping the failing component's id."""

    def __init__(self, component_id, error):
        super().__init__(f"{component_id}: {error}")
        self.component_id = component_id
        self.error = error


class DataProcessor(ABC):
    """Base class for all data processing components."""

    def __init__(self, component_id: Optional[str] = None):
        self.component_id = component_id

    @abstractmethod
    def process(self, data: Any) -> Generator[Any, None, None]:
        pass


class DataProducer(DataProcessor):
    """Base class for data producers that emit data using generators."""

    @abstractmethod
    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        """Emit data items one by one using yield. Can augment input data."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        try:
            yield from self.emit(data)
        except Exception as e:
            raise DataProcessorError(self.component_id, e) from e


class DataFilter(DataProcessor):
    """Base class for data filters that keep or discard data items."""

    @abstractmethod
    def filter(self, data: Any) -> bool:
        """Return True to keep the item, False to discard it."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        try:
            if self.filter(data):
                yield data
        except Exception as e:
            raise DataProcessorError(self.component_id, e) from e


class DataPresenter(DataProcessor):
    """Base class for data presenters that transform data items."""

    @abstractmethod
    def present(self, data: Any) -> Any:
        """Present/transform a data item."""
        pass

    def process(self, data: Any) -> Generator[Any, None, None]:
        try:
            yield self.present(data)
        except Exception as e:
            raise DataProcessorError(self.component_id, e) from e


class TableDataProducer(DataProducer):
    """Data producer that emits rows from a repository table."""

    def __init__(self, session, settings_manager, component_id, repository_name, table_name):
        super().__init__(component_id)
        self._session = session
        self.settings_manager = settings_manager
        self.repository_name = repository_name
        self.table_name = table_name

    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        yield from DataHelper.get(self._session, self.settings_manager,
                                  self.repository_name, self.table_name, Expando)


class JiraDataProducer(DataProducer):
    """Data producer that emits items from Jira."""

    logger = logging.getLogger("DataProcessor.Producer.Jira")

    def __init__(self, session, settings_manager, component_id,
                 request_type='search', request='', fields=None):
        super().__init__(component_id)
        self._session = session
        self.settings_manager = settings_manager
        self.request_type = (request_type.value
                             if isinstance(request_type, JiraRequestTypes)
                             else request_type)
        self.request = request
        self.fields = fields
        self.db = AdminDbManager(session, settings_manager).jira

    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        self.logger.debug(f"Emitting data from Jira: {self.request_type} {self.request} {self.fields}")
        # Preprocess the field list and the request with the incoming item
        # available to the preprocessor under the name "data".
        preprocessor = PlainTextPreprocessor()
        preprocessed_fields = preprocessor.preprocess(self.fields, {"data": data})
        self.logger.debug(f"  {preprocessed_fields=}")
        jira = Jira(self.db.user_name, self.db.api_token, fields=preprocessed_fields)
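        # self.request_type names a method on the Jira client (e.g. 'search');
        # dispatch to it dynamically and fail fast when no such method exists.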
        if not hasattr(jira, self.request_type):
            raise ValueError(f"Invalid request type: {self.request_type}")
        preprocessed_request = preprocessor.preprocess(self.request, {"data": data})
        self.logger.debug(f"  {preprocessed_request=}")
        yield from getattr(jira, self.request_type)(preprocessed_request)


class DefaultDataFilter(DataFilter):
    """Data filter that keeps items for which a Python expression evaluates truthy."""

    def __init__(self, component_id: str, filter_expression: str):
        super().__init__(component_id)
        self.filter_expression = filter_expression
        self._ast_tree = ast.parse(filter_expression, "", "eval")
        self._compiled = compile(self._ast_tree, "", "eval")
        # Collect the free names referenced by the expression so they can be
        # bound from each item at evaluation time.
        visitor = UnreferencedNamesVisitor()
        self._unreferenced_names = visitor.get_names(self._ast_tree)

    def filter(self, data: Any) -> bool:
        # Bind only the names the expression references and the item provides.
        my_locals = {name: data.get(name)
                     for name in self._unreferenced_names if hasattr(data, name)}
        return eval(self._compiled, globals(), my_locals)


class WorkflowEngine:
    """Orchestrates the data processing pipeline using generators."""

    def __init__(self):
        self.processors: list[DataProcessor] = []
        self.has_error = False
        self.global_error = None
        self.errors = {}
        self.debug = {}
        self.nb_items = -1

    def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
        """Add a data processor to the pipeline."""
        self.processors.append(processor)
        return self

    def _process_single_item(self, item_linkage_id, item: Any,
                             processor_index: int = 0) -> Generator[Any, None, None]:
        """Process a single item through the remaining processors."""
        if processor_index >= len(self.processors):
            yield item
            return

        processor = self.processors[processor_index]
        if processor.component_id not in self.debug:
            self.debug[processor.component_id] = {"input": [], "output": []}
        self.debug[processor.component_id]["input"].append(WorkflowPayload(
            processor_name=processor.__class__.__name__,
            component_id=processor.component_id,
            item_linkage_id=item_linkage_id,
            item=item))

        # Process the item through the current processor
        for processed_item in processor.process(item):
            self.debug[processor.component_id]["output"].append(WorkflowPayload(
                processor_name=processor.__class__.__name__,
                component_id=processor.component_id,
                item_linkage_id=item_linkage_id,
                item=processed_item))
            # Recursively process through remaining processors
            yield from self._process_single_item(item_linkage_id, processed_item,
                                                 processor_index + 1)

    def run(self) -> Generator[Any, None, None]:
        """
        Run the workflow pipeline and yield results one by one.
        The first processor must be a DataProducer.
        """
        self.debug.clear()
        if not self.processors:
            self.has_error = True
            self.global_error = "No processors in the pipeline"
            self.nb_items = -1
            raise ValueError(self.global_error)

        self.nb_items = 0
        first_processor = self.processors[0]
        if not isinstance(first_processor, DataProducer):
            self.has_error = True
            self.global_error = "First processor must be a DataProducer"
            raise ValueError(self.global_error)

        self.debug[first_processor.component_id] = {"input": [], "output": []}
        for item_linkage_id, item in enumerate(first_processor.process(None)):
            self.nb_items += 1
            self.debug[first_processor.component_id]["output"].append(WorkflowPayload(
                processor_name=first_processor.__class__.__name__,
                component_id=first_processor.component_id,
                item_linkage_id=item_linkage_id,
                item=item))
            yield from self._process_single_item(item_linkage_id, item, 1)

    def run_to_list(self) -> list[Any]:
        """
        Run the workflow and return all results as a list.
        Use this method when you need all results at once.
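
        A minimal usage sketch (my_producer is hypothetical; any DataProducer
        works, and DefaultDataFilter is defined above):

            engine = WorkflowEngine()
            engine.add_processor(my_producer)
            engine.add_processor(DefaultDataFilter("keep-open", "status == 'open'"))
            results = engine.run_to_list()
            if engine.has_error:
                print(engine.errors, engine.global_error)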
""" try: return list(self.run()) except DataProcessorError as err: self.has_error = True self.errors[err.component_id] = err.error return [] except Exception as err: self.has_error = True self.global_error = str(err) return []