7 Commits

Author SHA1 Message Date
7dc7687b25 Fixed unit tests 2025-08-23 21:34:09 +02:00
f08ae4a90b Added RowIndex in GridState.
Fixed content escaping
2025-08-23 00:29:52 +02:00
b48aaf4621 Fixed unit tests 2025-08-22 23:17:01 +02:00
2c5fe004f5 Improving lazing loading. The scrollbars updates itself 2025-08-22 00:09:59 +02:00
9cf0e5e26a Trying things 2025-08-18 06:59:25 +02:00
67abb45804 Working on improving the perf 2025-08-10 17:42:54 +02:00
5820efb7f1 Updating git 2025-08-10 11:29:39 +02:00
22 changed files with 114 additions and 274 deletions

View File

@@ -1,4 +1,4 @@
from core.utils import get_user_id, get_unique_id from core.utils import get_user_id
class BaseComponent: class BaseComponent:
@@ -51,12 +51,3 @@ class BaseComponentSingleton(BaseComponent):
@classmethod @classmethod
def create_component_id(cls, session): def create_component_id(cls, session):
return f"{cls.COMPONENT_INSTANCE_ID}{session['user_id']}" return f"{cls.COMPONENT_INSTANCE_ID}{session['user_id']}"
class BaseComponentMultipleInstance(BaseComponent):
COMPONENT_INSTANCE_ID = None
@classmethod
def create_component_id(cls, session):
component_id = cls.COMPONENT_INSTANCE_ID or cls.__name__
return get_unique_id(f"{component_id}{session['user_id']}")

View File

@@ -1,6 +1,9 @@
import asyncio
import json import json
import logging import logging
from fasthtml.components import Div, sse_message
from fasthtml.core import EventStream
from fasthtml.fastapp import fast_app from fasthtml.fastapp import fast_app
from starlette.datastructures import UploadFile from starlette.datastructures import UploadFile
@@ -138,6 +141,12 @@ def post(session, _id: str, state: str, args: str = None):
return instance.manage_state_changed(state, args) return instance.manage_state_changed(state, args)
@rt(Routes.YieldRow)
async def get(session, _id: str):
logger.debug(f"Entering {Routes.YieldRow} with args {_id=}")
instance = InstanceManager.get(session, _id)
return EventStream(instance.mk_body_content_sse())
@rt(Routes.GetPage) @rt(Routes.GetPage)
def get(session, _id: str, page_index: int): def get(session, _id: str, page_index: int):
logger.debug(f"Entering {Routes.GetPage} with args {_id=}, {page_index=}") logger.debug(f"Entering {Routes.GetPage} with args {_id=}, {page_index=}")

View File

@@ -400,6 +400,7 @@ class DataGrid(BaseComponent):
id=f"scb_{self._id}", id=f"scb_{self._id}",
) )
@timed
def mk_table(self, oob=False): def mk_table(self, oob=False):
htmx_extra_params = { htmx_extra_params = {
"hx-on::before-settle": f"onAfterSettle('{self._id}', event);", "hx-on::before-settle": f"onAfterSettle('{self._id}', event);",

View File

@@ -118,6 +118,38 @@ class DataGridCommandManager(BaseCommandManager):
"data_tooltip": tooltip_msg, "data_tooltip": tooltip_msg,
"cls": self.merge_class(cls, "mmt-tooltip") "cls": self.merge_class(cls, "mmt-tooltip")
} }
#
# @staticmethod
# def merge(*items):
# """
# Merges multiple dictionaries into a single dictionary by combining their key-value pairs.
# If a key exists in multiple dictionaries and its value is a string, the values are concatenated.
# If the key's value is not a string, an error is raised.
#
# :param items: dictionaries to be merged. If all items are None, None is returned.
# :return: A single dictionary containing the merged key-value pairs from all input dictionaries.
# :raises NotImplementedError: If a key's value is not a string and exists in multiple input dictionaries.
# """
# if all(item is None for item in items):
# return None
#
# res = {}
# for item in [item for item in items if item is not None]:
#
# for key, value in item.items():
# if not key in res:
# res[key] = value
# else:
# if isinstance(res[key], str):
# res[key] += " " + value
# else:
# raise NotImplementedError("")
#
# return res
#
# @staticmethod
# def merge_class(cls1, cls2):
# return (cls1 + " " + cls2) if cls2 else cls1
class FilterAllCommands(BaseCommandManager): class FilterAllCommands(BaseCommandManager):

