Compare commits
7 Commits
63058ef4a9
...
ResolvingP
| Author | SHA1 | Date | |
|---|---|---|---|
| 7dc7687b25 | |||
| f08ae4a90b | |||
| b48aaf4621 | |||
| 2c5fe004f5 | |||
| 9cf0e5e26a | |||
| 67abb45804 | |||
| 5820efb7f1 |
@@ -1,4 +1,4 @@
|
|||||||
from core.utils import get_user_id, get_unique_id
|
from core.utils import get_user_id
|
||||||
|
|
||||||
|
|
||||||
class BaseComponent:
|
class BaseComponent:
|
||||||
@@ -51,12 +51,3 @@ class BaseComponentSingleton(BaseComponent):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def create_component_id(cls, session):
|
def create_component_id(cls, session):
|
||||||
return f"{cls.COMPONENT_INSTANCE_ID}{session['user_id']}"
|
return f"{cls.COMPONENT_INSTANCE_ID}{session['user_id']}"
|
||||||
|
|
||||||
|
|
||||||
class BaseComponentMultipleInstance(BaseComponent):
|
|
||||||
COMPONENT_INSTANCE_ID = None
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create_component_id(cls, session):
|
|
||||||
component_id = cls.COMPONENT_INSTANCE_ID or cls.__name__
|
|
||||||
return get_unique_id(f"{component_id}{session['user_id']}")
|
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from fasthtml.components import Div, sse_message
|
||||||
|
from fasthtml.core import EventStream
|
||||||
from fasthtml.fastapp import fast_app
|
from fasthtml.fastapp import fast_app
|
||||||
from starlette.datastructures import UploadFile
|
from starlette.datastructures import UploadFile
|
||||||
|
|
||||||
@@ -138,8 +141,14 @@ def post(session, _id: str, state: str, args: str = None):
|
|||||||
return instance.manage_state_changed(state, args)
|
return instance.manage_state_changed(state, args)
|
||||||
|
|
||||||
|
|
||||||
|
@rt(Routes.YieldRow)
|
||||||
|
async def get(session, _id: str):
|
||||||
|
logger.debug(f"Entering {Routes.YieldRow} with args {_id=}")
|
||||||
|
instance = InstanceManager.get(session, _id)
|
||||||
|
return EventStream(instance.mk_body_content_sse())
|
||||||
|
|
||||||
@rt(Routes.GetPage)
|
@rt(Routes.GetPage)
|
||||||
def get(session, _id: str, page_index: int):
|
def get(session, _id: str, page_index: int):
|
||||||
logger.debug(f"Entering {Routes.GetPage} with args {_id=}, {page_index=}")
|
logger.debug(f"Entering {Routes.GetPage} with args {_id=}, {page_index=}")
|
||||||
instance = InstanceManager.get(session, _id)
|
instance = InstanceManager.get(session, _id)
|
||||||
return instance.mk_body_content_page(page_index)
|
return instance.mk_body_content_page(page_index)
|
||||||
@@ -400,6 +400,7 @@ class DataGrid(BaseComponent):
|
|||||||
id=f"scb_{self._id}",
|
id=f"scb_{self._id}",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@timed
|
||||||
def mk_table(self, oob=False):
|
def mk_table(self, oob=False):
|
||||||
htmx_extra_params = {
|
htmx_extra_params = {
|
||||||
"hx-on::before-settle": f"onAfterSettle('{self._id}', event);",
|
"hx-on::before-settle": f"onAfterSettle('{self._id}', event);",
|
||||||
|
|||||||
@@ -118,6 +118,38 @@ class DataGridCommandManager(BaseCommandManager):
|
|||||||
"data_tooltip": tooltip_msg,
|
"data_tooltip": tooltip_msg,
|
||||||
"cls": self.merge_class(cls, "mmt-tooltip")
|
"cls": self.merge_class(cls, "mmt-tooltip")
|
||||||
}
|
}
|
||||||
|
#
|
||||||
|
# @staticmethod
|
||||||
|
# def merge(*items):
|
||||||
|
# """
|
||||||
|
# Merges multiple dictionaries into a single dictionary by combining their key-value pairs.
|
||||||
|
# If a key exists in multiple dictionaries and its value is a string, the values are concatenated.
|
||||||
|
# If the key's value is not a string, an error is raised.
|
||||||
|
#
|
||||||
|
# :param items: dictionaries to be merged. If all items are None, None is returned.
|
||||||
|
# :return: A single dictionary containing the merged key-value pairs from all input dictionaries.
|
||||||
|
# :raises NotImplementedError: If a key's value is not a string and exists in multiple input dictionaries.
|
||||||
|
# """
|
||||||
|
# if all(item is None for item in items):
|
||||||
|
# return None
|
||||||
|
#
|
||||||
|
# res = {}
|
||||||
|
# for item in [item for item in items if item is not None]:
|
||||||
|
#
|
||||||
|
# for key, value in item.items():
|
||||||
|
# if not key in res:
|
||||||
|
# res[key] = value
|
||||||
|
# else:
|
||||||
|
# if isinstance(res[key], str):
|
||||||
|
# res[key] += " " + value
|
||||||
|
# else:
|
||||||
|
# raise NotImplementedError("")
|
||||||
|
#
|
||||||
|
# return res
|
||||||
|
#
|
||||||
|
# @staticmethod
|
||||||
|
# def merge_class(cls1, cls2):
|
||||||
|
# return (cls1 + " " + cls2) if cls2 else cls1
|
||||||
|
|
||||||
|
|
||||||
class FilterAllCommands(BaseCommandManager):
|
class FilterAllCommands(BaseCommandManager):
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ class Routes:
|
|||||||
UpdateView = "/update_view"
|
UpdateView = "/update_view"
|
||||||
ShowFooterMenu = "/show_footer_menu"
|
ShowFooterMenu = "/show_footer_menu"
|
||||||
UpdateState = "/update_state"
|
UpdateState = "/update_state"
|
||||||
|
YieldRow = "/yield-row"
|
||||||
GetPage = "/page"
|
GetPage = "/page"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,17 +0,0 @@
|
|||||||
import logging
|
|
||||||
|
|
||||||
from fasthtml.fastapp import fast_app
|
|
||||||
|
|
||||||
from components.entryselector.constants import Routes
|
|
||||||
from core.instance_manager import debug_session, InstanceManager
|
|
||||||
|
|
||||||
logger = logging.getLogger("EntrySelectorApp")
|
|
||||||
|
|
||||||
repositories_app, rt = fast_app()
|
|
||||||
|
|
||||||
|
|
||||||
@rt(Routes.Select)
|
|
||||||
def get(session, _id: str, entry: str):
|
|
||||||
logger.debug(f"Entering {Routes.Select} with args {debug_session(session)}, {_id=}, {entry=}")
|
|
||||||
instance = InstanceManager.get(session, _id)
|
|
||||||
return instance.select_entry(entry)
|
|
||||||
@@ -1,16 +0,0 @@
|
|||||||
.es-container {
|
|
||||||
overflow-x: auto;
|
|
||||||
white-space: nowrap;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
.es-entry {
|
|
||||||
border: 2px solid var(--color-base-300);
|
|
||||||
padding: 2px;
|
|
||||||
cursor: pointer;
|
|
||||||
display: inline-block; /* Ensure entries align horizontally if needed */
|
|
||||||
}
|
|
||||||
|
|
||||||
.es-entry:hover {
|
|
||||||
background-color: var(--color-base-300);
|
|
||||||
}
|
|
||||||
@@ -1,15 +0,0 @@
|
|||||||
from components.BaseCommandManager import BaseCommandManager
|
|
||||||
from components.entryselector.constants import Routes, ROUTE_ROOT
|
|
||||||
|
|
||||||
|
|
||||||
class EntrySelectorCommandManager(BaseCommandManager):
|
|
||||||
def __init__(self, owner):
|
|
||||||
super().__init__(owner)
|
|
||||||
|
|
||||||
def select_entry(self, entry):
|
|
||||||
return {
|
|
||||||
"hx-get": f"{ROUTE_ROOT}{Routes.Select}",
|
|
||||||
"hx-target": f"#{self._owner.content_id}",
|
|
||||||
"hx-swap": "innerHTML",
|
|
||||||
"hx-vals": f'{{"_id": "{self._id}", "entry": "{entry}"}}',
|
|
||||||
}
|
|
||||||
@@ -1,45 +0,0 @@
|
|||||||
import logging
|
|
||||||
|
|
||||||
from fasthtml.components import *
|
|
||||||
|
|
||||||
from components.BaseComponent import BaseComponentMultipleInstance
|
|
||||||
from components.entryselector.commands import EntrySelectorCommandManager
|
|
||||||
|
|
||||||
logger = logging.getLogger("EntrySelector")
|
|
||||||
|
|
||||||
|
|
||||||
class EntrySelector(BaseComponentMultipleInstance):
|
|
||||||
def __init__(self, session, _id, owner, content_id, data=None, hooks=None, key=None, boundaries=None):
|
|
||||||
super().__init__(session, _id)
|
|
||||||
self._key = key
|
|
||||||
self._owner = owner # debugger component
|
|
||||||
self.data = data
|
|
||||||
self.content_id = content_id
|
|
||||||
self.hooks = hooks
|
|
||||||
self._boundaries = boundaries if boundaries else {"width": "300"}
|
|
||||||
self._commands = EntrySelectorCommandManager(self)
|
|
||||||
|
|
||||||
def set_data(self, data):
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
def set_boundaries(self, boundaries):
|
|
||||||
self._boundaries = boundaries
|
|
||||||
|
|
||||||
def select_entry(self, entry):
|
|
||||||
logger.debug(f"Selecting entry {entry}")
|
|
||||||
# return self._owner.select_entry(entry)
|
|
||||||
|
|
||||||
def _mk_content(self):
|
|
||||||
if self.data is None:
|
|
||||||
return [Div("no entry")]
|
|
||||||
|
|
||||||
return [Div(index,
|
|
||||||
**self._commands.select_entry(index),
|
|
||||||
cls="es-entry") for index in range(self.data)]
|
|
||||||
|
|
||||||
def __ft__(self):
|
|
||||||
return Div(
|
|
||||||
*self._mk_content(),
|
|
||||||
cls="flex es-container",
|
|
||||||
id=f"{self._id}",
|
|
||||||
)
|
|
||||||
@@ -1,5 +0,0 @@
|
|||||||
ROUTE_ROOT = "/es" # for EntrySelector
|
|
||||||
|
|
||||||
|
|
||||||
class Routes:
|
|
||||||
Select = "/select"
|
|
||||||
@@ -1,10 +1,9 @@
|
|||||||
from fasthtml.common import *
|
from fasthtml.common import *
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from components.BaseComponent import BaseComponent
|
from components.BaseComponent import BaseComponent
|
||||||
from components.entryselector.components.EntrySelector import EntrySelector
|
|
||||||
from components.workflows.constants import COMPONENT_TYPES, PROCESSOR_TYPES
|
from components.workflows.constants import COMPONENT_TYPES, PROCESSOR_TYPES
|
||||||
from components_helpers import mk_dialog_buttons
|
from components_helpers import mk_dialog_buttons
|
||||||
from core.instance_manager import InstanceManager
|
|
||||||
from core.jira import JiraRequestTypes, DEFAULT_SEARCH_FIELDS
|
from core.jira import JiraRequestTypes, DEFAULT_SEARCH_FIELDS
|
||||||
from utils.DbManagementHelper import DbManagementHelper
|
from utils.DbManagementHelper import DbManagementHelper
|
||||||
|
|
||||||
@@ -26,15 +25,6 @@ class WorkflowDesignerProperties(BaseComponent):
|
|||||||
self._component = None
|
self._component = None
|
||||||
self.update_layout()
|
self.update_layout()
|
||||||
self.update_component(self._owner.get_state().selected_component_id)
|
self.update_component(self._owner.get_state().selected_component_id)
|
||||||
self._input_entry_selector = InstanceManager.new(self._session,
|
|
||||||
EntrySelector,
|
|
||||||
owner=self,
|
|
||||||
content_id=f"pic_{self._id}",
|
|
||||||
data=100)
|
|
||||||
self._output_entry_selector = InstanceManager.new(self._session,
|
|
||||||
EntrySelector,
|
|
||||||
owner=self,
|
|
||||||
content_id=f"poc_{self._id}")
|
|
||||||
|
|
||||||
def update_layout(self):
|
def update_layout(self):
|
||||||
if self._owner.get_state().properties_input_width is None:
|
if self._owner.get_state().properties_input_width is None:
|
||||||
@@ -76,8 +66,7 @@ class WorkflowDesignerProperties(BaseComponent):
|
|||||||
|
|
||||||
def _mk_input(self):
|
def _mk_input(self):
|
||||||
return Div(
|
return Div(
|
||||||
self._input_entry_selector,
|
"Input",
|
||||||
Div(id=f"pic_{self._id}"),
|
|
||||||
id=f"pi_{self._id}",
|
id=f"pi_{self._id}",
|
||||||
style=f"width: {self.layout.input_width}px;",
|
style=f"width: {self.layout.input_width}px;",
|
||||||
cls="wkf-properties-input"
|
cls="wkf-properties-input"
|
||||||
@@ -85,8 +74,7 @@ class WorkflowDesignerProperties(BaseComponent):
|
|||||||
|
|
||||||
def _mk_output(self):
|
def _mk_output(self):
|
||||||
return Div(
|
return Div(
|
||||||
self._output_entry_selector,
|
"Output",
|
||||||
"Output Content",
|
|
||||||
id=f"po_{self._id}",
|
id=f"po_{self._id}",
|
||||||
style=f"width: {self.layout.output_width}px;",
|
style=f"width: {self.layout.output_width}px;",
|
||||||
cls="wkf-properties-output"
|
cls="wkf-properties-output"
|
||||||
@@ -198,7 +186,7 @@ class WorkflowDesignerProperties(BaseComponent):
|
|||||||
selected="selected" if name.value == request_type else None)
|
selected="selected" if name.value == request_type else None)
|
||||||
|
|
||||||
def _mk_input_group():
|
def _mk_input_group():
|
||||||
if request_type == JiraRequestTypes.Search.value or request_type == "issues": # remove issues at some point
|
if request_type == JiraRequestTypes.Search.value or request_type == "issues": # remove issues at some point
|
||||||
return [
|
return [
|
||||||
Div(
|
Div(
|
||||||
Input(type="text",
|
Input(type="text",
|
||||||
|
|||||||
@@ -98,8 +98,6 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
|
|
||||||
if component.id not in engine.errors:
|
if component.id not in engine.errors:
|
||||||
runtime_state.state = ComponentState.SUCCESS
|
runtime_state.state = ComponentState.SUCCESS
|
||||||
runtime_state.input = engine.debug[component.id]["input"]
|
|
||||||
runtime_state.output = engine.debug[component.id]["output"]
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# the component failed
|
# the component failed
|
||||||
@@ -179,7 +177,7 @@ class WorkflowPlayer(BaseComponent):
|
|||||||
# Return sorted components
|
# Return sorted components
|
||||||
return [components_by_id[cid] for cid in sorted_order]
|
return [components_by_id[cid] for cid in sorted_order]
|
||||||
|
|
||||||
def _get_engine(self, sorted_components) -> WorkflowEngine:
|
def _get_engine(self, sorted_components):
|
||||||
# first reorder the component, according to the connection definitions
|
# first reorder the component, according to the connection definitions
|
||||||
engine = WorkflowEngine()
|
engine = WorkflowEngine()
|
||||||
for component in sorted_components:
|
for component in sorted_components:
|
||||||
|
|||||||
@@ -48,8 +48,6 @@ class WorkflowComponentRuntimeState:
|
|||||||
id: str
|
id: str
|
||||||
state: ComponentState = ComponentState.SUCCESS
|
state: ComponentState = ComponentState.SUCCESS
|
||||||
error_message: str | None = None
|
error_message: str | None = None
|
||||||
input: list = None
|
|
||||||
output: list = None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -64,7 +62,7 @@ class WorkflowsDesignerState:
|
|||||||
component_counter: int = 0
|
component_counter: int = 0
|
||||||
designer_height: int = 230
|
designer_height: int = 230
|
||||||
properties_input_width: int = None
|
properties_input_width: int = None
|
||||||
properties_properties_width: int = None
|
properties_properties_width : int = None
|
||||||
properties_output_width: int = None
|
properties_output_width: int = None
|
||||||
selected_component_id: str | None = None
|
selected_component_id: str | None = None
|
||||||
|
|
||||||
|
|||||||
@@ -47,10 +47,6 @@ class InstanceManager:
|
|||||||
|
|
||||||
return InstanceManager._instances[key]
|
return InstanceManager._instances[key]
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def new(session, instance_type, **kwargs):
|
|
||||||
return InstanceManager.get(session, instance_type.create_component_id(session), instance_type, **kwargs)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def register(session: dict | None, instance, instance_id: str = None):
|
def register(session: dict | None, instance, instance_id: str = None):
|
||||||
"""
|
"""
|
||||||
|
|||||||
39
src/main.py
39
src/main.py
@@ -1,5 +1,7 @@
|
|||||||
# global layout
|
# global layout
|
||||||
import logging.config
|
import logging.config
|
||||||
|
import random
|
||||||
|
from asyncio import sleep
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from fasthtml.common import *
|
from fasthtml.common import *
|
||||||
@@ -53,6 +55,9 @@ links = [
|
|||||||
Link(href="./assets/daisyui-5-themes.css", rel="stylesheet", type="text/css"),
|
Link(href="./assets/daisyui-5-themes.css", rel="stylesheet", type="text/css"),
|
||||||
Script(src="./assets/tailwindcss-browser@4.js"),
|
Script(src="./assets/tailwindcss-browser@4.js"),
|
||||||
|
|
||||||
|
# SSE
|
||||||
|
Script(src="https://unpkg.com/htmx-ext-sse@2.2.1/sse.js"),
|
||||||
|
|
||||||
# Old drawer layout
|
# Old drawer layout
|
||||||
Script(src="./assets/DrawerLayout.js", defer=True),
|
Script(src="./assets/DrawerLayout.js", defer=True),
|
||||||
Link(rel="stylesheet", href="./assets/DrawerLayout.css"),
|
Link(rel="stylesheet", href="./assets/DrawerLayout.css"),
|
||||||
@@ -146,7 +151,6 @@ register_component("theme_controller", "components.themecontroller", "ThemeContr
|
|||||||
register_component("main_layout", "components.drawerlayout", "DrawerLayoutApp")
|
register_component("main_layout", "components.drawerlayout", "DrawerLayoutApp")
|
||||||
register_component("undo_redo", "components.undo_redo", "UndoRedoApp")
|
register_component("undo_redo", "components.undo_redo", "UndoRedoApp")
|
||||||
register_component("tabs", "components.tabs", "TabsApp") # before repositories
|
register_component("tabs", "components.tabs", "TabsApp") # before repositories
|
||||||
register_component("entryselector", "components.entryselector", "EntrySelectorApp")
|
|
||||||
register_component("applications", "components.applications", "ApplicationsApp")
|
register_component("applications", "components.applications", "ApplicationsApp")
|
||||||
register_component("repositories", "components.repositories", "RepositoriesApp")
|
register_component("repositories", "components.repositories", "RepositoriesApp")
|
||||||
register_component("workflows", "components.workflows", "WorkflowsApp")
|
register_component("workflows", "components.workflows", "WorkflowsApp")
|
||||||
@@ -215,17 +219,17 @@ app, rt = fast_app(
|
|||||||
# -------------------------
|
# -------------------------
|
||||||
# Profiling middleware
|
# Profiling middleware
|
||||||
# -------------------------
|
# -------------------------
|
||||||
# @app.middleware("http")
|
@app.middleware("http")
|
||||||
async def timing_middleware(request, call_next):
|
async def timing_middleware(request, call_next):
|
||||||
import time
|
import time
|
||||||
start_total = time.perf_counter()
|
start_total = time.perf_counter()
|
||||||
|
|
||||||
# Call the next middleware or route handler
|
# Call the next middleware or route handler
|
||||||
response = await call_next(request)
|
response = await call_next(request)
|
||||||
|
|
||||||
end_total = time.perf_counter()
|
end_total = time.perf_counter()
|
||||||
elapsed = end_total - start_total
|
elapsed = end_total - start_total
|
||||||
|
|
||||||
print(f"[PERF] Total server time: {elapsed:.4f} sec - Path: {request.url.path}")
|
print(f"[PERF] Total server time: {elapsed:.4f} sec - Path: {request.url.path}")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@@ -272,6 +276,31 @@ def get(session):
|
|||||||
DrawerLayoutOld(pages),)
|
DrawerLayoutOld(pages),)
|
||||||
|
|
||||||
|
|
||||||
|
shutdown_event = signal_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
async def number_generator():
|
||||||
|
while True: # not shutdown_event.is_set():
|
||||||
|
data = Article(random.randint(1, 100))
|
||||||
|
print(data)
|
||||||
|
yield sse_message(data)
|
||||||
|
await sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
@rt("/sse")
|
||||||
|
def get():
|
||||||
|
return Titled("SSE Random Number Generator",
|
||||||
|
P("Generate pairs of random numbers, as the list grows scroll downwards."),
|
||||||
|
Div(hx_ext="sse",
|
||||||
|
sse_connect="/number-stream",
|
||||||
|
hx_swap="beforeend show:bottom",
|
||||||
|
sse_swap="message"))
|
||||||
|
|
||||||
|
|
||||||
|
@rt("/number-stream")
|
||||||
|
async def get(): return EventStream(number_generator())
|
||||||
|
|
||||||
|
|
||||||
@rt('/toasting')
|
@rt('/toasting')
|
||||||
def get(session):
|
def get(session):
|
||||||
# Normally one toast is enough, this allows us to see
|
# Normally one toast is enough, this allows us to see
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
import ast
|
import ast
|
||||||
import logging
|
import logging
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any, Generator
|
from typing import Any, Generator
|
||||||
|
|
||||||
from components.admin.admin_db_manager import AdminDbManager
|
from components.admin.admin_db_manager import AdminDbManager
|
||||||
@@ -12,14 +11,6 @@ from core.utils import UnreferencedNamesVisitor
|
|||||||
from utils.Datahelper import DataHelper
|
from utils.Datahelper import DataHelper
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class WorkflowPayload:
|
|
||||||
processor_name: str
|
|
||||||
component_id: str
|
|
||||||
item_linkage_id: int
|
|
||||||
item: Any
|
|
||||||
|
|
||||||
|
|
||||||
class DataProcessorError(Exception):
|
class DataProcessorError(Exception):
|
||||||
def __init__(self, component_id, error):
|
def __init__(self, component_id, error):
|
||||||
self.component_id = component_id
|
self.component_id = component_id
|
||||||
@@ -155,56 +146,35 @@ class WorkflowEngine:
|
|||||||
self.has_error = False
|
self.has_error = False
|
||||||
self.global_error = None
|
self.global_error = None
|
||||||
self.errors = {}
|
self.errors = {}
|
||||||
self.debug = {}
|
|
||||||
self.item_count = -1
|
|
||||||
|
|
||||||
def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
|
def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
|
||||||
"""Add a data processor to the pipeline."""
|
"""Add a data processor to the pipeline."""
|
||||||
self.processors.append(processor)
|
self.processors.append(processor)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def _process_single_item(self, item_linkage_id, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
|
def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
|
||||||
"""Process a single item through the remaining processors."""
|
"""Process a single item through the remaining processors."""
|
||||||
if processor_index >= len(self.processors):
|
if processor_index >= len(self.processors):
|
||||||
yield item
|
yield item
|
||||||
return
|
return
|
||||||
|
|
||||||
processor = self.processors[processor_index]
|
processor = self.processors[processor_index]
|
||||||
if not processor.component_id in self.debug:
|
|
||||||
self.debug[processor.component_id] = {"input": [], "output": []}
|
|
||||||
|
|
||||||
self.debug[processor.component_id]["input"].append(WorkflowPayload(
|
|
||||||
processor_name=processor.__class__.__name__,
|
|
||||||
component_id=processor.component_id,
|
|
||||||
item_linkage_id=item_linkage_id,
|
|
||||||
item=item))
|
|
||||||
|
|
||||||
# Process the item through the current processor
|
# Process the item through the current processor
|
||||||
for processed_item in processor.process(item):
|
for processed_item in processor.process(item):
|
||||||
self.debug[processor.component_id]["output"].append(WorkflowPayload(
|
|
||||||
processor_name=processor.__class__.__name__,
|
|
||||||
component_id=processor.component_id,
|
|
||||||
item_linkage_id=item_linkage_id,
|
|
||||||
item=processed_item))
|
|
||||||
|
|
||||||
# Recursively process through remaining processors
|
# Recursively process through remaining processors
|
||||||
yield from self._process_single_item(item_linkage_id, processed_item, processor_index + 1)
|
yield from self._process_single_item(processed_item, processor_index + 1)
|
||||||
|
|
||||||
def run(self) -> Generator[Any, None, None]:
|
def run(self) -> Generator[Any, None, None]:
|
||||||
"""
|
"""
|
||||||
Run the workflow pipeline and yield results one by one.
|
Run the workflow pipeline and yield results one by one.
|
||||||
The first processor must be a DataProducer.
|
The first processor must be a DataProducer.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.debug.clear()
|
|
||||||
|
|
||||||
if not self.processors:
|
if not self.processors:
|
||||||
self.has_error = False
|
self.has_error = False
|
||||||
self.global_error = "No processors in the pipeline"
|
self.global_error = "No processors in the pipeline"
|
||||||
self.item_count = -1
|
|
||||||
raise ValueError(self.global_error)
|
raise ValueError(self.global_error)
|
||||||
|
|
||||||
self.item_count = 0
|
|
||||||
first_processor = self.processors[0]
|
first_processor = self.processors[0]
|
||||||
|
|
||||||
if not isinstance(first_processor, DataProducer):
|
if not isinstance(first_processor, DataProducer):
|
||||||
@@ -212,16 +182,8 @@ class WorkflowEngine:
|
|||||||
self.global_error = "First processor must be a DataProducer"
|
self.global_error = "First processor must be a DataProducer"
|
||||||
raise ValueError(self.global_error)
|
raise ValueError(self.global_error)
|
||||||
|
|
||||||
self.debug[first_processor.component_id] = {"input": [], "output": []}
|
for item in first_processor.process(None):
|
||||||
|
yield from self._process_single_item(item, 1)
|
||||||
for item_linkage_id, item in enumerate(first_processor.process(None)):
|
|
||||||
self.item_count += 1
|
|
||||||
self.debug[first_processor.component_id]["output"].append(WorkflowPayload(
|
|
||||||
processor_name=first_processor.__class__.__name__,
|
|
||||||
component_id=first_processor.component_id,
|
|
||||||
item_linkage_id=item_linkage_id,
|
|
||||||
item=item))
|
|
||||||
yield from self._process_single_item(item_linkage_id, item, 1)
|
|
||||||
|
|
||||||
def run_to_list(self) -> list[Any]:
|
def run_to_list(self) -> list[Any]:
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user