from unittest.mock import MagicMock import pytest from fasthtml.components import * from components.form.components.MyForm import FormField, MyForm from components.undo_redo.components.UndoRedo import UndoRedo from components.workflows.components.Workflows import Workflows from core.instance_manager import InstanceManager from core.settings_management import SettingsManager, MemoryDbEngine from helpers import matches, div_icon, search_elements_by_name, Contains from my_mocks import tabs_manager TEST_WORKFLOWS_ID = "testing_repositories_id" boundaries = {"height": 500, "width": 800} @pytest.fixture def workflows(session, tabs_manager): return Workflows(session=session, _id=TEST_WORKFLOWS_ID, settings_manager=SettingsManager(engine=MemoryDbEngine()), tabs_manager=tabs_manager) @pytest.fixture(autouse=True) def mock_undo_redo(session): # Create a mock UndoRedo instance undo_redo = MagicMock(spec=UndoRedo) # Store original get method original_get = InstanceManager.get def mock_get(sess, instance_id, *args, **kwargs): if instance_id == UndoRedo.create_component_id(sess): return undo_redo return original_get(sess, instance_id, *args, **kwargs) # Replace get method with our mock InstanceManager.get = mock_get yield undo_redo # Restore original get method after test InstanceManager.get = original_get def test_render_no_workflow(workflows): actual = workflows.__ft__() expected = Div( Div(cls="divider"), Div( Div("Workflows"), div_icon("add"), # icon to add a new workflow cls="flex" ), Div(id=f"w_{workflows.get_id()}", ), # list of workflow id=workflows.get_id(), ) assert matches(actual, expected) def test_render_with_workflows_defined(workflows): workflows.db.add_workflow("workflow 1") workflows.db.add_workflow("workflow 2") actual = workflows.__ft__() expected = Div( Div(cls="divider"), Div(), # title + icon 'Add' Div( Div("workflow 1"), Div("workflow 2"), id=f"w_{workflows.get_id()}" ), # list of workflows id=workflows.get_id(), ) assert matches(actual, expected) def test_i_can_see_selected_workflow(workflows): workflows.db.add_workflow("workflow 1") workflows.db.add_workflow("workflow 2") workflows.db.select_workflow("workflow 2") actual = workflows.__ft__() to_compare = search_elements_by_name(actual, "div", attrs={"id": f"w_{workflows.get_id()}"})[0] expected = Div( Div("workflow 1"), Div(Div("workflow 2"), cls=Contains("mmt-selected")), id=f"w_{workflows.get_id()}" ) assert matches(to_compare, expected) def test_i_can_request_for_a_new_workflow(workflows, tabs_manager): res = workflows.request_new_workflow() tabs_manager.request_new_tab_id.assert_called_once() assert "new_tab_id" in res.tabs tab_def = res.tabs["new_tab_id"] assert tab_def[0] == "Add Workflow" content = tab_def[1] assert isinstance(content, MyForm) assert content.title == "Add Workflow" assert content.fields == [FormField("name", 'Workflow Name', 'input')] def test_i_can_add_a_new_workflow(workflows, tabs_manager): res = workflows.request_new_workflow() tab_id = list(res.tabs.keys())[0] actual = workflows.add_new_workflow(tab_id, "Not relevant here", "New Workflow", boundaries) expected = ( Div( Div("New Workflow"), id=f"w_{workflows.get_id()}" ), # list of workflows Div(), # Workflow Designer embedded in the tab ) assert matches(actual, expected) # check that the workflow was added assert workflows.db.exists_workflow("New Workflow") def test_i_can_select_a_workflow(workflows): workflows.add_new_workflow("tab_id_1", "Not relevant", "workflow 1", boundaries) workflows.add_new_workflow("tab_id_2", "Not relevant", "workflow 2", boundaries) workflows.add_new_workflow("tab_id_3", "Not relevant", "workflow 3", boundaries) actual = workflows.show_workflow("workflow 2", boundaries) expected = ( Div( Div("workflow 1"), Div(Div("workflow 2"), cls=Contains("mmt-selected")), Div("workflow 3"), id=f"w_{workflows.get_id()}" ), # list of workflows Div(), # Workflow Designer embedded in the tab ) assert matches(actual, expected)