View File

@@ -36,6 +36,7 @@ class Routes:
UpdateView = "/update_view" UpdateView = "/update_view"
ShowFooterMenu = "/show_footer_menu" ShowFooterMenu = "/show_footer_menu"
UpdateState = "/update_state" UpdateState = "/update_state"
YieldRow = "/yield-row"
GetPage = "/page" GetPage = "/page"

View File

@@ -62,7 +62,7 @@ class JsonViewerHelper:
class JsonViewer(BaseComponent): class JsonViewer(BaseComponent):
def __init__(self, session, _id, owner, user_id, data, hooks=None, key=None, boundaries=None): def __init__(self, session, _id, owner, user_id, data, hooks=None, key=None, boundaries=None):
super().__init__(session, _id) super().__init__(session, _id)
self._key = key # for comparison between two jsonviewer components self._key = key
self._owner = owner # debugger component self._owner = owner # debugger component
self.user_id = user_id self.user_id = user_id
self.data = data self.data = data
@@ -88,10 +88,6 @@ class JsonViewer(BaseComponent):
self._helper = JsonViewerHelper() self._helper = JsonViewerHelper()
def set_data(self, data):
self.data = data
self.node = self._create_node(None, data)
def set_node_folding(self, node_id, folding): def set_node_folding(self, node_id, folding):
if folding == self._folding_mode: if folding == self._folding_mode:
self._nodes_to_track.remove(node_id) self._nodes_to_track.remove(node_id)

View File

@@ -1,26 +0,0 @@
import logging
from fasthtml.fastapp import fast_app
from components.entryselector.constants import Routes
from core.instance_manager import debug_session, InstanceManager
logger = logging.getLogger("EntrySelectorApp")
repositories_app, rt = fast_app()
@rt(Routes.Select)
def get(session, _id: str, entry: str):
logger.debug(f"Entering {Routes.Select} with args {debug_session(session)}, {_id=}, {entry=}")
instance = InstanceManager.get(session, _id)
to_update = instance.select_entry(entry)
res = [instance]
if res is None:
return instance
if isinstance(to_update, (list, tuple)):
res.extend(to_update)
else:
res.append(to_update)
return tuple(res)

View File

@@ -1,20 +0,0 @@
.es-container {
overflow-x: auto;
white-space: nowrap;
}
.es-entry {
border: 2px solid var(--color-base-300);
padding: 2px;
cursor: pointer;
display: inline-block; /* Ensure entries align horizontally if needed */
}
.es-entry-selected {
border: 2px solid var(--color-primary);
}
.es-entry:hover {
background-color: var(--color-base-300);
}

View File

@@ -1,15 +0,0 @@
from components.BaseCommandManager import BaseCommandManager
from components.entryselector.constants import Routes, ROUTE_ROOT
class EntrySelectorCommandManager(BaseCommandManager):
def __init__(self, owner):
super().__init__(owner)
def select_entry(self, entry):
return {
"hx-get": f"{ROUTE_ROOT}{Routes.Select}",
"hx-target": f"#{self._id}",
"hx-swap": "outerHTML",
"hx-vals": f'{{"_id": "{self._id}", "entry": "{entry}"}}',
}

View File

@@ -1,56 +0,0 @@
import logging
from fasthtml.components import *
from components.BaseComponent import BaseComponentMultipleInstance
from components.entryselector.commands import EntrySelectorCommandManager
logger = logging.getLogger("EntrySelector")
class EntrySelector(BaseComponentMultipleInstance):
def __init__(self, session, _id, owner, data=None, hooks=None, key=None, boundaries=None):
super().__init__(session, _id)
self._key = key
self._owner = owner # debugger component
self.data = data
self.selected = None
self.hooks = hooks
self._boundaries = boundaries if boundaries else {"width": "300"}
self._commands = EntrySelectorCommandManager(self)
def set_data(self, data):
self.data = data
def set_selected(self, selected):
if selected is None:
self.selected = None
else:
self.selected = int(selected)
def set_boundaries(self, boundaries):
self._boundaries = boundaries
def select_entry(self, entry):
logger.debug(f"Selecting entry {entry}")
self.set_selected(entry)
if self.hooks is not None and (on_entry_selected := self.hooks.get("on_entry_selected", None)) is not None:
return on_entry_selected(entry)
else:
return None
def _mk_content(self):
if not self.data:
return [Div("no entry")]
return [Div(index,
**self._commands.select_entry(index),
cls=f"es-entry {'es-entry-selected' if index == self.selected else ''}")
for index in range(self.data)]
def __ft__(self):
return Div(
*self._mk_content(),
cls="flex es-container",
id=f"{self._id}",
)

