diff --git a/src/components/entryselector/assets/EntrySelector.css b/src/components/entryselector/assets/EntrySelector.css
new file mode 100644
index 0000000..e6fa317
--- /dev/null
+++ b/src/components/entryselector/assets/EntrySelector.css
@@ -0,0 +1,16 @@
+.es-container {
+    overflow-x: auto;
+    white-space: nowrap;
+
+}
+
+.es-entry {
+    border: 2px solid var(--color-base-300);
+    padding: 2px;
+    cursor: pointer;
+    display: inline-block; /* Ensure entries align horizontally if needed */
+}
+
+.es-entry:hover {
+    background-color: var(--color-base-300);
+}
\ No newline at end of file
diff --git a/src/components/entryselector/components/EntrySelector.py b/src/components/entryselector/components/EntrySelector.py
index 7896d2d..738bc61 100644
--- a/src/components/entryselector/components/EntrySelector.py
+++ b/src/components/entryselector/components/EntrySelector.py
@@ -40,7 +40,6 @@ class EntrySelector(BaseComponentMultipleInstance):
     def __ft__(self):
         return Div(
             *self._mk_content(),
-            style=f"width: {self._boundaries['width']}px;",
-            cls="flex",
+            cls="flex es-container",
             id=f"{self._id}",
         )
diff --git a/src/components/workflows/components/WorkflowDesignerProperties.py b/src/components/workflows/components/WorkflowDesignerProperties.py
index 787a170..aa943d8 100644
--- a/src/components/workflows/components/WorkflowDesignerProperties.py
+++ b/src/components/workflows/components/WorkflowDesignerProperties.py
@@ -29,7 +29,8 @@ class WorkflowDesignerProperties(BaseComponent):
         self._input_entry_selector = InstanceManager.new(self._session,
                                                          EntrySelector,
                                                          owner=self,
-                                                         content_id=f"pic_{self._id}", data=100)
+                                                         content_id=f"pic_{self._id}",
+                                                         data=100)
         self._output_entry_selector = InstanceManager.new(self._session,
                                                           EntrySelector,
                                                           owner=self,
diff --git a/src/components/workflows/components/WorkflowPlayer.py b/src/components/workflows/components/WorkflowPlayer.py
index 5e16952..75d2a1c 100644
--- a/src/components/workflows/components/WorkflowPlayer.py
+++ b/src/components/workflows/components/WorkflowPlayer.py
@@ -98,6 +98,8 @@ class WorkflowPlayer(BaseComponent):
 
             if component.id not in engine.errors:
                 runtime_state.state = ComponentState.SUCCESS
+                runtime_state.input = engine.debug[component.id]["input"]
+                runtime_state.output = engine.debug[component.id]["output"]
                 continue
 
             # the component failed
@@ -177,7 +179,7 @@ class WorkflowPlayer(BaseComponent):
         # Return sorted components
         return [components_by_id[cid] for cid in sorted_order]
 
-    def _get_engine(self, sorted_components):
+    def _get_engine(self, sorted_components) -> WorkflowEngine:
         # first reorder the component, according to the connection definitions
         engine = WorkflowEngine()
         for component in sorted_components:
diff --git a/src/components/workflows/db_management.py b/src/components/workflows/db_management.py
index e2b00f9..dbc9d15 100644
--- a/src/components/workflows/db_management.py
+++ b/src/components/workflows/db_management.py
@@ -48,6 +48,8 @@ class WorkflowComponentRuntimeState:
     id: str
     state: ComponentState = ComponentState.SUCCESS
     error_message: str | None = None
+    input: list | None = None
+    output: list | None = None
 
 
 @dataclass
@@ -62,7 +64,7 @@ class WorkflowsDesignerState:
     component_counter: int = 0
     designer_height: int = 230
     properties_input_width: int = None
-    properties_properties_width : int = None
+    properties_properties_width: int = None
     properties_output_width: int = None
     selected_component_id: str | None = None
 
diff --git a/src/workflow/engine.py b/src/workflow/engine.py
index e57a931..ebc687f 100644
--- a/src/workflow/engine.py
+++ b/src/workflow/engine.py
@@ -1,6 +1,7 @@
 import ast
 import logging
 from abc import ABC, abstractmethod
+from dataclasses import dataclass
 from typing import Any, Generator
 
 from components.admin.admin_db_manager import AdminDbManager
@@ -11,6 +12,14 @@ from core.utils import UnreferencedNamesVisitor
 from utils.Datahelper import DataHelper
 
 
+@dataclass
+class WorkflowPayload:
+    processor_name: str
+    component_id: str
+    item_linkage_id: int
+    item: Any
+
+
 class DataProcessorError(Exception):
     def __init__(self, component_id, error):
         self.component_id = component_id
@@ -146,35 +155,56 @@ class WorkflowEngine:
         self.has_error = False
         self.global_error = None
         self.errors = {}
+        self.debug = {}
+        self.item_count = -1
 
     def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
         """Add a data processor to the pipeline."""
         self.processors.append(processor)
         return self
 
-    def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
+    def _process_single_item(self, item_linkage_id, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
         """Process a single item through the remaining processors."""
         if processor_index >= len(self.processors):
             yield item
             return
 
         processor = self.processors[processor_index]
+        if processor.component_id not in self.debug:
+            self.debug[processor.component_id] = {"input": [], "output": []}
+
+        self.debug[processor.component_id]["input"].append(WorkflowPayload(
+            processor_name=processor.__class__.__name__,
+            component_id=processor.component_id,
+            item_linkage_id=item_linkage_id,
+            item=item))
 
         # Process the item through the current processor
         for processed_item in processor.process(item):
+            self.debug[processor.component_id]["output"].append(WorkflowPayload(
+                processor_name=processor.__class__.__name__,
+                component_id=processor.component_id,
+                item_linkage_id=item_linkage_id,
+                item=processed_item))
+
             # Recursively process through remaining processors
-            yield from self._process_single_item(processed_item, processor_index + 1)
+            yield from self._process_single_item(item_linkage_id, processed_item, processor_index + 1)
 
     def run(self) -> Generator[Any, None, None]:
         """
         Run the workflow pipeline and yield results one by one.
        The first processor must be a DataProducer.
         """
+
+        self.debug.clear()
+
         if not self.processors:
             self.has_error = False
             self.global_error = "No processors in the pipeline"
+            self.item_count = -1
             raise ValueError(self.global_error)
 
+        self.item_count = 0
         first_processor = self.processors[0]
 
         if not isinstance(first_processor, DataProducer):
@@ -182,8 +212,16 @@ class WorkflowEngine:
             self.global_error = "First processor must be a DataProducer"
             raise ValueError(self.global_error)
 
-        for item in first_processor.process(None):
-            yield from self._process_single_item(item, 1)
+        self.debug[first_processor.component_id] = {"input": [], "output": []}
+
+        for item_linkage_id, item in enumerate(first_processor.process(None)):
+            self.item_count += 1
+            self.debug[first_processor.component_id]["output"].append(WorkflowPayload(
+                processor_name=first_processor.__class__.__name__,
+                component_id=first_processor.component_id,
+                item_linkage_id=item_linkage_id,
+                item=item))
+            yield from self._process_single_item(item_linkage_id, item, 1)
 
     def run_to_list(self) -> list[Any]:
         """
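
Note (not part of the patch): a minimal sketch of how the new debug capture could be exercised. It assumes `src` is on the import path, that `DataProducer`/`DataProcessor` subclasses only need a `component_id` attribute and a `process()` generator (the only members this diff relies on), and that `NumberProducer`/`DoubleProcessor` are invented names for illustration.

# Hypothetical usage sketch -- producer/processor classes below are invented,
# and their base classes are assumed to need no constructor arguments.
from workflow.engine import DataProcessor, DataProducer, WorkflowEngine


class NumberProducer(DataProducer):
    component_id = "producer-1"      # assumed to be settable as a class attribute

    def process(self, item):
        yield from (1, 2, 3)         # three source items -> item_linkage_id 0..2


class DoubleProcessor(DataProcessor):
    component_id = "doubler-1"

    def process(self, item):
        yield item * 2


engine = WorkflowEngine().add_processor(NumberProducer()).add_processor(DoubleProcessor())
print(list(engine.run()))            # [2, 4, 6]
print(engine.item_count)             # 3 items emitted by the producer

# Each WorkflowPayload records which source item a value belongs to; this is
# what WorkflowPlayer now copies into runtime_state.input / runtime_state.output.
for payload in engine.debug["doubler-1"]["output"]:
    print(payload.item_linkage_id, payload.item)   # (0, 2), (1, 4), (2, 6)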