Adding visual return when error

This commit is contained in:
2025-07-12 09:52:56 +02:00
parent d0f7536fa0
commit 2754312141
7 changed files with 163 additions and 76 deletions

View File

@@ -1,3 +1,4 @@
from fasthtml.components import Html
from fasthtml.components import * from fasthtml.components import *
from fasthtml.xtend import Script from fasthtml.xtend import Script

View File

@@ -98,6 +98,7 @@
background: var(--color-error); background: var(--color-error);
} }
.wkf-component-content { .wkf-component-content {
padding: 0.75rem; /* p-3 in Tailwind */ padding: 0.75rem; /* p-3 in Tailwind */
border-radius: 0.5rem; /* rounded-lg in Tailwind */ border-radius: 0.5rem; /* rounded-lg in Tailwind */
@@ -108,6 +109,13 @@
align-items: center; /* items-center in Tailwind */ align-items: center; /* items-center in Tailwind */
} }
.wkf-component-content.error {
background: var(--color-error);
}
.wkf-component-content.not-run {
background: var(--color-neutral);
}
.wkf-connection-line { .wkf-connection-line {
position: absolute; position: absolute;

View File

@@ -11,7 +11,7 @@ from components.workflows.commands import WorkflowDesignerCommandManager
from components.workflows.components.WorkflowPlayer import WorkflowPlayer from components.workflows.components.WorkflowPlayer import WorkflowPlayer
from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes
from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \ from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \
Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings, WorkflowComponentRuntimeState Connection, WorkflowsDesignerDbManager, WorkflowsPlayerSettings, WorkflowComponentRuntimeState, ComponentState
from components_helpers import apply_boundaries, mk_tooltip, mk_dialog_buttons, mk_icon from components_helpers import apply_boundaries, mk_tooltip, mk_dialog_buttons, mk_icon
from core.instance_manager import InstanceManager from core.instance_manager import InstanceManager
from core.utils import get_unique_id, make_safe_id from core.utils import get_unique_id, make_safe_id
@@ -84,8 +84,8 @@ class WorkflowDesigner(BaseComponent):
def refresh_designer(self): def refresh_designer(self):
return self._mk_elements() return self._mk_elements()
def refresh_properties(self): def refresh_properties(self, oob=False):
return self._mk_properties() return self._mk_properties(oob)
def add_component(self, component_type, x, y): def add_component(self, component_type, x, y):
self._state.component_counter += 1 self._state.component_counter += 1
@@ -109,11 +109,12 @@ class WorkflowDesigner(BaseComponent):
def move_component(self, component_id, x, y): def move_component(self, component_id, x, y):
if component_id in self._state.components: if component_id in self._state.components:
self._state.selected_component_id = component_id
self._state.components[component_id].x = int(x) self._state.components[component_id].x = int(x)
self._state.components[component_id].y = int(y) self._state.components[component_id].y = int(y)
self._db.save_state(self._key, self._state) # update db self._db.save_state(self._key, self._state) # update db
return self.refresh_designer() return self.refresh_designer(), self.refresh_properties(True)
def delete_component(self, component_id): def delete_component(self, component_id):
# Remove component # Remove component
@@ -189,17 +190,18 @@ class WorkflowDesigner(BaseComponent):
return self.refresh_properties() return self.refresh_properties()
def play_workflow(self, boundaries: dict): def play_workflow(self, boundaries: dict):
if self._state.selected_component_id is None: self._error_message = None
self._error_message = "No component selected"
return self.tabs_manager.refresh()
try: self._player.run()
self._player.run() if self._player.global_error:
# Show the error message in the same tab
self._error_message = self._player.global_error
else:
# change the tab and display the results
self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key) self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key)
except Exception as e:
self._error_message = str(e)
return self.tabs_manager.refresh() return self.tabs_manager.refresh()
def on_processor_details_event(self, component_id: str, event_name: str, details: dict): def on_processor_details_event(self, component_id: str, event_name: str, details: dict):
@@ -260,14 +262,48 @@ class WorkflowDesigner(BaseComponent):
</svg> </svg>
""" """
def _mk_component(self, component: WorkflowComponent, runtime_state: WorkflowComponentRuntimeState):
info = COMPONENT_TYPES[component.type]
is_selected = self._state.selected_component_id == component.id
if runtime_state.state == ComponentState.FAILURE:
state_class = 'error' # To be styled with a red highlight
elif runtime_state.state == ComponentState.NOT_RUN:
state_class = 'not-run' # To be styled as greyed-out
else:
state_class = ''
return Div(
# Input connection point
Div(cls="wkf-connection-point wkf-input-point",
data_component_id=component.id,
data_point_type="input"),
# Component content
Div(
Span(info["icon"], cls="text-xl mb-1"),
H4(component.title, cls="font-semibold text-xs"),
cls=f"wkf-component-content {info['color']} {state_class}"
),
# Output connection point
Div(cls="wkf-connection-point wkf-output-point",
data_component_id=component.id,
data_point_type="output"),
cls=f"wkf-workflow-component w-32 {'selected' if is_selected else ''}",
style=f"left: {component.x}px; top: {component.y}px;",
data_component_id=component.id,
draggable="true"
)
def _mk_elements(self): def _mk_elements(self):
return Div( return Div(
# Render connections # Render connections
*[NotStr(self._mk_connection_svg(conn)) for conn in self._state.connections], *[NotStr(self._mk_connection_svg(conn)) for conn in self._state.connections],
# Render components # Render components
*[self._mk_workflow_component(comp, state) for comp, state in zip(self._state.components.values(), *[self._mk_component(comp, state) for comp, state in zip(self._state.components.values(),
self._player.runtime_states)], self._player.runtime_states)],
) )
def _mk_canvas(self, oob=False): def _mk_canvas(self, oob=False):
@@ -293,7 +329,7 @@ class WorkflowDesigner(BaseComponent):
self._mk_toolbox(), # (Left side) self._mk_toolbox(), # (Left side)
self._mk_canvas(), # (Right side) self._mk_canvas(), # (Right side)
cls="wkf-designer flex gap-4", cls="wkf-designer flex gap-1",
id=f"d_{self._id}", id=f"d_{self._id}",
style=f"height:{self._state.designer_height}px;" style=f"height:{self._state.designer_height}px;"
) )
@@ -374,11 +410,12 @@ class WorkflowDesigner(BaseComponent):
Script(f"bindFormData('f_{self._id}_{component_id}');") Script(f"bindFormData('f_{self._id}_{component_id}');")
) )
def _mk_properties(self): def _mk_properties(self, oob=False):
return Div( return Div(
self._mk_properties_details(self._state.selected_component_id), self._mk_properties_details(self._state.selected_component_id),
cls="p-2 bg-base-100 rounded-lg border", cls="p-2 bg-base-100 rounded-lg border",
style=f"height:{self._get_properties_height()}px;", style=f"height:{self._get_properties_height()}px;",
hx_swap_oob='true' if oob else None,
id=f"p_{self._id}", id=f"p_{self._id}",
) )
@@ -503,30 +540,3 @@ class WorkflowDesigner(BaseComponent):
draggable="true", draggable="true",
data_type=component_type data_type=component_type
) )
@staticmethod
def _mk_workflow_component(component: WorkflowComponent, component_state: WorkflowComponentRuntimeState):
info = COMPONENT_TYPES[component.type]
return Div(
# Input connection point
Div(cls="wkf-connection-point wkf-input-point",
data_component_id=component.id,
data_point_type="input"),
# Component content
Div(
Span(info["icon"], cls="text-xl mb-1"),
H4(component.title, cls="font-semibold text-xs"),
cls=f"wkf-component-content {info['color']} {'error' if component_state.has_error else ''}"
),
# Output connection point
Div(cls="wkf-connection-point wkf-output-point",
data_component_id=component.id,
data_point_type="output"),
cls="wkf-workflow-component w-32",
style=f"left: {component.x}px; top: {component.y}px;",
data_component_id=component.id,
draggable="true"
)

