Improving error management
This commit is contained in:
@@ -1,6 +1,11 @@
|
|||||||
const tooltipElementId = "mmt-app"
|
const tooltipElementId = "mmt-app"
|
||||||
|
|
||||||
function bindTooltipsWithDelegation() {
|
function bindTooltipsWithDelegation() {
|
||||||
|
// To display the tooltip, the attribute 'data-tooltip' is mandatory => it contains the text to tooltip
|
||||||
|
// Then
|
||||||
|
// the 'truncate' to show only when the text is truncated
|
||||||
|
// the class 'mmt-tooltip' for force the display
|
||||||
|
|
||||||
const elementId = tooltipElementId
|
const elementId = tooltipElementId
|
||||||
console.debug("bindTooltips on element " + elementId);
|
console.debug("bindTooltips on element " + elementId);
|
||||||
|
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ def post(session, _id: str, component_type: str, x: int, y: int):
|
|||||||
|
|
||||||
|
|
||||||
@rt(Routes.MoveComponent)
|
@rt(Routes.MoveComponent)
|
||||||
def post(session, _id: str, component_id: str, x: int, y: int):
|
def post(session, _id: str, component_id: str, x: float, y: float):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Entering {Routes.MoveComponent} with args {debug_session(session)}, {_id=}, {component_id=}, {x=}, {y=}")
|
f"Entering {Routes.MoveComponent} with args {debug_session(session)}, {_id=}, {component_id=}, {x=}, {y=}")
|
||||||
instance = InstanceManager.get(session, _id)
|
instance = InstanceManager.get(session, _id)
|
||||||
@@ -133,3 +133,10 @@ def post(session, _id: str, tab_boundaries: str):
|
|||||||
f"Entering {Routes.PlayWorkflow} with args {debug_session(session)}, {_id=}")
|
f"Entering {Routes.PlayWorkflow} with args {debug_session(session)}, {_id=}")
|
||||||
instance = InstanceManager.get(session, _id)
|
instance = InstanceManager.get(session, _id)
|
||||||
return instance.play_workflow(json.loads(tab_boundaries))
|
return instance.play_workflow(json.loads(tab_boundaries))
|
||||||
|
|
||||||
|
@rt(Routes.StopWorkflow)
|
||||||
|
def post(session, _id: str, tab_boundaries: str):
|
||||||
|
logger.debug(
|
||||||
|
f"Entering {Routes.StopWorkflow} with args {debug_session(session)}, {_id=}")
|
||||||
|
instance = InstanceManager.get(session, _id)
|
||||||
|
return instance.stop_workflow(json.loads(tab_boundaries))
|
||||||
@@ -114,7 +114,6 @@
|
|||||||
}
|
}
|
||||||
|
|
||||||
.wkf-component-content.not-run {
|
.wkf-component-content.not-run {
|
||||||
background: var(--color-neutral);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
.wkf-connection-line {
|
.wkf-connection-line {
|
||||||
|
|||||||
@@ -200,10 +200,16 @@ class WorkflowDesigner(BaseComponent):
|
|||||||
else:
|
else:
|
||||||
|
|
||||||
# change the tab and display the results
|
# change the tab and display the results
|
||||||
|
self._player.set_boundaries(boundaries)
|
||||||
self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key)
|
self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key)
|
||||||
|
|
||||||
return self.tabs_manager.refresh()
|
return self.tabs_manager.refresh()
|
||||||
|
|
||||||
|
def stop_workflow(self, boundaries):
|
||||||
|
self._error_message = None
|
||||||
|
self._player.run()
|
||||||
|
return self.tabs_manager.refresh()
|
||||||
|
|
||||||
def on_processor_details_event(self, component_id: str, event_name: str, details: dict):
|
def on_processor_details_event(self, component_id: str, event_name: str, details: dict):
|
||||||
if component_id in self._state.components:
|
if component_id in self._state.components:
|
||||||
component = self._state.components[component_id]
|
component = self._state.components[component_id]
|
||||||
@@ -271,6 +277,12 @@ class WorkflowDesigner(BaseComponent):
|
|||||||
state_class = 'not-run' # To be styled as greyed-out
|
state_class = 'not-run' # To be styled as greyed-out
|
||||||
else:
|
else:
|
||||||
state_class = ''
|
state_class = ''
|
||||||
|
if runtime_state.state == ComponentState.FAILURE:
|
||||||
|
tooltip_content = runtime_state.error_message
|
||||||
|
tooltip_class = "mmt-tooltip"
|
||||||
|
else:
|
||||||
|
tooltip_content = None
|
||||||
|
tooltip_class = ""
|
||||||
|
|
||||||
return Div(
|
return Div(
|
||||||
# Input connection point
|
# Input connection point
|
||||||
@@ -290,9 +302,10 @@ class WorkflowDesigner(BaseComponent):
|
|||||||
data_component_id=component.id,
|
data_component_id=component.id,
|
||||||
data_point_type="output"),
|
data_point_type="output"),
|
||||||
|
|
||||||
cls=f"wkf-workflow-component w-32 {'selected' if is_selected else ''}",
|
cls=f"wkf-workflow-component w-32 {'selected' if is_selected else ''} {tooltip_class}",
|
||||||
style=f"left: {component.x}px; top: {component.y}px;",
|
style=f"left: {component.x}px; top: {component.y}px;",
|
||||||
data_component_id=component.id,
|
data_component_id=component.id,
|
||||||
|
data_tooltip=tooltip_content,
|
||||||
draggable="true"
|
draggable="true"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -303,7 +316,7 @@ class WorkflowDesigner(BaseComponent):
|
|||||||
|
|
||||||
# Render components
|
# Render components
|
||||||
*[self._mk_component(comp, state) for comp, state in zip(self._state.components.values(),
|
*[self._mk_component(comp, state) for comp, state in zip(self._state.components.values(),
|
||||||
self._player.runtime_states)],
|
self._player.runtime_states.values())],
|
||||||
)
|
)
|
||||||
|
|
||||||
def _mk_canvas(self, oob=False):
|
def _mk_canvas(self, oob=False):
|
||||||
@@ -337,8 +350,8 @@ class WorkflowDesigner(BaseComponent):
|
|||||||
def _mk_media(self):
|
def _mk_media(self):
|
||||||
return Div(
|
return Div(
|
||||||
mk_icon(icon_play, cls="mr-1", **self.commands.play_workflow()),
|
mk_icon(icon_play, cls="mr-1", **self.commands.play_workflow()),
|
||||||
mk_icon(icon_pause, cls="mr-1", **self.commands.play_workflow()),
|
mk_icon(icon_pause, cls="mr-1", **self.commands.pause_workflow()),
|
||||||
mk_icon(icon_stop, cls="mr-1", **self.commands.play_workflow()),
|
mk_icon(icon_stop, cls="mr-1", **self.commands.stop_workflow()),
|
||||||
cls=f"media-controls flex m-2"
|
cls=f"media-controls flex m-2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from collections import deque
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from fasthtml.components import *
|
from fasthtml.components import *
|
||||||
@@ -22,6 +23,12 @@ grid_settings = DataGridSettings(
|
|||||||
open_settings_visible=False)
|
open_settings_visible=False)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class WorkflowsPlayerError(Exception):
|
||||||
|
component_id: str
|
||||||
|
error: Exception
|
||||||
|
|
||||||
|
|
||||||
class WorkflowPlayer(BaseComponent):
|
class WorkflowPlayer(BaseComponent):
|
||||||
def __init__(self, session,
|
def __init__(self, session,
|
||||||
_id=None,
|
_id=None,
|
||||||
@@ -42,16 +49,16 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
key=self.key,
|
key=self.key,
|
||||||
grid_settings=grid_settings,
|
grid_settings=grid_settings,
|
||||||
boundaries=boundaries)
|
boundaries=boundaries)
|
||||||
self.runtime_states = [WorkflowComponentRuntimeState(component.id) for component in player_settings.components]
|
self.runtime_states = {component.id: WorkflowComponentRuntimeState(component.id)
|
||||||
|
for component in player_settings.components}
|
||||||
self.global_error = None
|
self.global_error = None
|
||||||
self.has_error = False
|
self.has_error = False
|
||||||
|
|
||||||
|
def set_boundaries(self, boundaries: dict):
|
||||||
|
self._datagrid.set_boundaries(boundaries)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
# Reset all component states to NOT_RUN before execution
|
self._reset_state(state=ComponentState.NOT_RUN)
|
||||||
for state in self.runtime_states:
|
|
||||||
state.state = ComponentState.NOT_RUN
|
|
||||||
state.error_message = None
|
|
||||||
self.global_error = None
|
|
||||||
|
|
||||||
components_by_id = {c.id: c for c in self._player_settings.components}
|
components_by_id = {c.id: c for c in self._player_settings.components}
|
||||||
|
|
||||||
@@ -63,10 +70,16 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
self._datagrid.init_from_dataframe(pd.DataFrame([]))
|
self._datagrid.init_from_dataframe(pd.DataFrame([]))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
engine = self._get_engine()
|
engine = self._get_engine()
|
||||||
res = engine.run_to_list()
|
except WorkflowsPlayerError as ex:
|
||||||
|
if ex.component_id in self.runtime_states:
|
||||||
|
self.runtime_states[ex.component_id].state = ComponentState.FAILURE
|
||||||
|
self.runtime_states[ex.component_id].error_message = str(ex.error)
|
||||||
|
self.global_error = f"Failed to init component '{ex.component_id}': {ex.error}"
|
||||||
|
return
|
||||||
|
|
||||||
runtime_states_by_id = {rs.id: rs for rs in self.runtime_states}
|
res = engine.run_to_list()
|
||||||
|
|
||||||
if engine.has_error:
|
if engine.has_error:
|
||||||
self.has_error = True
|
self.has_error = True
|
||||||
@@ -78,7 +91,7 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
# Determine component states by simulating a "stop-on-fail" execution
|
# Determine component states by simulating a "stop-on-fail" execution
|
||||||
first_failure_found = False
|
first_failure_found = False
|
||||||
for component in sorted_components:
|
for component in sorted_components:
|
||||||
runtime_state = runtime_states_by_id.get(component.id)
|
runtime_state = self.runtime_states.get(component.id)
|
||||||
if not runtime_state:
|
if not runtime_state:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -108,6 +121,9 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
df = pd.DataFrame(data)
|
df = pd.DataFrame(data)
|
||||||
self._datagrid.init_from_dataframe(df)
|
self._datagrid.init_from_dataframe(df)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._reset_state()
|
||||||
|
|
||||||
def __ft__(self):
|
def __ft__(self):
|
||||||
return Div(
|
return Div(
|
||||||
self._datagrid,
|
self._datagrid,
|
||||||
@@ -172,6 +188,7 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
sorted_components = self._get_sorted_components()
|
sorted_components = self._get_sorted_components()
|
||||||
engine = WorkflowEngine()
|
engine = WorkflowEngine()
|
||||||
for component in sorted_components:
|
for component in sorted_components:
|
||||||
|
try:
|
||||||
if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository":
|
if component.type == ProcessorTypes.Producer and component.properties["processor_name"] == "Repository":
|
||||||
engine.add_processor(
|
engine.add_processor(
|
||||||
TableDataProducer(self._session,
|
TableDataProducer(self._session,
|
||||||
@@ -185,8 +202,18 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
|
|
||||||
elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default":
|
elif component.type == ProcessorTypes.Presenter and component.properties["processor_name"] == "Default":
|
||||||
engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"]))
|
engine.add_processor(DefaultDataPresenter(component.id, component.properties["columns"]))
|
||||||
|
except Exception as e:
|
||||||
|
raise WorkflowsPlayerError(component.id, e)
|
||||||
|
|
||||||
return engine
|
return engine
|
||||||
|
|
||||||
|
def _reset_state(self, state: ComponentState = ComponentState.SUCCESS):
|
||||||
|
self.global_error = None
|
||||||
|
self.has_error = False
|
||||||
|
for runtime_state in self.runtime_states.values():
|
||||||
|
runtime_state.state = state
|
||||||
|
runtime_state.error_message = None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_component_id(session, suffix=None):
|
def create_component_id(session, suffix=None):
|
||||||
prefix = f"{WORKFLOW_PLAYER_INSTANCE_ID}{session['user_id']}"
|
prefix = f"{WORKFLOW_PLAYER_INSTANCE_ID}{session['user_id']}"
|
||||||
|
|||||||
Reference in New Issue
Block a user