Files
MyManagingTools/src/components/workflows/components/WorkflowDesigner.py

675 lines
25 KiB
Python

import logging
from fastcore.basics import NotStr
from fasthtml.components import *
from fasthtml.xtend import Script
from assets.icons import icon_error
from components.BaseComponent import BaseComponent
from components.undo_redo.constants import UndoRedoAttrs
from components.workflows.assets.icons import icon_play, icon_pause, icon_stop, icon_refresh
from components.workflows.commands import WorkflowDesignerCommandManager
from components.workflows.components.WorkflowDesignerProperties import WorkflowDesignerProperties
from components.workflows.components.WorkflowPlayer import WorkflowPlayer
from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes, COMPONENT_TYPES, \
PROCESSOR_TYPES
from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \
Connection, WorkflowsDesignerDbManager, ComponentState, WorkflowsDesignerState
from components_helpers import apply_boundaries, mk_tooltip, mk_dialog_buttons, mk_icon
from core.instance_manager import InstanceManager
from core.jira import JiraRequestTypes, DEFAULT_SEARCH_FIELDS
from core.utils import get_unique_id, make_safe_id
from utils.ComponentsInstancesHelper import ComponentsInstancesHelper
from utils.DbManagementHelper import DbManagementHelper
logger = logging.getLogger("WorkflowDesigner")
class WorkflowDesigner(BaseComponent):
def __init__(self, session,
_id=None,
settings_manager=None,
tabs_manager=None,
key: str = None,
designer_settings: WorkflowsDesignerSettings = None,
boundaries: dict = None):
super().__init__(session, _id)
self._settings_manager = settings_manager
self.tabs_manager = tabs_manager
self._key = key
self._designer_settings = designer_settings
self._db = WorkflowsDesignerDbManager(session, settings_manager)
self._undo_redo = ComponentsInstancesHelper.get_undo_redo(session)
self._state: WorkflowsDesignerState = self._db.load_state(key)
self._boundaries = boundaries
self.commands = WorkflowDesignerCommandManager(self)
self.properties = WorkflowDesignerProperties(self._session, f"{self._id}", self)
workflow_name = self._designer_settings.workflow_name
self._player = InstanceManager.get(self._session,
WorkflowPlayer.create_component_id(self._session, workflow_name),
WorkflowPlayer,
settings_manager=self._settings_manager,
tabs_manager=self.tabs_manager,
designer=self,
boundaries=boundaries)
self._error_message = None
def set_boundaries(self, boundaries: dict):
self._boundaries = boundaries
def get_boundaries(self):
return self._boundaries
def get_state(self) -> WorkflowsDesignerState:
return self._state
def get_db(self):
return self._db
def get_key(self):
return self._key
def refresh_designer(self, oob=False):
if oob:
return self._mk_canvas(oob)
else:
return self._mk_elements()
def refresh_properties(self, oob=False):
return self._mk_properties(oob)
def refresh(self):
return self.__ft__(oob=True)
def refresh_state(self):
self._state = self._db.load_state(self._key)
self.properties.update_layout()
self.properties.update_component(self._state.selected_component_id)
return self.__ft__(oob=True)
def add_component(self, component_type, x, y):
self._state.component_counter += 1
component_id = f"comp_{self._state.component_counter}"
info = COMPONENT_TYPES[component_type]
component = WorkflowComponent(
id=component_id,
type=component_type,
x=int(x),
y=int(y),
title=info["title"],
description=info["description"],
properties={"processor_name": PROCESSOR_TYPES[component_type][0]}
)
self._state.components[component_id] = component
undo_redo_attrs = UndoRedoAttrs(f"Add Component '{component_type}'", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs) # update db
return self.refresh_designer(), self._undo_redo.refresh()
def move_component(self, component_id, x, y):
if component_id in self._state.components:
component = self._state.components[component_id]
self._state.selected_component_id = component_id
component.x = int(x)
component.y = int(y)
undo_redo_attrs = UndoRedoAttrs(f"Move Component '{component.title}'", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs) # update db
return self.refresh_designer(), self.properties.refresh(mode="form", oob=True), self._undo_redo.refresh()
def delete_component(self, component_id):
# Remove component
if component_id in self._state.components:
component = self._state.components[component_id]
del self._state.components[component_id]
# Remove related connections
self._state.connections = [connection for connection in self._state.connections
if connection.from_id != component_id and connection.to_id != component_id]
# update db
undo_redo_attrs = UndoRedoAttrs(f"Remove Component '{component.title}'", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.refresh_designer(), self._undo_redo.refresh()
def add_connection(self, from_id, to_id):
# Check if connection already exists
for connection in self._state.connections:
if connection.from_id == from_id and connection.to_id == to_id:
return self.refresh_designer() # , self.error_message("Connection already exists")
connection_id = f"conn_{len(self._state.connections) + 1}"
connection = Connection(id=connection_id, from_id=from_id, to_id=to_id)
self._state.connections.append(connection)
# update db
undo_redo_attrs = UndoRedoAttrs(f"Add Connection", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.refresh_designer(), self._undo_redo.refresh()
def delete_connection(self, from_id, to_id):
for connection in self._state.connections:
if connection.from_id == from_id and connection.to_id == to_id:
self._state.connections.remove(connection)
# update db
undo_redo_attrs = UndoRedoAttrs(f"Delete Connection", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.refresh_designer(), self._undo_redo.refresh()
def set_designer_height(self, height):
self._state.designer_height = height
undo_redo_attrs = UndoRedoAttrs(f"Resize Designer", on_undo=lambda: self.refresh_state())
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.__ft__(), self._undo_redo.refresh() # refresh the whole component
def update_properties_layout(self, input_width, properties_width, output_width):
self._state.properties_input_width = input_width
self._state.properties_properties_width = properties_width
self._state.properties_output_width = output_width
self.properties.update_layout()
undo_redo_attrs = UndoRedoAttrs(f"Resize Properties", on_undo=lambda: self.refresh_state())
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.__ft__(), self._undo_redo.refresh() # refresh the whole component
def select_component(self, component_id):
if component_id in self._state.components:
self._state.selected_component_id = component_id
component = self._state.components[component_id]
undo_redo_attrs = UndoRedoAttrs(f"Select Component {component.title}", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.properties.refresh(mode="form"), self._undo_redo.refresh()
def save_properties(self, component_id: str, details: dict):
if component_id in self._state.components:
component = self._state.components[component_id]
component.properties |= details
undo_redo_attrs = UndoRedoAttrs(f"Set properties for {component.title}", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs)
logger.debug(f"Saved properties for component {component_id}: {details}")
return self.properties.refresh(mode="form"), self._undo_redo.refresh()
def cancel_properties(self, component_id: str):
if component_id in self._state.components:
logger.debug(f"Cancel saving properties for component {component_id}")
return self.properties.refresh(mode="form")
def set_selected_processor(self, component_id: str, processor_name: str):
if component_id in self._state.components:
component = self._state.components[component_id]
component.properties = {"processor_name": processor_name}
undo_redo_attrs = UndoRedoAttrs(f"Set Processor for {component.title}", on_undo=self.refresh_state)
self._db.save_state(self._key, self._state, undo_redo_attrs)
return self.properties.refresh(mode="form"), self._undo_redo.refresh()
def play_workflow(self, boundaries: dict):
self._error_message = None
self._player.run()
if self._player.global_error:
# Show the error message in the same tab
self._error_message = self._player.global_error
else:
# change the tab and display the results
self._player.set_boundaries(boundaries)
self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key)
return self.tabs_manager.refresh()
def stop_workflow(self):
self._error_message = None
self._player.stop()
return self.tabs_manager.refresh()
def on_processor_details_event(self, component_id: str, event_name: str, details: dict):
if component_id in self._state.components:
component = self._state.components[component_id]
if event_name == "OnRepositoryChanged":
component.properties["repository"] = details["repository"]
tables = DbManagementHelper.list_tables(self._session, details["repository"])
component.properties["table"] = tables[0] if len(tables) > 0 else None
elif event_name == "OnJiraRequestTypeChanged":
component.properties["request_type"] = details["request_type"]
return self.properties.refresh(mode="form")
def get_workflow_name(self):
return self._designer_settings.workflow_name
def get_workflow_components(self):
return self._state.components.values()
def get_workflow_connections(self):
return self._state.connections
def __ft__(self, oob=False):
return Div(
H1(f"{self._designer_settings.workflow_name}", cls="text-xl font-bold"),
# P("Drag components from the toolbox to the canvas to create your workflow.", cls="text-sm"),
Div(
self._mk_media(),
# self._mk_refresh_button(),
self._mk_error_message(),
cls="flex mb-2",
id=f"t_{self._id}"
),
self._mk_designer(),
Div(cls="wkf-splitter", id=f"s_{self._id}"),
self._mk_properties(),
Script(f"bindWorkflowDesigner('{self._id}');"),
**apply_boundaries(self._boundaries),
id=f"{self._id}",
hx_swap_oob='true' if oob else None,
)
def _mk_connection_svg(self, conn: Connection):
if conn.from_id not in self._state.components or conn.to_id not in self._state.components:
return ""
from_comp = self._state.components[conn.from_id]
to_comp = self._state.components[conn.to_id]
# Calculate connection points (approximate)
x1 = from_comp.x + 128 # component width + output point
y1 = from_comp.y + 32 # component height / 2
x2 = to_comp.x
y2 = to_comp.y + 32
# Create curved path
mid_x = (x1 + x2) / 2
path = f"M {x1} {y1} C {mid_x} {y1}, {mid_x} {y2}, {x2} {y2}"
return f"""
<svg class="wkf-connection-line" style="left: 0; top: 0; width: 100%; height: 100%;"
data-from-id="{conn.from_id}" data-to-id="{conn.to_id}">
<path d="{path}" class="wkf-connection-path-thick"/>
<path d="{path}" class="wkf-connection-path" marker-end="url(#arrowhead)"/>
<defs>
<marker id="arrowhead" markerWidth="10" markerHeight="7" refX="9" refY="3.5" orient="auto">
<polygon points="0 0, 10 3.5, 0 7" class="wkf-connection-path-arrowhead"/>
</marker>
</defs>
</svg>
"""
def _mk_component(self, component: WorkflowComponent):
runtime_state = self._player.get_component_runtime_state(component.id)
info = COMPONENT_TYPES[component.type]
is_selected = self._state.selected_component_id == component.id
tooltip_content = None
tooltip_class = ""
if runtime_state.state == ComponentState.FAILURE:
state_class = 'error' # To be styled with a red highlight
tooltip_content = runtime_state.error_message
tooltip_class = "mmt-tooltip"
elif runtime_state.state == ComponentState.NOT_RUN:
state_class = 'not-run' # To be styled as greyed-out
else:
state_class = ''
return Div(
# Input connection point
Div(cls="wkf-connection-point wkf-input-point",
data_component_id=component.id,
data_point_type="input"),
# Component content
Div(
Span(info["icon"], cls="text-xl mb-1"),
H4(component.title, cls="font-semibold text-xs"),
cls=f"wkf-component-content {info['color']} {state_class}"
),
# Output connection point
Div(cls="wkf-connection-point wkf-output-point",
data_component_id=component.id,
data_point_type="output"),
cls=f"wkf-workflow-component w-32 {'selected' if is_selected else ''} {tooltip_class}",
style=f"left: {component.x}px; top: {component.y}px;",
data_component_id=component.id,
data_tooltip=tooltip_content,
draggable="true"
)
def _mk_elements(self):
if len(self._state.components) == 0:
return Div("Drag components from the toolbox to the canvas to create your workflow.",
cls="flex items-center justify-center h-full w-full"
)
return Div(
# Render connections
*[NotStr(self._mk_connection_svg(conn)) for conn in self._state.connections],
# Render components
*[self._mk_component(comp) for comp in self._state.components.values()],
)
def _mk_canvas(self, oob=False):
return Div(
self._mk_elements(),
cls=f"wkf-canvas flex-1 rounded-lg border flex-1 {'wkf-canvas-error' if self._error_message else ''}",
id=f"c_{self._id}",
hx_swap_oob='true' if oob else None,
),
def _mk_toolbox(self):
return Div(
Div(
*[self._mk_toolbox_item(comp_type, info)
for comp_type, info in COMPONENT_TYPES.items()],
# cls="space-y-1"
),
cls="wkf-toolbox"
)
def _mk_designer(self):
return Div(
self._mk_toolbox(), # (Left side)
self._mk_canvas(), # (Right side)
cls="wkf-designer flex gap-1",
id=f"d_{self._id}",
style=f"height:{self._state.designer_height}px;"
)
def _mk_media(self):
return Div(
mk_icon(icon_play, cls="mr-1", **self.commands.play_workflow()),
mk_icon(icon_pause, cls="mr-1", **self.commands.pause_workflow()),
mk_icon(icon_stop, cls="mr-1", **self.commands.stop_workflow()),
cls=f"media-controls flex m-2"
)
def _mk_refresh_button(self):
return mk_icon(icon_refresh, **self.commands.refresh())
def _mk_error_message(self):
if not self._error_message:
return Div()
return Div(
mk_icon(icon_error),
Span(self._error_message, cls="text-sm"),
role="alert",
cls="alert alert-error alert-outline p-1!",
hx_swap_oob='true',
)
def _mk_processor_properties(self, component, processor_name):
if processor_name == "Jira":
return self._mk_jira_processor_details(component)
elif processor_name == "Repository":
return self._mk_repository_processor_details(component)
elif component.type == ProcessorTypes.Filter and processor_name == "Default":
return self._mk_filter_processor_details(component)
elif component.type == ProcessorTypes.Presenter and processor_name == "Default":
return self._mk_presenter_processor_details(component)
return Div('Not defined yet !')
def _mk_properties_output(self, component):
return Div(
"Output name",
Input(type="input",
name="output_name",
placeholder="data",
value=component.properties.get("output_name", None),
cls="input w-xs"),
cls="join"
)
def _mk_properties_details(self, component_id, allow_component_selection=False):
def _mk_header():
return Div(
Div(
Span(icon),
H4(component.title, cls="font-semibold text-xs"),
cls=f"rounded-lg border-2 {color} flex text-center px-2"
),
H1(component_id, cls="ml-4"),
cls="flex mb-2"
)
def _mk_select():
return Select(
*[Option(processor_name, selected="selected" if processor_name == selected_processor_name else None)
for processor_name in PROCESSOR_TYPES[component.type]],
cls="select select-sm w-64 mb-2",
id="processor_name",
name="processor_name",
**self.commands.select_processor(component_id)
)
if component_id is None or component_id not in self._state.components and not allow_component_selection:
return None
else:
component_id = self._state.selected_component_id
component = self._state.components[component_id]
selected_processor_name = component.properties["processor_name"]
icon = COMPONENT_TYPES[component.type]["icon"]
color = COMPONENT_TYPES[component.type]["color"]
return Div(
Form(
_mk_header(),
Div(
Input(type="radio", name=f"pt_{self._id}", cls="tab", aria_label="Properties", checked="checked"),
Div(
_mk_select(),
self._mk_processor_properties(component, selected_processor_name),
cls="tab-content"
),
Input(type="radio", name=f"pt_{self._id}", cls="tab", aria_label="Inputs"),
Div(
"Inputs",
cls="tab-content"
),
Input(type="radio", name=f"pt_{self._id}", cls="tab", aria_label="Output"),
Div(
self._mk_properties_output(component),
cls="tab-content"
),
cls="tabs tabs-border"
),
mk_dialog_buttons(cls="mt-4",
on_ok=self.commands.save_properties(component_id),
on_cancel=self.commands.cancel_properties(component_id)),
cls="font-mono text-sm",
id=f"f_{self._id}_{component_id}",
),
Script(f"bindFormData('f_{self._id}_{component_id}');")
)
def _mk_properties(self, oob=False):
return self.properties
def _mk_jira_processor_details(self, component):
def _mk_option(name):
return Option(name.name,
value=name.value,
selected="selected" if name.value == request_type else None)
def _mk_input_group():
if request_type == JiraRequestTypes.Search.value:
return Div(
Input(type="text",
name="request",
value=component.properties.get("request", ""),
placeholder="Enter JQL",
cls="input w-full"),
P("Write your jql code"),
)
elif request_type == JiraRequestTypes.Comments.value:
return Div(
Input(type="text",
name="request",
value=component.properties.get("request", ""),
placeholder="Issue id",
cls="input w-full"),
P("Put the issue id here"),
)
def _mk_extra_parameters():
if request_type == JiraRequestTypes.Search.value:
return Input(type="text",
name="fields",
value=component.properties.get("fields", DEFAULT_SEARCH_FIELDS),
placeholder="default fields",
cls="input w-full ml-2")
else:
return None
request_type = component.properties.get("request_type", JiraRequestTypes.Search.value)
return Div(
Fieldset(
Legend("JQL", cls="fieldset-legend"),
Div(
Select(
*[_mk_option(enum) for enum in JiraRequestTypes],
cls="select w-xs",
name="request_type",
**self.commands.on_processor_details_event(component.id, "OnJiraRequestTypeChanged"),
),
_mk_extra_parameters(),
cls="flex"),
_mk_input_group(),
cls="fieldset bg-base-200 border-base-300 rounded-box border p-4"
),
)
def _mk_repository_processor_details(self, component):
selected_repo = component.properties.get("repository", None)
selected_table = component.properties.get("table", None)
def _mk_repositories_options():
repositories = DbManagementHelper.list_repositories(self._session)
if len(repositories) == 0:
return [Option("No repository available", disabled=True)]
return ([Option("Choose a repository", disabled=True, selected="selected" if selected_repo is None else None)] +
[Option(repo.name, selected="selected" if repo.name == selected_repo else None)
for repo in DbManagementHelper.list_repositories(self._session)])
def _mk_tables_options():
if selected_repo is None:
return [Option("No repository selected", disabled=True)]
tables = DbManagementHelper.list_tables(self._session, selected_repo)
if len(tables) == 0:
return [Option("No table available", disabled=True)]
return ([Option("Choose a table", disabled=True, selected="selected" if selected_table is None else None)] +
[Option(table, selected="selected" if table == selected_table else None)
for table in DbManagementHelper.list_tables(self._session, selected_repo)])
return Div(
Fieldset(
Legend("Repository", cls="fieldset-legend"),
Div(
Select(
*_mk_repositories_options(),
cls="select w-64",
id=f"repository_{self._id}",
name="repository",
**self.commands.on_processor_details_event(component.id, "OnRepositoryChanged"),
),
Select(
*_mk_tables_options(),
cls="select w-64 ml-4",
id=f"table_{self._id}",
name="table",
),
cls="flex",
),
P("Select the source table"),
cls="fieldset bg-base-200 border-base-300 rounded-box border p-4"
)
)
@staticmethod
def _mk_filter_processor_details(component):
return Div(
Fieldset(
Legend("Filter", cls="fieldset-legend"),
Input(type="text",
name="filter",
value=component.properties.get("filter", ""),
placeholder="Enter filter expression",
cls="input w-full"),
P("Write your filter expression (python syntax)"),
cls="fieldset bg-base-200 border-base-300 rounded-box border p-4"
)
)
@staticmethod
def _mk_presenter_processor_details(component):
return Div(
Fieldset(
Legend("Presenter", cls="fieldset-legend"),
Input(type="text",
name="columns",
value=component.properties.get("columns", ""),
placeholder="Columns to display, separated by comma",
cls="input w-full"),
P("Comma separated list of columns to display. Use '*' to display all columns, 'source=dest' to rename columns."),
P("Use 'parent.*=*' to display all columns from object 'parent' and rename them removing the 'parent' prefix."),
cls="fieldset bg-base-200 border-base-300 rounded-box border p-4"
)
)
def _get_properties_height(self):
print(f"height: {self._boundaries['height']}")
return self._boundaries["height"] - self._state.designer_height - 86
@staticmethod
def create_component_id(session, suffix=None):
prefix = f"{WORKFLOW_DESIGNER_INSTANCE_ID}{session['user_id']}"
if suffix is None:
suffix = get_unique_id()
return make_safe_id(f"{prefix}{suffix}")
@staticmethod
def _mk_toolbox_item(component_type: str, info: dict):
return Div(
mk_tooltip(
Div(
Span(info["icon"], cls="mb-2"),
H4(info["title"], cls="font-semibold text-xs"),
cls=f"p-2 rounded-lg border-2 {info['color']} flex text-center"
),
tooltip=info["description"]),
cls="wkf-toolbox-item p-2",
draggable="true",
data_type=component_type
)