View File

@@ -1,13 +1,15 @@
from collections import deque
import pandas as pd import pandas as pd
from fasthtml.components import * from fasthtml.components import *
from collections import deque
from components.BaseComponent import BaseComponent from components.BaseComponent import BaseComponent
from components.datagrid_new.components.DataGrid import DataGrid from components.datagrid_new.components.DataGrid import DataGrid
from components.datagrid_new.settings import DataGridSettings from components.datagrid_new.settings import DataGridSettings
from components.workflows.commands import WorkflowPlayerCommandManager from components.workflows.commands import WorkflowPlayerCommandManager
from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes from components.workflows.constants import WORKFLOW_PLAYER_INSTANCE_ID, ProcessorTypes
from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, WorkflowComponent from components.workflows.db_management import WorkflowsPlayerSettings, WorkflowComponentRuntimeState, \
WorkflowComponent, ComponentState
from core.instance_manager import InstanceManager from core.instance_manager import InstanceManager
from core.utils import get_unique_id, make_safe_id from core.utils import get_unique_id, make_safe_id
from workflow.engine import WorkflowEngine, TableDataProducer, DefaultDataPresenter, DefaultDataFilter from workflow.engine import WorkflowEngine, TableDataProducer, DefaultDataPresenter, DefaultDataFilter
@@ -20,8 +22,6 @@ grid_settings = DataGridSettings(
open_settings_visible=False) open_settings_visible=False)
class WorkflowPlayer(BaseComponent): class WorkflowPlayer(BaseComponent):
def __init__(self, session, def __init__(self, session,
_id=None, _id=None,
@@ -33,7 +33,7 @@ class WorkflowPlayer(BaseComponent):
self._settings_manager = settings_manager self._settings_manager = settings_manager
self.tabs_manager = tabs_manager self.tabs_manager = tabs_manager
self.key = f"__WorkflowPlayer_{player_settings.workflow_name}" self.key = f"__WorkflowPlayer_{player_settings.workflow_name}"
self._player_settings : WorkflowsPlayerSettings = player_settings self._player_settings: WorkflowsPlayerSettings = player_settings
self._boundaries = boundaries self._boundaries = boundaries
self.commands = WorkflowPlayerCommandManager(self) self.commands = WorkflowPlayerCommandManager(self)
self._datagrid = InstanceManager.get(self._session, self._datagrid = InstanceManager.get(self._session,
@@ -43,21 +43,66 @@ class WorkflowPlayer(BaseComponent):
grid_settings=grid_settings, grid_settings=grid_settings,
boundaries=boundaries) boundaries=boundaries)
self.runtime_states = [WorkflowComponentRuntimeState(component.id) for component in player_settings.components] self.runtime_states = [WorkflowComponentRuntimeState(component.id) for component in player_settings.components]
self.global_error = False self.global_error = None
self.has_error = False
def run(self): def run(self):
# Reset all component states to NOT_RUN before execution
for state in self.runtime_states:
state.state = ComponentState.NOT_RUN
state.error_message = None
self.global_error = None
components_by_id = {c.id: c for c in self._player_settings.components}
try:
sorted_components = self._get_sorted_components()
except ValueError as e:
# Handle workflow structure errors (e.g., cycles)
self.global_error = f"Workflow configuration error: {e}"
self._datagrid.init_from_dataframe(pd.DataFrame([]))
return
engine = self._get_engine() engine = self._get_engine()
res = engine.run_to_list() res = engine.run_to_list()
runtime_states_by_id = {rs.id: rs for rs in self.runtime_states}
if engine.has_error: if engine.has_error:
self.global_error = engine.global_error self.has_error = True
for runtime_state in self.runtime_states:
if runtime_state.id in engine.errors: if not engine.errors:
runtime_state.has_error = True self.global_error = engine.global_error
runtime_state.error_message = engine.errors[runtime_state.id].error_message
else: else:
runtime_state.has_error = False # Determine component states by simulating a "stop-on-fail" execution
runtime_state.error_message = "" first_failure_found = False
for component in sorted_components:
runtime_state = runtime_states_by_id.get(component.id)
if not runtime_state:
continue
if first_failure_found:
# After a failure, all subsequent components are marked as NOT_RUN
runtime_state.state = ComponentState.NOT_RUN
continue
if component.id in engine.errors:
# This is the first component that failed
first_failure_found = True
error = engine.errors[component.id]
runtime_state.state = ComponentState.FAILURE
runtime_state.error_message = str(error)
# As requested, display the component error in the global error area
component_props = components_by_id[component.id].properties
component_name = component_props.get("processor_name", f"ID: {component.id}")
self.global_error = f"Error in component '{component_name}': {str(error)}"
else:
# This component ran successfully
runtime_state.state = ComponentState.SUCCESS
else:
self.has_error = False
data = [row.as_dict() for row in res] data = [row.as_dict() for row in res]
df = pd.DataFrame(data) df = pd.DataFrame(data)
@@ -68,7 +113,7 @@ class WorkflowPlayer(BaseComponent):
self._datagrid, self._datagrid,
id=self._id, id=self._id,
) )
def _get_sorted_components(self) -> list[WorkflowComponent]: def _get_sorted_components(self) -> list[WorkflowComponent]:
""" """
Sorts the workflow components based on their connections using topological sort. Sorts the workflow components based on their connections using topological sort.
@@ -81,47 +126,47 @@ class WorkflowPlayer(BaseComponent):
:return: A list of sorted WorkflowComponent objects. :return: A list of sorted WorkflowComponent objects.
""" """
components_by_id = {c.id: c for c in self._player_settings.components} components_by_id = {c.id: c for c in self._player_settings.components}
# Get all component IDs involved in connections # Get all component IDs involved in connections
involved_ids = set() involved_ids = set()
for conn in self._player_settings.connections: for conn in self._player_settings.connections:
involved_ids.add(conn.from_id) involved_ids.add(conn.from_id)
involved_ids.add(conn.to_id) involved_ids.add(conn.to_id)
# Check if all involved components exist # Check if all involved components exist
for component_id in involved_ids: for component_id in involved_ids:
if component_id not in components_by_id: if component_id not in components_by_id:
raise ValueError(f"Component with ID '{component_id}' referenced in connections but does not exist.") raise ValueError(f"Component with ID '{component_id}' referenced in connections but does not exist.")
# Build the graph (adjacency list and in-degrees) for involved components # Build the graph (adjacency list and in-degrees) for involved components
adj = {cid: [] for cid in involved_ids} adj = {cid: [] for cid in involved_ids}
in_degree = {cid: 0 for cid in involved_ids} in_degree = {cid: 0 for cid in involved_ids}
for conn in self._player_settings.connections: for conn in self._player_settings.connections:
# from_id -> to_id # from_id -> to_id
adj[conn.from_id].append(conn.to_id) adj[conn.from_id].append(conn.to_id)
in_degree[conn.to_id] += 1 in_degree[conn.to_id] += 1
# Find all sources (nodes with in-degree 0) # Find all sources (nodes with in-degree 0)
queue = deque([cid for cid in involved_ids if in_degree[cid] == 0]) queue = deque([cid for cid in involved_ids if in_degree[cid] == 0])
sorted_order = [] sorted_order = []
while queue: while queue:
u = queue.popleft() u = queue.popleft()
sorted_order.append(u) sorted_order.append(u)
for v in adj.get(u, []): for v in adj.get(u, []):
in_degree[v] -= 1 in_degree[v] -= 1
if in_degree[v] == 0: if in_degree[v] == 0:
queue.append(v) queue.append(v)
# Check for cycles # Check for cycles
if len(sorted_order) != len(involved_ids): if len(sorted_order) != len(involved_ids):
raise ValueError("A cycle was detected in the workflow connections.") raise ValueError("A cycle was detected in the workflow connections.")
# Return sorted components # Return sorted components
return [components_by_id[cid] for cid in sorted_order] return [components_by_id[cid] for cid in sorted_order]
def _get_engine(self): def _get_engine(self):
# first reorder the component, according to the connection definitions # first reorder the component, according to the connection definitions
sorted_components = self._get_sorted_components() sorted_components = self._get_sorted_components()
@@ -134,10 +179,10 @@ class WorkflowPlayer(BaseComponent):
component.id, component.id,
component.properties["repository"], component.properties["repository"],
component.properties["table"])) component.properties["table"]))
elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default": elif component.type == ProcessorTypes.Filter and component.properties["processor_name"] == "Default":
engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"])) engine.add_processor(DefaultDataFilter(component.id, component.properties["filter"]))
elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default": elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default":
engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"])) engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"]))
return engine return engine
@@ -148,4 +193,4 @@ class WorkflowPlayer(BaseComponent):
if suffix is None: if suffix is None:
suffix = get_unique_id() suffix = get_unique_id()
return make_safe_id(f"{prefix}{suffix}") return make_safe_id(f"{prefix}{suffix}")