View File

@@ -1,5 +0,0 @@
ROUTE_ROOT = "/es" # for EntrySelector
class Routes:
Select = "/select"

View File

@@ -46,7 +46,7 @@ class WorkflowDesigner(BaseComponent):
self.properties = WorkflowDesignerProperties(self._session, f"{self._id}", self) self.properties = WorkflowDesignerProperties(self._session, f"{self._id}", self)
workflow_name = self._designer_settings.workflow_name workflow_name = self._designer_settings.workflow_name
self.player = InstanceManager.get(self._session, self._player = InstanceManager.get(self._session,
WorkflowPlayer.create_component_id(self._session, workflow_name), WorkflowPlayer.create_component_id(self._session, workflow_name),
WorkflowPlayer, WorkflowPlayer,
settings_manager=self._settings_manager, settings_manager=self._settings_manager,
@@ -222,23 +222,22 @@ class WorkflowDesigner(BaseComponent):
def play_workflow(self, boundaries: dict): def play_workflow(self, boundaries: dict):
self._error_message = None self._error_message = None
self.player.run() self._player.run()
if self.player.global_error: if self._player.global_error:
# Show the error message in the same tab # Show the error message in the same tab
self._error_message = self.player.global_error self._error_message = self._player.global_error
else: else:
self.properties.set_entry_selector_data(self.player.nb_items)
# change the tab and display the results # change the tab and display the results
self.player.set_boundaries(boundaries) self._player.set_boundaries(boundaries)
self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self.player, self.player.key) self.tabs_manager.add_tab(f"Workflow {self._designer_settings.workflow_name}", self._player, self._player.key)
return self.tabs_manager.refresh() return self.tabs_manager.refresh()
def stop_workflow(self): def stop_workflow(self):
self._error_message = None self._error_message = None
self.player.stop() self._player.stop()
self.properties.set_entry_selector_data(0)
return self.tabs_manager.refresh() return self.tabs_manager.refresh()
def on_processor_details_event(self, component_id: str, event_name: str, details: dict): def on_processor_details_event(self, component_id: str, event_name: str, details: dict):
@@ -315,7 +314,7 @@ class WorkflowDesigner(BaseComponent):
def _mk_component(self, component: WorkflowComponent): def _mk_component(self, component: WorkflowComponent):
runtime_state = self.player.get_component_runtime_state(component.id) runtime_state = self._player.get_component_runtime_state(component.id)
info = COMPONENT_TYPES[component.type] info = COMPONENT_TYPES[component.type]
is_selected = self._state.selected_component_id == component.id is_selected = self._state.selected_component_id == component.id

View File

@@ -1,11 +1,9 @@
from fasthtml.common import * from fasthtml.common import *
from dataclasses import dataclass
from components.BaseComponent import BaseComponent from components.BaseComponent import BaseComponent
from components.debugger.components.JsonViewer import JsonViewer
from components.entryselector.components.EntrySelector import EntrySelector
from components.workflows.constants import COMPONENT_TYPES, PROCESSOR_TYPES from components.workflows.constants import COMPONENT_TYPES, PROCESSOR_TYPES
from components_helpers import mk_dialog_buttons from components_helpers import mk_dialog_buttons
from core.instance_manager import InstanceManager
from core.jira import JiraRequestTypes, DEFAULT_SEARCH_FIELDS from core.jira import JiraRequestTypes, DEFAULT_SEARCH_FIELDS
from utils.DbManagementHelper import DbManagementHelper from utils.DbManagementHelper import DbManagementHelper
@@ -27,24 +25,6 @@ class WorkflowDesignerProperties(BaseComponent):
self._component = None self._component = None
self.update_layout() self.update_layout()
self.update_component(self._owner.get_state().selected_component_id) self.update_component(self._owner.get_state().selected_component_id)
self.entry_selector: EntrySelector = InstanceManager.new(self._session,
EntrySelector,
owner=self,
hooks={
"on_entry_selected": self.on_entry_selector_changed})
self._input_jsonviewer: JsonViewer = InstanceManager.new(self._session,
JsonViewer,
owner=self,
user_id=None,
data=None)
self._output_jsonviewer: JsonViewer = InstanceManager.new(self._session,
JsonViewer,
owner=self,
user_id=None,
data=None)
def set_entry_selector_data(self, data):
self.entry_selector.set_data(data)
def update_layout(self): def update_layout(self):
if self._owner.get_state().properties_input_width is None: if self._owner.get_state().properties_input_width is None:
@@ -75,54 +55,29 @@ class WorkflowDesignerProperties(BaseComponent):
return self.__ft__(oob=oob) return self.__ft__(oob=oob)
def on_entry_selector_changed(self, entry):
entry = int(entry)
input_data, output_data = None, None
selected_component_id = self._owner.get_state().selected_component_id
if selected_component_id is not None:
runtime_state = self._owner.player.runtime_states.get(selected_component_id, None)
if runtime_state is not None:
input_content = runtime_state.input[entry] if len(runtime_state.input) > entry else None
output_content = runtime_state.output[entry] if len(runtime_state.output) > entry else None
if input_content is not None:
self._input_jsonviewer.set_data(input_content.item.as_dict())
input_data = self._input_jsonviewer
if output_content is not None:
self._output_jsonviewer.set_data(output_content.item.as_dict())
output_data = self._output_jsonviewer
return (self._mk_input(content=input_data, oob=True),
self._mk_output(content=output_data, oob=True))
def _mk_layout(self): def _mk_layout(self):
return Div( return Div(
self.entry_selector,
Div(
self._mk_input(), self._mk_input(),
self._mk_properties(), self._mk_properties(),
self._mk_output(), self._mk_output(),
cls="flex", cls="flex",
style="height: 100%; width: 100%; flex: 1;" style="height: 100%; width: 100%; flex: 1;"
) )
)
def _mk_input(self, content=None, oob=False): def _mk_input(self):
return Div( return Div(
content, "Input",
id=f"pi_{self._id}", id=f"pi_{self._id}",
style=f"width: {self.layout.input_width}px;", style=f"width: {self.layout.input_width}px;",
cls="wkf-properties-input", cls="wkf-properties-input"
hx_swap_oob=f'true' if oob else None,
) )
def _mk_output(self, content=None, oob=False): def _mk_output(self):
return Div( return Div(
content, "Output",
id=f"po_{self._id}", id=f"po_{self._id}",
style=f"width: {self.layout.output_width}px;", style=f"width: {self.layout.output_width}px;",
cls="wkf-properties-output", cls="wkf-properties-output"
hx_swap_oob=f'true' if oob else None,
) )
def _mk_properties(self): def _mk_properties(self):

View File

@@ -53,7 +53,6 @@ class WorkflowPlayer(BaseComponent):
self.runtime_states = {} self.runtime_states = {}
self.global_error = None self.global_error = None
self.has_error = False self.has_error = False
self.nb_items = 0
def set_boundaries(self, boundaries: dict): def set_boundaries(self, boundaries: dict):
self._datagrid.set_boundaries(boundaries) self._datagrid.set_boundaries(boundaries)
@@ -94,14 +93,11 @@ class WorkflowPlayer(BaseComponent):
self.global_error = engine.global_error self.global_error = engine.global_error
else: # loop through the components and update the runtime states else: # loop through the components and update the runtime states
self.nb_items = engine.nb_items
for component in sorted_components: for component in sorted_components:
runtime_state = self.runtime_states.get(component.id) runtime_state = self.runtime_states.get(component.id)
if component.id not in engine.errors: if component.id not in engine.errors:
runtime_state.state = ComponentState.SUCCESS runtime_state.state = ComponentState.SUCCESS
runtime_state.input = engine.debug[component.id]["input"]
runtime_state.output = engine.debug[component.id]["output"]
continue continue
# the component failed # the component failed
@@ -181,7 +177,7 @@ class WorkflowPlayer(BaseComponent):
# Return sorted components # Return sorted components
return [components_by_id[cid] for cid in sorted_order] return [components_by_id[cid] for cid in sorted_order]
def _get_engine(self, sorted_components) -> WorkflowEngine: def _get_engine(self, sorted_components):
# first reorder the component, according to the connection definitions # first reorder the component, according to the connection definitions
engine = WorkflowEngine() engine = WorkflowEngine()
for component in sorted_components: for component in sorted_components:

View File

@@ -48,8 +48,6 @@ class WorkflowComponentRuntimeState:
id: str id: str
state: ComponentState = ComponentState.SUCCESS state: ComponentState = ComponentState.SUCCESS
error_message: str | None = None error_message: str | None = None
input: list = None
output: list = None
@dataclass @dataclass

View File

@@ -48,9 +48,6 @@ class Expando:
return self._props.copy() return self._props.copy()
def to_dict(self, mappings: dict) -> dict: def to_dict(self, mappings: dict) -> dict:
"""
Return the information as a dictionary, with the given mappings
"""
return {prop_name: self.get(path) for path, prop_name in mappings.items() if prop_name is not None} return {prop_name: self.get(path) for path, prop_name in mappings.items() if prop_name is not None}
def __hasattr__(self, item): def __hasattr__(self, item):

View File

@@ -47,10 +47,6 @@ class InstanceManager:
return InstanceManager._instances[key] return InstanceManager._instances[key]
@staticmethod
def new(session, instance_type, **kwargs):
return InstanceManager.get(session, instance_type.create_component_id(session), instance_type, **kwargs)
@staticmethod @staticmethod
def register(session: dict | None, instance, instance_id: str = None): def register(session: dict | None, instance, instance_id: str = None):
""" """

View File

@@ -1,5 +1,7 @@
# global layout # global layout
import logging.config import logging.config
import random
from asyncio import sleep
import yaml import yaml
from fasthtml.common import * from fasthtml.common import *
@@ -53,6 +55,9 @@ links = [
Link(href="./assets/daisyui-5-themes.css", rel="stylesheet", type="text/css"), Link(href="./assets/daisyui-5-themes.css", rel="stylesheet", type="text/css"),
Script(src="./assets/tailwindcss-browser@4.js"), Script(src="./assets/tailwindcss-browser@4.js"),
# SSE
Script(src="https://unpkg.com/htmx-ext-sse@2.2.1/sse.js"),
# Old drawer layout # Old drawer layout
Script(src="./assets/DrawerLayout.js", defer=True), Script(src="./assets/DrawerLayout.js", defer=True),
Link(rel="stylesheet", href="./assets/DrawerLayout.css"), Link(rel="stylesheet", href="./assets/DrawerLayout.css"),
@@ -146,7 +151,6 @@ register_component("theme_controller", "components.themecontroller", "ThemeContr
register_component("main_layout", "components.drawerlayout", "DrawerLayoutApp") register_component("main_layout", "components.drawerlayout", "DrawerLayoutApp")
register_component("undo_redo", "components.undo_redo", "UndoRedoApp") register_component("undo_redo", "components.undo_redo", "UndoRedoApp")
register_component("tabs", "components.tabs", "TabsApp") # before repositories register_component("tabs", "components.tabs", "TabsApp") # before repositories
register_component("entryselector", "components.entryselector", "EntrySelectorApp")
register_component("applications", "components.applications", "ApplicationsApp") register_component("applications", "components.applications", "ApplicationsApp")
register_component("repositories", "components.repositories", "RepositoriesApp") register_component("repositories", "components.repositories", "RepositoriesApp")
register_component("workflows", "components.workflows", "WorkflowsApp") register_component("workflows", "components.workflows", "WorkflowsApp")
@@ -215,7 +219,7 @@ app, rt = fast_app(
# ------------------------- # -------------------------
# Profiling middleware # Profiling middleware
# ------------------------- # -------------------------
# @app.middleware("http") @app.middleware("http")
async def timing_middleware(request, call_next): async def timing_middleware(request, call_next):
import time import time
start_total = time.perf_counter() start_total = time.perf_counter()
@@ -272,6 +276,31 @@ def get(session):
DrawerLayoutOld(pages),) DrawerLayoutOld(pages),)
shutdown_event = signal_shutdown()
async def number_generator():
while True: # not shutdown_event.is_set():
data = Article(random.randint(1, 100))
print(data)
yield sse_message(data)
await sleep(1)
@rt("/sse")
def get():
return Titled("SSE Random Number Generator",
P("Generate pairs of random numbers, as the list grows scroll downwards."),
Div(hx_ext="sse",
sse_connect="/number-stream",
hx_swap="beforeend show:bottom",
sse_swap="message"))
@rt("/number-stream")
async def get(): return EventStream(number_generator())
@rt('/toasting') @rt('/toasting')
def get(session): def get(session):
# Normally one toast is enough, this allows us to see # Normally one toast is enough, this allows us to see

View File

@@ -1,7 +1,6 @@
import ast import ast
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generator from typing import Any, Generator
from components.admin.admin_db_manager import AdminDbManager from components.admin.admin_db_manager import AdminDbManager
@@ -12,14 +11,6 @@ from core.utils import UnreferencedNamesVisitor
from utils.Datahelper import DataHelper from utils.Datahelper import DataHelper
@dataclass
class WorkflowPayload:
processor_name: str
component_id: str
item_linkage_id: int
item: Any
class DataProcessorError(Exception): class DataProcessorError(Exception):
def __init__(self, component_id, error): def __init__(self, component_id, error):
self.component_id = component_id self.component_id = component_id
@@ -155,56 +146,35 @@ class WorkflowEngine:
self.has_error = False self.has_error = False
self.global_error = None self.global_error = None
self.errors = {} self.errors = {}
self.debug = {}
self.nb_items = -1
def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine': def add_processor(self, processor: DataProcessor) -> 'WorkflowEngine':
"""Add a data processor to the pipeline.""" """Add a data processor to the pipeline."""
self.processors.append(processor) self.processors.append(processor)
return self return self
def _process_single_item(self, item_linkage_id, item: Any, processor_index: int = 0) -> Generator[Any, None, None]: def _process_single_item(self, item: Any, processor_index: int = 0) -> Generator[Any, None, None]:
"""Process a single item through the remaining processors.""" """Process a single item through the remaining processors."""
if processor_index >= len(self.processors): if processor_index >= len(self.processors):
yield item yield item
return return
processor = self.processors[processor_index] processor = self.processors[processor_index]
if not processor.component_id in self.debug:
self.debug[processor.component_id] = {"input": [], "output": []}
self.debug[processor.component_id]["input"].append(WorkflowPayload(
processor_name=processor.__class__.__name__,
component_id=processor.component_id,
item_linkage_id=item_linkage_id,
item=item))
# Process the item through the current processor # Process the item through the current processor
for processed_item in processor.process(item): for processed_item in processor.process(item):
self.debug[processor.component_id]["output"].append(WorkflowPayload(
processor_name=processor.__class__.__name__,
component_id=processor.component_id,
item_linkage_id=item_linkage_id,
item=processed_item))
# Recursively process through remaining processors # Recursively process through remaining processors
yield from self._process_single_item(item_linkage_id, processed_item, processor_index + 1) yield from self._process_single_item(processed_item, processor_index + 1)
def run(self) -> Generator[Any, None, None]: def run(self) -> Generator[Any, None, None]:
""" """
Run the workflow pipeline and yield results one by one. Run the workflow pipeline and yield results one by one.
The first processor must be a DataProducer. The first processor must be a DataProducer.
""" """
self.debug.clear()
if not self.processors: if not self.processors:
self.has_error = False self.has_error = False
self.global_error = "No processors in the pipeline" self.global_error = "No processors in the pipeline"
self.nb_items = -1
raise ValueError(self.global_error) raise ValueError(self.global_error)
self.nb_items = 0
first_processor = self.processors[0] first_processor = self.processors[0]
if not isinstance(first_processor, DataProducer): if not isinstance(first_processor, DataProducer):
@@ -212,16 +182,8 @@ class WorkflowEngine:
self.global_error = "First processor must be a DataProducer" self.global_error = "First processor must be a DataProducer"
raise ValueError(self.global_error) raise ValueError(self.global_error)
self.debug[first_processor.component_id] = {"input": [], "output": []} for item in first_processor.process(None):
yield from self._process_single_item(item, 1)
for item_linkage_id, item in enumerate(first_processor.process(None)):
self.nb_items += 1
self.debug[first_processor.component_id]["output"].append(WorkflowPayload(
processor_name=first_processor.__class__.__name__,
component_id=first_processor.component_id,
item_linkage_id=item_linkage_id,
item=item))
yield from self._process_single_item(item_linkage_id, item, 1)
def run_to_list(self) -> list[Any]: def run_to_list(self) -> list[Any]:
""" """