Another implementation of undo/redo

This commit is contained in:
2025-07-31 22:54:09 +02:00
parent 72f5f30da6
commit 37c91d0d5d
13 changed files with 228 additions and 94 deletions

View File

@@ -14,7 +14,7 @@ python main.py
```shell ```shell
docker-compose up -d docker-compose up -d
``` ```
The application will be accessible on port 8000 (or whatever port you configured). The application will be accessible on port 8001 (if the docker compose file was not changed !).
2. **Initialize the Mistral model** (first run): 2. **Initialize the Mistral model** (first run):
```shell ```shell

View File

@@ -1,30 +1,24 @@
import logging import logging
from abc import ABC, abstractmethod from dataclasses import dataclass
from fasthtml.components import * from fasthtml.components import *
from components.BaseComponent import BaseComponentSingleton from components.BaseComponent import BaseComponentSingleton
from components.undo_redo.assets.icons import icon_redo, icon_undo from components.undo_redo.assets.icons import icon_redo, icon_undo
from components.undo_redo.commands import UndoRedoCommandManager from components.undo_redo.commands import UndoRedoCommandManager
from components.undo_redo.constants import UNDO_REDO_INSTANCE_ID from components.undo_redo.constants import UNDO_REDO_INSTANCE_ID, UndoRedoAttrs
from components_helpers import mk_icon, mk_tooltip from components_helpers import mk_icon, mk_tooltip
logger = logging.getLogger("UndoRedoApp") logger = logging.getLogger("UndoRedoApp")
class CommandHistory(ABC): @dataclass
def __init__(self, name, desc, owner): class CommandHistory:
self.name = name attrs: UndoRedoAttrs
self.desc = desc digest: str | None # digest to remember
self.owner = owner entry: str # digest to remember
key: str # key
@abstractmethod path: str # path within the key if only on subitem needs to be updated
def undo(self):
pass
@abstractmethod
def redo(self):
pass
class UndoRedo(BaseComponentSingleton): class UndoRedo(BaseComponentSingleton):
@@ -35,35 +29,73 @@ class UndoRedo(BaseComponentSingleton):
self.index = -1 self.index = -1
self.history = [] self.history = []
self._commands = UndoRedoCommandManager(self) self._commands = UndoRedoCommandManager(self)
self._db_engine = settings_manager.get_db_engine()
def push(self, command: CommandHistory): def snapshot(self, undo_redo_attrs: UndoRedoAttrs, entry, key, path=None):
self.history = self.history[:self.index + 1] digest = self._settings_manager.get_digest(self._session, entry) # get the current digest (the last one)
# init the history if this is the first call
if len(self.history) == 0:
digest_history = self._settings_manager.history(self._session, entry, digest, 2)
command = CommandHistory(undo_redo_attrs,
digest_history[1] if len(digest_history) > 1 else None,
entry,
key,
path)
self.history.append(command)
self.index = 0
command = CommandHistory(undo_redo_attrs, digest, entry, key, path)
self.history = self.history[:self.index + 1] #
self.history.append(command) self.history.append(command)
self.index += 1 self.index = len(self.history) - 1
def undo(self): def undo(self):
logger.debug(f"Undo command") logger.debug(f"Undo command")
if self.index < 0 : if self.index < 1:
logger.debug(f" No command to undo.") logger.debug(f" No command to undo.")
return self return self
command = self.history[self.index] current = self.history[self.index]
logger.debug(f" Undoing command {command.name} ({command.desc})") current_state = self._settings_manager.load(self._session, None, digest=current.digest)
res = command.undo()
previous = self.history[self.index - 1]
previous_state = self._settings_manager.load(self._session, None, digest=previous.digest)
# reapply the state
current_state[current.key] = previous_state[current.key]
self._settings_manager.save(self._session, current.entry, current_state)
self.index -= 1 self.index -= 1
return self, res
if current.attrs.on_undo is not None:
return self, current.attrs.on_undo()
else:
return self
def redo(self): def redo(self):
logger.debug("Redo command") logger.debug(f"Redo command")
if self.index == len(self.history) - 1: if self.index >= len(self.history) - 1:
logger.debug(f" No command to redo.") logger.debug(f" No command to undo.")
return self return self
current = self.history[self.index]
current_state = self._settings_manager.load(self._session, None, digest=current.digest)
next_ = self.history[self.index + 1]
next_state = self._settings_manager.load(self._session, None, digest=next_.digest)
# reapply the state
current_state[current.key] = next_state[current.key]
self._settings_manager.save(self._session, current.entry, current_state)
self.index += 1 self.index += 1
command = self.history[self.index]
logger.debug(f" Redoing command {command.name} ({command.desc})") if current.attrs.on_undo is not None:
res = command.redo() return self, current.attrs.on_redo()
return self, res else:
return self
def refresh(self): def refresh(self):
return self.__ft__(oob=True) return self.__ft__(oob=True)
@@ -83,7 +115,7 @@ class UndoRedo(BaseComponentSingleton):
return mk_tooltip(mk_icon(icon_undo, return mk_tooltip(mk_icon(icon_undo,
size=24, size=24,
**self._commands.undo()), **self._commands.undo()),
f"Undo '{command.name}'.") f"Undo '{command.attrs.name}'.")
else: else:
return mk_tooltip(mk_icon(icon_undo, return mk_tooltip(mk_icon(icon_undo,
size=24, size=24,
@@ -97,7 +129,7 @@ class UndoRedo(BaseComponentSingleton):
return mk_tooltip(mk_icon(icon_redo, return mk_tooltip(mk_icon(icon_redo,
size=24, size=24,
**self._commands.redo()), **self._commands.redo()),
f"Redo '{command.name}'.") f"Redo '{command.attrs.name}'.")
else: else:
return mk_tooltip(mk_icon(icon_redo, return mk_tooltip(mk_icon(icon_redo,
size=24, size=24,
@@ -106,7 +138,7 @@ class UndoRedo(BaseComponentSingleton):
"Nothing to redo.") "Nothing to redo.")
def _can_undo(self): def _can_undo(self):
return self.index >= 0 return self.index >= 1
def _can_redo(self): def _can_redo(self):
return self.index < len(self.history) - 1 return self.index < len(self.history) - 1

View File

@@ -1,3 +1,6 @@
from dataclasses import dataclass
from typing import Callable
UNDO_REDO_INSTANCE_ID = "__UndoRedo__" UNDO_REDO_INSTANCE_ID = "__UndoRedo__"
ROUTE_ROOT = "/undo" ROUTE_ROOT = "/undo"
@@ -5,4 +8,16 @@ ROUTE_ROOT = "/undo"
class Routes: class Routes:
Undo = "/undo" Undo = "/undo"
Redo = "/redo" Redo = "/redo"
@dataclass
class UndoRedoAttrs:
name: str
desc: str = None
on_undo: Callable = None
on_redo: Callable = None
def __post_init__(self):
if self.on_redo is None:
self.on_redo = self.on_undo

View File

@@ -129,14 +129,20 @@ def post(session, _id: str, component_id: str, event_name: str, details: dict):
@rt(Routes.PlayWorkflow) @rt(Routes.PlayWorkflow)
def post(session, _id: str, tab_boundaries: str): def post(session, _id: str, tab_boundaries: str):
logger.debug( logger.debug(f"Entering {Routes.PlayWorkflow} with args {debug_session(session)}, {_id=}")
f"Entering {Routes.PlayWorkflow} with args {debug_session(session)}, {_id=}")
instance = InstanceManager.get(session, _id) instance = InstanceManager.get(session, _id)
return instance.play_workflow(json.loads(tab_boundaries)) return instance.play_workflow(json.loads(tab_boundaries))
@rt(Routes.StopWorkflow) @rt(Routes.StopWorkflow)
def post(session, _id: str): def post(session, _id: str):
logger.debug( logger.debug(f"Entering {Routes.StopWorkflow} with args {debug_session(session)}, {_id=}")
f"Entering {Routes.StopWorkflow} with args {debug_session(session)}, {_id=}")
instance = InstanceManager.get(session, _id) instance = InstanceManager.get(session, _id)
return instance.stop_workflow() return instance.stop_workflow()
@rt(Routes.Refresh)
def post(session, _id: str):
logger.debug(f"Entering {Routes.Refresh} with args {debug_session(session)}, {_id=}")
instance = InstanceManager.get(session, _id)
return instance.refresh()

View File

@@ -23,3 +23,6 @@ icon_pause_circle = NotStr(
# fluent RecordStop20Regular # fluent RecordStop20Regular
icon_stop_circle = NotStr( icon_stop_circle = NotStr(
"""<svg name="stop" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 20 20"><g fill="none"><path d="M10 3a7 7 0 1 0 0 14a7 7 0 0 0 0-14zm-8 7a8 8 0 1 1 16 0a8 8 0 0 1-16 0zm5-2a1 1 0 0 1 1-1h4a1 1 0 0 1 1 1v4a1 1 0 0 1-1 1H8a1 1 0 0 1-1-1V8z" fill="currentColor"></path></g></svg>""") """<svg name="stop" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 20 20"><g fill="none"><path d="M10 3a7 7 0 1 0 0 14a7 7 0 0 0 0-14zm-8 7a8 8 0 1 1 16 0a8 8 0 0 1-16 0zm5-2a1 1 0 0 1 1-1h4a1 1 0 0 1 1 1v4a1 1 0 0 1-1 1H8a1 1 0 0 1-1-1V8z" fill="currentColor"></path></g></svg>""")
# fluent ArrowClockwise20Regular
icon_refresh = NotStr("""<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 20 20"><g fill="none"><path d="M3.066 9.05a7 7 0 0 1 12.557-3.22l.126.17H12.5a.5.5 0 1 0 0 1h4a.5.5 0 0 0 .5-.5V2.502a.5.5 0 0 0-1 0v2.207a8 8 0 1 0 1.986 4.775a.5.5 0 0 0-.998.064A7 7 0 1 1 3.066 9.05z" fill="currentColor"></path></g></svg>""")

View File

@@ -1,23 +1,6 @@
from components.BaseCommandManager import BaseCommandManager from components.BaseCommandManager import BaseCommandManager
from components.undo_redo.components.UndoRedo import CommandHistory
from components.workflows.constants import Routes, ROUTE_ROOT from components.workflows.constants import Routes, ROUTE_ROOT
class AddConnectorCommand(CommandHistory):
def __init__(self, owner, connector):
super().__init__("Add connector", "Add connector", owner)
self.connector = connector
def undo(self):
del self.owner.get_state().components[self.connector.id]
self.owner.get_db().save_state(self.owner.get_key(), self.owner.get_state()) # update db
return self.owner.refresh_designer(True)
def redo(self, oob=True):
self.owner.get_state().components[self.connector.id] = self.connector
self.owner.get_db().save_state(self.owner.get_key(), self.owner.get_state()) # update db
return self.owner.refresh_designer(oob)
class WorkflowsCommandManager(BaseCommandManager): class WorkflowsCommandManager(BaseCommandManager):
def __init__(self, owner): def __init__(self, owner):
@@ -108,6 +91,13 @@ class WorkflowDesignerCommandManager(BaseCommandManager):
"hx-swap": "outerHTML", "hx-swap": "outerHTML",
"hx-vals": f'js:{{"_id": "{self._id}"}}', "hx-vals": f'js:{{"_id": "{self._id}"}}',
} }
def refresh(self):
return {
"hx_post": f"{ROUTE_ROOT}{Routes.Refresh}",
"hx-swap": "none",
"hx-vals": f'js:{{"_id": "{self._id}"}}',
}
class WorkflowPlayerCommandManager(BaseCommandManager): class WorkflowPlayerCommandManager(BaseCommandManager):

View File

@@ -6,8 +6,9 @@ from fasthtml.xtend import Script
from assets.icons import icon_error from assets.icons import icon_error
from components.BaseComponent import BaseComponent from components.BaseComponent import BaseComponent
from components.workflows.assets.icons import icon_play, icon_pause, icon_stop from components.undo_redo.constants import UndoRedoAttrs
from components.workflows.commands import WorkflowDesignerCommandManager, AddConnectorCommand from components.workflows.assets.icons import icon_play, icon_pause, icon_stop, icon_refresh
from components.workflows.commands import WorkflowDesignerCommandManager
from components.workflows.components.WorkflowPlayer import WorkflowPlayer from components.workflows.components.WorkflowPlayer import WorkflowPlayer
from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes from components.workflows.constants import WORKFLOW_DESIGNER_INSTANCE_ID, ProcessorTypes
from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \ from components.workflows.db_management import WorkflowsDesignerSettings, WorkflowComponent, \
@@ -63,6 +64,7 @@ class WorkflowDesigner(BaseComponent):
self._key = key self._key = key
self._designer_settings = designer_settings self._designer_settings = designer_settings
self._db = WorkflowsDesignerDbManager(session, settings_manager) self._db = WorkflowsDesignerDbManager(session, settings_manager)
self._undo_redo = ComponentsInstancesHelper.get_undo_redo(session)
self._state = self._db.load_state(key) self._state = self._db.load_state(key)
self._boundaries = boundaries self._boundaries = boundaries
self.commands = WorkflowDesignerCommandManager(self) self.commands = WorkflowDesignerCommandManager(self)
@@ -96,6 +98,13 @@ class WorkflowDesigner(BaseComponent):
def refresh_properties(self, oob=False): def refresh_properties(self, oob=False):
return self._mk_properties(oob) return self._mk_properties(oob)
def refresh(self):
return self.__ft__(oob=True)
def refresh_state(self):
self._state = self._db.load_state(self._key)
return self.__ft__(oob=True)
def add_component(self, component_type, x, y): def add_component(self, component_type, x, y):
self._state.component_counter += 1 self._state.component_counter += 1
@@ -111,24 +120,19 @@ class WorkflowDesigner(BaseComponent):
description=info["description"], description=info["description"],
properties={"processor_name": PROCESSOR_TYPES[component_type][0]} properties={"processor_name": PROCESSOR_TYPES[component_type][0]}
) )
command = AddConnectorCommand(self, component)
undo_redo = ComponentsInstancesHelper.get_undo_redo(self._session)
#undo_redo.push(command)
self._state.components[component_id] = component self._state.components[component_id] = component
self._db.save_state(self._key, self._state) # update db
undo_redo.snapshot("add_component") undo_redo_attrs = UndoRedoAttrs(f"Add Component '{component_type}'", on_undo=lambda: self.refresh_state())
return command.redo(), undo_redo.refresh() self._db.save_state(self._key, self._state, undo_redo_attrs) # update db
# self._state.components[component_id] = component
# self._db.save_state(self._key, self._state) # update db return self.refresh_designer(), self._undo_redo.refresh()
# return self.refresh_designer()
def move_component(self, component_id, x, y): def move_component(self, component_id, x, y):
if component_id in self._state.components: if component_id in self._state.components:
self._state.selected_component_id = component_id self._state.selected_component_id = component_id
self._state.components[component_id].x = int(x) self._state.components[component_id].x = int(x)
self._state.components[component_id].y = int(y) self._state.components[component_id].y = int(y)
self._db.save_state(self._key, self._state) # update db self._db.save_state(self._key, self._state, ) # update db
return self.refresh_designer(), self.refresh_properties(True) return self.refresh_designer(), self.refresh_properties(True)
@@ -247,12 +251,13 @@ class WorkflowDesigner(BaseComponent):
def get_workflow_connections(self): def get_workflow_connections(self):
return self._state.connections return self._state.connections
def __ft__(self): def __ft__(self, oob=False):
return Div( return Div(
H1(f"{self._designer_settings.workflow_name}", cls="text-xl font-bold"), H1(f"{self._designer_settings.workflow_name}", cls="text-xl font-bold"),
P("Drag components from the toolbox to the canvas to create your workflow.", cls="text-sm mb-6"), P("Drag components from the toolbox to the canvas to create your workflow.", cls="text-sm mb-6"),
Div( Div(
self._mk_media(), self._mk_media(),
#self._mk_refresh_button(),
self._mk_error_message(), self._mk_error_message(),
cls="flex mb-2", cls="flex mb-2",
id=f"t_{self._id}" id=f"t_{self._id}"
@@ -263,6 +268,7 @@ class WorkflowDesigner(BaseComponent):
Script(f"bindWorkflowDesigner('{self._id}');"), Script(f"bindWorkflowDesigner('{self._id}');"),
**apply_boundaries(self._boundaries), **apply_boundaries(self._boundaries),
id=f"{self._id}", id=f"{self._id}",
hx_swap_oob='true' if oob else None,
) )
def _mk_connection_svg(self, conn: Connection): def _mk_connection_svg(self, conn: Connection):
@@ -384,6 +390,9 @@ class WorkflowDesigner(BaseComponent):
cls=f"media-controls flex m-2" cls=f"media-controls flex m-2"
) )
def _mk_refresh_button(self):
return mk_icon(icon_refresh, **self.commands.refresh())
def _mk_error_message(self): def _mk_error_message(self):
if not self._error_message: if not self._error_message:
return Div() return Div()

View File

@@ -32,4 +32,5 @@ class Routes:
PlayWorkflow = "/play-workflow" PlayWorkflow = "/play-workflow"
PauseWorkflow = "/pause-workflow" PauseWorkflow = "/pause-workflow"
StopWorkflow = "/stop-workflow" StopWorkflow = "/stop-workflow"
Refresh = "/refresh"

View File

@@ -2,10 +2,12 @@ import enum
import logging import logging
from dataclasses import dataclass, field from dataclasses import dataclass, field
from components.undo_redo.constants import UndoRedoAttrs
from components.workflows.constants import WORKFLOWS_DB_ENTRY, WORKFLOW_DESIGNER_DB_ENTRY, \ from components.workflows.constants import WORKFLOWS_DB_ENTRY, WORKFLOW_DESIGNER_DB_ENTRY, \
WORKFLOW_DESIGNER_DB_SETTINGS_ENTRY, WORKFLOW_DESIGNER_DB_STATE_ENTRY WORKFLOW_DESIGNER_DB_SETTINGS_ENTRY, WORKFLOW_DESIGNER_DB_STATE_ENTRY
from core.settings_management import SettingsManager from core.settings_management import SettingsManager
from core.utils import make_safe_id from core.utils import make_safe_id
from utils.ComponentsInstancesHelper import ComponentsInstancesHelper
logger = logging.getLogger("WorkflowsSettings") logger = logging.getLogger("WorkflowsSettings")
@@ -158,6 +160,7 @@ class WorkflowsDesignerDbManager:
def __init__(self, session: dict, settings_manager: SettingsManager): def __init__(self, session: dict, settings_manager: SettingsManager):
self._session = session self._session = session
self._settings_manager = settings_manager self._settings_manager = settings_manager
self._undo_redo = ComponentsInstancesHelper.get_undo_redo(session)
@staticmethod @staticmethod
def _get_db_entry(key): def _get_db_entry(key):
@@ -169,11 +172,17 @@ class WorkflowsDesignerDbManager:
WORKFLOW_DESIGNER_DB_SETTINGS_ENTRY, WORKFLOW_DESIGNER_DB_SETTINGS_ENTRY,
settings) settings)
def save_state(self, key: str, state: WorkflowsDesignerState): def save_state(self, key: str, state: WorkflowsDesignerState, undo_redo_attrs: UndoRedoAttrs = None):
db_entry = self._get_db_entry(key)
self._settings_manager.put(self._session, self._settings_manager.put(self._session,
self._get_db_entry(key), db_entry,
WORKFLOW_DESIGNER_DB_STATE_ENTRY, WORKFLOW_DESIGNER_DB_STATE_ENTRY,
state) state)
if undo_redo_attrs is not None:
self._undo_redo.snapshot(undo_redo_attrs,
db_entry,
WORKFLOW_DESIGNER_DB_STATE_ENTRY)
def save_all(self, key: str, settings: WorkflowsDesignerSettings = None, state: WorkflowsDesignerState = None): def save_all(self, key: str, settings: WorkflowsDesignerSettings = None, state: WorkflowsDesignerState = None):
items = {} items = {}

View File

@@ -271,6 +271,42 @@ class DbEngine:
except KeyError: except KeyError:
raise DbException(f"Key '{key}' not found in entry '{entry}'") raise DbException(f"Key '{key}' not found in entry '{entry}'")
def history(self, user_id, entry, digest=None, max_items=1000):
"""
Gives the current digest and all its ancestors
:param user_id:
:param entry:
:param digest:
:param max_items:
:return:
"""
with self.lock:
logger.info(f"History for {user_id=}, {entry=}, {digest=}")
digest_to_use = digest or self._get_entry_digest(user_id, entry)
logger.debug(f"Using digest {digest_to_use}.")
count = 0
history = []
while True:
if count >= max_items or digest_to_use is None:
break
history.append(digest_to_use)
count += 1
try:
target_file = self._get_obj_path(user_id, digest_to_use)
with open(target_file, 'r', encoding='utf-8') as file:
as_dict = json.load(file)
digest_to_use = as_dict[TAG_PARENT][0]
except FileNotFoundError:
break
return history
def debug_root(self): def debug_root(self):
""" """
Lists all folders in the root directory Lists all folders in the root directory
@@ -312,7 +348,7 @@ class DbEngine:
return [] return []
return [f for f in os.listdir(self.root) if os.path.isdir(os.path.join(self.root, f)) and f != 'refs'] return [f for f in os.listdir(self.root) if os.path.isdir(os.path.join(self.root, f)) and f != 'refs']
def debug_get_digest(self, user_id, entry): def get_digest(self, user_id, entry):
return self._get_entry_digest(user_id, entry) return self._get_entry_digest(user_id, entry)
def _serialize(self, obj): def _serialize(self, obj):

View File

@@ -98,10 +98,10 @@ class SettingsManager:
user_id, user_email = self._get_user(session) user_id, user_email = self._get_user(session)
return self._db_engine.save(user_id, user_email, entry, obj) return self._db_engine.save(user_id, user_email, entry, obj)
def load(self, session: dict, entry: str, default=NoDefault): def load(self, session: dict, entry: str, digest=None, default=NoDefault):
user_id, _ = self._get_user(session) user_id, _ = self._get_user(session)
try: try:
return self._db_engine.load(user_id, entry) return self._db_engine.load(user_id, entry, digest)
except DbException: except DbException:
return default return default
@@ -128,6 +128,14 @@ class SettingsManager:
return self._db_engine.exists(user_id, entry) return self._db_engine.exists(user_id, entry)
def get_digest(self, session: dict, entry: str):
user_id, _ = self._get_user(session)
return self._db_engine.get_digest(user_id, entry)
def history(self, session, entry, digest=None, max_items=1000):
user_id, _ = self._get_user(session)
return self._db_engine.history(user_id, entry, digest, max_items)
def get_db_engine(self): def get_db_engine(self):
return self._db_engine return self._db_engine
@@ -177,7 +185,7 @@ class GenericDbManager:
if key.startswith("_"): if key.startswith("_"):
super().__setattr__(key, value) super().__setattr__(key, value)
settings = self._settings_manager.load(self._session, self._obj_entry, self._obj_type()) settings = self._settings_manager.load(self._session, self._obj_entry, default=self._obj_type())
if not (hasattr(settings, key)): if not (hasattr(settings, key)):
raise AttributeError(f"Settings '{self._obj_entry}' has no attribute '{key}'.") raise AttributeError(f"Settings '{self._obj_entry}' has no attribute '{key}'.")
@@ -188,7 +196,7 @@ class GenericDbManager:
if item.startswith("_"): if item.startswith("_"):
return super().__getattribute__(item) return super().__getattribute__(item)
settings = self._settings_manager.load(self._session, self._obj_entry, self._obj_type()) settings = self._settings_manager.load(self._session, self._obj_entry, default=self._obj_type())
if not (hasattr(settings, item)): if not (hasattr(settings, item)):
raise AttributeError(f"Settings '{self._obj_entry}' has no attribute '{item}'.") raise AttributeError(f"Settings '{self._obj_entry}' has no attribute '{item}'.")
@@ -250,7 +258,7 @@ class NestedSettingsManager:
self._settings_manager.save(self._session, self._obj_entry, settings) self._settings_manager.save(self._session, self._obj_entry, settings)
def _get_settings_and_object(self): def _get_settings_and_object(self):
settings = self._settings_manager.load(self._session, self._obj_entry, self._obj_type()) settings = self._settings_manager.load(self._session, self._obj_entry, default=self._obj_type())
if not hasattr(settings, self._obj_attribute): if not hasattr(settings, self._obj_attribute):
raise AttributeError(f"Settings '{self._obj_entry}' has no attribute '{self._obj_attribute}'.") raise AttributeError(f"Settings '{self._obj_entry}' has no attribute '{self._obj_attribute}'.")

View File

@@ -235,3 +235,40 @@ def test_put_many_save_only_if_necessary(engine):
entry_content = engine.load(FAKE_USER_ID, "MyEntry") entry_content = engine.load(FAKE_USER_ID, "MyEntry")
assert entry_content[TAG_PARENT] == [None] # Still None, nothing was save assert entry_content[TAG_PARENT] == [None] # Still None, nothing was save
def test_i_can_retrieve_history_using_put(engine):
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", DummyObj(1, "a", False))
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", DummyObj(2, "a", False))
engine.put(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", "key1", DummyObj(3, "a", False))
history = engine.history(FAKE_USER_ID, "MyEntry")
assert len(history) == 3
v0 = engine.load(FAKE_USER_ID, "MyEntry", history[0])
v1 = engine.load(FAKE_USER_ID, "MyEntry", history[1])
v2 = engine.load(FAKE_USER_ID, "MyEntry", history[2])
assert v0["key1"] == DummyObj(3, "a", False)
assert v1["key1"] == DummyObj(2, "a", False)
assert v2["key1"] == DummyObj(1, "a", False)
assert v2[TAG_PARENT] == [None]
def test_i_can_retrieve_history_using_save(engine):
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", {"key1" : DummyObj(1, "a", False)})
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", {"key1" : DummyObj(2, "a", False)})
engine.save(FAKE_USER_ID, FAKE_USER_EMAIL, "MyEntry", {"key1" : DummyObj(3, "a", False)})
history = engine.history(FAKE_USER_ID, "MyEntry")
assert len(history) == 3
v0 = engine.load(FAKE_USER_ID, "MyEntry", history[0])
v1 = engine.load(FAKE_USER_ID, "MyEntry", history[1])
v2 = engine.load(FAKE_USER_ID, "MyEntry", history[2])
assert v0["key1"] == DummyObj(3, "a", False)
assert v1["key1"] == DummyObj(2, "a", False)
assert v2["key1"] == DummyObj(1, "a", False)
assert v2[TAG_PARENT] == [None]

View File

@@ -1,25 +1,12 @@
import pytest import pytest
from fasthtml.components import Div from fasthtml.components import Div
from components.undo_redo.components.UndoRedo import UndoRedo, CommandHistory from components.undo_redo.components.UndoRedo import UndoRedo
from core.settings_management import SettingsManager, MemoryDbEngine from core.settings_management import SettingsManager, MemoryDbEngine
from helpers import matches, div_icon, Contains, DoesNotContain from helpers import matches, div_icon, Contains, DoesNotContain
from my_mocks import tabs_manager from my_mocks import tabs_manager
class UndoableCommand(CommandHistory):
def __init__(self, old_value=0, new_value=0):
super().__init__("Set new value", lambda value: f"Setting new value to {value}", None)
self.old_value = old_value
self.new_value = new_value
def undo(self):
return Div(self.old_value, hx_swap_oob="true")
def redo(self):
return Div(self.new_value, hx_swap_oob="true")
@pytest.fixture @pytest.fixture
def undo_redo(session, tabs_manager): def undo_redo(session, tabs_manager):
return UndoRedo(session, return UndoRedo(session,
@@ -100,6 +87,7 @@ def test_i_can_undo_and_redo(undo_redo):
expected = Div(2, hx_swap_oob="true") expected = Div(2, hx_swap_oob="true")
assert matches(res, expected) assert matches(res, expected)
def test_history_is_rewritten_when_pushing_a_command(undo_redo): def test_history_is_rewritten_when_pushing_a_command(undo_redo):
undo_redo.push(UndoableCommand(0, 1)) undo_redo.push(UndoableCommand(0, 1))
undo_redo.push(UndoableCommand(1, 2)) undo_redo.push(UndoableCommand(1, 2))
@@ -109,4 +97,4 @@ def test_history_is_rewritten_when_pushing_a_command(undo_redo):
undo_redo.undo() undo_redo.undo()
undo_redo.push(UndoableCommand(1, 5)) undo_redo.push(UndoableCommand(1, 5))
assert len(undo_redo.history) == 2 assert len(undo_redo.history) == 2