View File

@@ -1,3 +1,4 @@
import enum
import logging import logging
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -8,6 +9,15 @@ from core.settings_management import SettingsManager
logger = logging.getLogger("WorkflowsSettings") logger = logging.getLogger("WorkflowsSettings")
class ComponentState(enum.Enum):
"""
Represents the execution state of a workflow component.
"""
SUCCESS = "success"
FAILURE = "failure"
NOT_RUN = "not_run"
# Data structures # Data structures
@dataclass @dataclass
class WorkflowComponent: class WorkflowComponent:
@@ -29,9 +39,13 @@ class Connection:
@dataclass @dataclass
class WorkflowComponentRuntimeState: class WorkflowComponentRuntimeState:
"""
Represents the runtime state of a single workflow component.
"""
id: str id: str
has_error: bool = False state: ComponentState = ComponentState.NOT_RUN
error_message: str = "" error_message: str | None = None
@dataclass @dataclass
class WorkflowsDesignerSettings: class WorkflowsDesignerSettings:

View File

@@ -181,4 +181,13 @@ class WorkflowEngine:
Run the workflow and return all results as a list. Run the workflow and return all results as a list.
Use this method when you need all results at once. Use this method when you need all results at once.
""" """
return list(self.run()) try:
return list(self.run())
except DataProcessorError as err:
self.has_error = True
self.errors[err.component_id] = err.error
return []
except Exception as err:
self.has_error = True
self.global_error = str(err)
return []

View File

@@ -85,7 +85,7 @@ def test_i_can_render_no_component(designer):
def test_i_can_render_a_producer(designer, producer_component): def test_i_can_render_a_producer(designer, producer_component):
component = producer_component component = producer_component
component_state = WorkflowComponentRuntimeState(component.id) component_state = WorkflowComponentRuntimeState(component.id)
actual = designer._mk_workflow_component(component, component_state) actual = designer._mk_component(component, component_state)
expected = Div( expected = Div(
# input connection point # input connection point
Div(cls="wkf-connection-point wkf-input-point", Div(cls="wkf-connection-point wkf-input-point",