import ast
import logging
from abc import ABC, abstractmethod
from typing import Any, Generator

from components.admin.admin_db_manager import AdminDbManager
from core.Expando import Expando
from core.jira import Jira, JiraRequestTypes
from core.preprocessor import PlainTextPreprocessor
from core.utils import UnreferencedNamesVisitor
from utils.Datahelper import DataHelper


class DataProcessorError(Exception):
    """Raised when a pipeline component fails; carries the component id and the original error."""

    def __init__(self, component_id, error):
        self.component_id = component_id
        self.error = error


class DataProcessor(ABC):
    """Base class for all data processing components."""

    def __init__(self, component_id: str = None):
        self.component_id = component_id

    @abstractmethod
    def process(self, data: Any) -> Generator[Any, None, None]:
        """Process a single input item and yield zero or more output items."""


class DataProducer(DataProcessor):
    """Base class for data producers that emit data using generators."""

    @abstractmethod
    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        """Emit data items one by one using yield. Can augment input data."""

    def process(self, data: Any) -> Generator[Any, None, None]:
        try:
            yield from self.emit(data)
        except Exception as e:
            raise DataProcessorError(self.component_id, e) from e


class DataFilter(DataProcessor):
    """Base class for data filters that decide whether to keep each item."""

    @abstractmethod
    def filter(self, data: Any) -> bool:
        """Return True to keep the item, False to discard it."""

    def process(self, data: Any) -> Generator[Any, None, None]:
        try:
            if self.filter(data):
                yield data
        except Exception as e:
            raise DataProcessorError(self.component_id, e) from e


class DataPresenter(DataProcessor):
    """Base class for data presenters that transform data items."""

    @abstractmethod
    def present(self, data: Any) -> Any:
        """Transform a single data item and return the result."""

    def process(self, data: Any) -> Generator[Any, None, None]:
        try:
            yield self.present(data)
        except Exception as e:
            raise DataProcessorError(self.component_id, e) from e


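# --- Illustrative sketch (not part of the original module) ------------------
# Minimal concrete components showing how subclasses implement the single
# abstract hook of their base class. The names below are invented for
# demonstration only.

class StaticListProducer(DataProducer):
    """Sketch: emits the items it was constructed with, one by one."""

    def __init__(self, component_id: str, items: list):
        super().__init__(component_id)
        self.items = items

    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        yield from self.items


class UppercasePresenter(DataPresenter):
    """Sketch: upper-cases each item's string form."""

    def present(self, data: Any) -> Any:
        return str(data).upper()

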
class TableDataProducer(DataProducer):
    """Base class for data producers that emit data from a repository."""

    def __init__(self, session, settings_manager, component_id, repository_name, table_name):
        super().__init__(component_id)
        self._session = session
        self.settings_manager = settings_manager
        self.repository_name = repository_name
        self.table_name = table_name

    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        yield from DataHelper.get(self._session, self.settings_manager, self.repository_name, self.table_name, Expando)


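# Illustrative usage (the repository and table names are invented):
#     TableDataProducer(session, settings_manager, "tbl-1", "reports", "open_issues")
# yields one Expando row at a time via DataHelper.get().

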
class JiraDataProducer(DataProducer):
    """Base class for data producers that emit data from Jira."""

    logger = logging.getLogger("DataProcessor.Producer.Jira")

    def __init__(self, session, settings_manager, component_id, request_type='search', request='', fields=None):
        super().__init__(component_id)
        self._session = session
        self.settings_manager = settings_manager
        self.request_type = request_type.value if isinstance(request_type, JiraRequestTypes) else request_type
        self.request = request
        self.fields = fields
        self.db = AdminDbManager(session, settings_manager).jira

    def emit(self, data: Any = None) -> Generator[Any, None, None]:
        self.logger.debug(f"Emitting data from Jira: {self.request_type} {self.request} {self.fields}")

        # Both the field list and the request string may contain placeholders
        # that are resolved against the incoming item before the call is made.
        preprocessor = PlainTextPreprocessor()
        preprocessed_fields = preprocessor.preprocess(self.fields, {"data": data})
        self.logger.debug(f" {preprocessed_fields=}")

        jira = Jira(self.db.user_name, self.db.api_token, fields=preprocessed_fields)
        if not hasattr(jira, self.request_type):
            raise ValueError(f"Invalid request type: {self.request_type}")

        preprocessed_request = preprocessor.preprocess(self.request, {"data": data})
        self.logger.debug(f" {preprocessed_request=}")

        yield from getattr(jira, self.request_type)(preprocessed_request)


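# Illustrative usage (assuming PlainTextPreprocessor resolves placeholders that
# reference the upstream item; the JQL string is invented):
#     JiraDataProducer(session, settings_manager, "jira-1",
#                      request="project = DEMO AND status = Open",
#                      fields="summary,status")
# relies on the default request_type 'search' and yields one issue per item.

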
class DefaultDataFilter(DataFilter):
    """Data filter that keeps items for which a user-supplied Python expression evaluates to True."""

    def __init__(self, component_id: str, filter_expression: str):
        super().__init__(component_id)
        self.filter_expression = filter_expression
        self._ast_tree = ast.parse(filter_expression, "<user input>", 'eval')
        self._compiled = compile(self._ast_tree, "<string>", "eval")
        visitor = UnreferencedNamesVisitor()
        self._unreferenced_names = visitor.get_names(self._ast_tree)

    def filter(self, data: Any) -> bool:
        # Expose only the expression's free names, resolved from the item's attributes.
        my_locals = {name: data.get(name) for name in self._unreferenced_names if hasattr(data, name)}
        return eval(self._compiled, globals(), my_locals)


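# Illustrative usage (the attribute names are invented): an expression such as
#     DefaultDataFilter("open-only", "status == 'Open' and priority == 'High'")
# keeps only items whose `status` and `priority` attributes satisfy it. The
# expression is executed with eval(), so filter expressions must come from a
# trusted source.

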
class WorkflowEngine:
    """Orchestrates the data processing pipeline using generators."""

    def __init__(self):
        self.processors: list[DataProcessor] = []
        self.has_error = False
        self.global_error = None
        self.errors = {}

    def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
        """Add a data processor to the pipeline."""
        self.processors.append(processor)
        return self

    def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
        """Process a single item through the remaining processors."""
        if processor_index >= len(self.processors):
            yield item
            return

        processor = self.processors[processor_index]

        # Process the item through the current processor
        for processed_item in processor.process(item):
            # Recursively process through remaining processors
            yield from self._process_single_item(processed_item, processor_index + 1)

    def run(self) -> Generator[Any, None, None]:
        """
        Run the workflow pipeline and yield results one by one.
        The first processor must be a DataProducer.
        """
        if not self.processors:
            self.has_error = True
            self.global_error = "No processors in the pipeline"
            raise ValueError(self.global_error)

        first_processor = self.processors[0]

        if not isinstance(first_processor, DataProducer):
            self.has_error = True
            self.global_error = "First processor must be a DataProducer"
            raise ValueError(self.global_error)

        for item in first_processor.process(None):
            yield from self._process_single_item(item, 1)

    def run_to_list(self) -> list[Any]:
        """
        Run the workflow and return all results as a list.
        Use this method when you need all results at once.
        """
        try:
            return list(self.run())
        except DataProcessorError as err:
            self.has_error = True
            self.errors[err.component_id] = err.error
            return []
        except Exception as err:
            self.has_error = True
            self.global_error = str(err)
            return []
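

# --- Illustrative sketch: wiring a pipeline ---------------------------------
# End-to-end run using the demonstration components defined above; guarded so
# it only executes when the module is run directly.

if __name__ == "__main__":
    engine = (
        WorkflowEngine()
        .add_processor(StaticListProducer("producer", ["alpha", "beta", "gamma"]))
        .add_processor(UppercasePresenter("presenter"))
    )
    for result in engine.run():
        print(result)  # ALPHA, BETA, GAMMA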