Adding error management
This commit is contained in:
@@ -72,7 +72,8 @@ class WorkflowDesigner(BaseComponent):
|
||||
settings_manager=self._settings_manager,
|
||||
tabs_manager=self.tabs_manager,
|
||||
player_settings=WorkflowsPlayerSettings(workflow_name,
|
||||
list(self._state.components.values())),
|
||||
list(self._state.components.values()),
|
||||
self._state.connections),
|
||||
boundaries=boundaries)
|
||||
|
||||
self._error_message = None
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import pandas as pd
|
||||
from fasthtml.components import *
|
||||
from collections import deque
|
||||
|
||||
from components.BaseComponent import BaseComponent
|
||||
from components.datagrid_new.components.DataGrid import DataGrid
|
||||
from components.datagrid_new.settings import DataGridSettings
|
||||
from components.workflows.commands import WorkflowPlayerCommandManager
|
||||
from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes
|
||||
from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState
|
||||
from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, WorkflowComponent
|
||||
from core.instance_manager import InstanceManager
|
||||
from core.utils import get_unique_id, make_safe_id
|
||||
from workflow.engine import WorkflowEngine, TableDataProducer, DefaultDataPresenter, DefaultDataFilter
|
||||
@@ -19,6 +20,8 @@ grid_settings = DataGridSettings(
|
||||
open_settings_visible=False)
|
||||
|
||||
|
||||
|
||||
|
||||
class WorkflowPlayer(BaseComponent):
|
||||
def __init__(self, session,
|
||||
_id=None,
|
||||
@@ -30,7 +33,7 @@ class WorkflowPlayer(BaseComponent):
|
||||
self._settings_manager = settings_manager
|
||||
self.tabs_manager = tabs_manager
|
||||
self.key = f"__WorkflowPlayer_{player_settings.workflow_name}"
|
||||
self._player_settings = player_settings
|
||||
self._player_settings : WorkflowsPlayerSettings = player_settings
|
||||
self._boundaries = boundaries
|
||||
self.commands = WorkflowPlayerCommandManager(self)
|
||||
self._datagrid = InstanceManager.get(self._session,
|
||||
@@ -43,22 +46,7 @@ class WorkflowPlayer(BaseComponent):
|
||||
self.global_error = False
|
||||
|
||||
def run(self):
|
||||
engine = WorkflowEngine()
|
||||
for component in self._player_settings.components:
|
||||
if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository":
|
||||
engine.add_processor(
|
||||
TableDataProducer(self._session,
|
||||
self._settings_manager,
|
||||
component.id,
|
||||
component.properties["repository"],
|
||||
component.properties["table"]))
|
||||
|
||||
elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default":
|
||||
engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"]))
|
||||
|
||||
elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default":
|
||||
engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"]))
|
||||
|
||||
engine = self._get_engine()
|
||||
res = engine.run_to_list()
|
||||
|
||||
if engine.has_error:
|
||||
@@ -80,6 +68,79 @@ class WorkflowPlayer(BaseComponent):
|
||||
self._datagrid,
|
||||
id=self._id,
|
||||
)
|
||||
|
||||
def _get_sorted_components(self) -> list[WorkflowComponent]:
|
||||
"""
|
||||
Sorts the workflow components based on their connections using topological sort.
|
||||
|
||||
- A connection from component A to B means A must come before B.
|
||||
- Raises a ValueError if a cycle is detected.
|
||||
- Raises a ValueError if a connection references a non-existent component.
|
||||
- Ignores components that are not part of any connection.
|
||||
|
||||
:return: A list of sorted WorkflowComponent objects.
|
||||
"""
|
||||
components_by_id = {c.id: c for c in self._player_settings.components}
|
||||
|
||||
# Get all component IDs involved in connections
|
||||
involved_ids = set()
|
||||
for conn in self._player_settings.connections:
|
||||
involved_ids.add(conn.from_id)
|
||||
involved_ids.add(conn.to_id)
|
||||
|
||||
# Check if all involved components exist
|
||||
for component_id in involved_ids:
|
||||
if component_id not in components_by_id:
|
||||
raise ValueError(f"Component with ID '{component_id}' referenced in connections but does not exist.")
|
||||
|
||||
# Build the graph (adjacency list and in-degrees) for involved components
|
||||
adj = {cid: [] for cid in involved_ids}
|
||||
in_degree = {cid: 0 for cid in involved_ids}
|
||||
|
||||
for conn in self._player_settings.connections:
|
||||
# from_id -> to_id
|
||||
adj[conn.from_id].append(conn.to_id)
|
||||
in_degree[conn.to_id] += 1
|
||||
|
||||
# Find all sources (nodes with in-degree 0)
|
||||
queue = deque([cid for cid in involved_ids if in_degree[cid] == 0])
|
||||
|
||||
sorted_order = []
|
||||
while queue:
|
||||
u = queue.popleft()
|
||||
sorted_order.append(u)
|
||||
|
||||
for v in adj.get(u, []):
|
||||
in_degree[v] -= 1
|
||||
if in_degree[v] == 0:
|
||||
queue.append(v)
|
||||
|
||||
# Check for cycles
|
||||
if len(sorted_order) != len(involved_ids):
|
||||
raise ValueError("A cycle was detected in the workflow connections.")
|
||||
|
||||
# Return sorted components
|
||||
return [components_by_id[cid] for cid in sorted_order]
|
||||
|
||||
def _get_engine(self):
|
||||
# first reorder the component, according to the connection definitions
|
||||
sorted_components = self._get_sorted_components()
|
||||
engine = WorkflowEngine()
|
||||
for component in sorted_components:
|
||||
if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository":
|
||||
engine.add_processor(
|
||||
TableDataProducer(self._session,
|
||||
self._settings_manager,
|
||||
component.id,
|
||||
component.properties["repository"],
|
||||
component.properties["table"]))
|
||||
|
||||
elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default":
|
||||
engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"]))
|
||||
|
||||
elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default":
|
||||
engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"]))
|
||||
return engine
|
||||
|
||||
@staticmethod
|
||||
def create_component_id(session, suffix=None):
|
||||
@@ -87,4 +148,4 @@ class WorkflowPlayer(BaseComponent):
|
||||
if suffix is None:
|
||||
suffix = get_unique_id()
|
||||
|
||||
return make_safe_id(f"{prefix}{suffix}")
|
||||
return make_safe_id(f"{prefix}{suffix}")
|
||||
@@ -49,8 +49,9 @@ class WorkflowsDesignerState:
|
||||
|
||||
@dataclass
|
||||
class WorkflowsPlayerSettings:
|
||||
workflow_name: str = "No Name"
|
||||
components: list[WorkflowComponent] = None
|
||||
workflow_name: str
|
||||
components: list[WorkflowComponent]
|
||||
connections: list[Connection]
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -7,6 +7,12 @@ from core.utils import UnreferencedNamesVisitor
|
||||
from utils.Datahelper import DataHelper
|
||||
|
||||
|
||||
class DataProcessorError(Exception):
|
||||
def __init__(self, component_id, error):
|
||||
self.component_id = component_id
|
||||
self.error = error
|
||||
|
||||
|
||||
class DataProcessor(ABC):
|
||||
"""Base class for all data processing components."""
|
||||
|
||||
@@ -27,7 +33,11 @@ class DataProducer(DataProcessor):
|
||||
pass
|
||||
|
||||
def process(self, data: Any) -> Generator[Any, None, None]:
|
||||
yield from self.emit(data)
|
||||
try:
|
||||
yield from self.emit(data)
|
||||
|
||||
except Exception as e:
|
||||
raise DataProcessorError(self.component_id, e)
|
||||
|
||||
|
||||
class DataFilter(DataProcessor):
|
||||
@@ -39,8 +49,12 @@ class DataFilter(DataProcessor):
|
||||
pass
|
||||
|
||||
def process(self, data: Any) -> Generator[Any, None, None]:
|
||||
if self.filter(data):
|
||||
yield data
|
||||
try:
|
||||
if self.filter(data):
|
||||
yield data
|
||||
|
||||
except Exception as e:
|
||||
raise DataProcessorError(self.component_id, e)
|
||||
|
||||
|
||||
class DataPresenter(DataProcessor):
|
||||
@@ -52,7 +66,11 @@ class DataPresenter(DataProcessor):
|
||||
pass
|
||||
|
||||
def process(self, data: Any) -> Generator[Any, None, None]:
|
||||
yield self.present(data)
|
||||
try:
|
||||
yield self.present(data)
|
||||
|
||||
except Exception as e:
|
||||
raise DataProcessorError(self.component_id, e)
|
||||
|
||||
|
||||
class TableDataProducer(DataProducer):
|
||||
|
||||
Reference in New Issue
Block a user