146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
import json
|
|
import logging
|
|
|
|
from fasthtml.components import Script, Div
|
|
|
|
from myfasthtml.core.dbmanager import DbObject
|
|
from myfasthtml.core.instances import MultipleInstance
|
|
|
|
# Module-level logger used by VisNetwork for debug tracing.
logger = logging.getLogger("VisNetwork")
|
|
|
|
|
|
class VisNetworkState(DbObject):
    """State container for a VisNetwork: nodes, edges, options and event handlers.

    The attributes are assigned inside ``self.initializing()``; how DbObject
    treats assignments made in that context is defined by DbObject itself
    (NOTE(review): presumably they are registered as persisted defaults
    without triggering a save -- confirm against DbObject).
    """

    def __init__(self, owner):
        """Create the state for *owner* with an empty graph and default options."""
        super().__init__(owner)

        # Default vis-network options: auto-resizing canvas, every user
        # interaction (node drag, zoom, pan) enabled, physics simulation on.
        default_options = {
            "autoResize": True,
            "interaction": {
                "dragNodes": True,
                "zoomView": True,
                "dragView": True,
            },
            "physics": {"enabled": True}
        }

        with self.initializing():
            # persisted in DB
            self.nodes: list = []
            self.edges: list = []
            self.options: dict = default_options
            # Mapping {event_name: htmx options of the command to call}
            self.events_handlers: dict = {}
|
|
|
|
|
|
class VisNetwork(MultipleInstance):
    """FastHTML component that renders a vis-network graph.

    The component keeps its nodes, edges, options and event handlers in a
    ``VisNetworkState`` and renders them as a container ``Div`` followed by an
    inline ``Script`` that instantiates ``vis.Network`` on that container.
    """

    def __init__(self, parent, _id=None, nodes=None, edges=None, options=None, events_handlers=None):
        """Create the component and apply any explicitly supplied state.

        Args:
            parent: Owner passed through to ``MultipleInstance``.
            _id: Optional explicit instance id, passed through to the base class.
            nodes: Optional list of vis-network node dicts.
            edges: Optional list of vis-network edge dicts.
            options: Optional vis-network options dict (replaces the stored one).
            events_handlers: Optional mapping ``{event_name: command}``; each
                command must expose ``ajax_htmx_options()``.
        """
        super().__init__(parent, _id=_id)
        logger.debug(f"VisNetwork created with id: {self._id}")

        # Possible events (expected in snake_case):
        #   - select_node  → selectNode
        #   - select       → select
        #   - click        → click
        #   - double_click → doubleClick

        self._state = VisNetworkState(self)

        # Convert each Command to its htmx options; render() reads the
        # 'url', 'target' and 'swap' keys of those options.
        handlers_htmx_options = {
            event_name: command.ajax_htmx_options()
            for event_name, command in events_handlers.items()
        } if events_handlers else {}

        self._update_state(nodes, edges, options, handlers_htmx_options)

    def _update_state(self, nodes, edges, options, events_handlers=None):
        """Replace the given parts of the state; ``None`` leaves a part untouched.

        When every argument is falsy (``None`` or empty) nothing is changed, so
        passing only empty containers does not clear the state.

        NOTE: as soon as one argument is truthy, every argument that is not
        ``None`` is written -- including empty containers (e.g. the ``{}``
        handlers built by ``__init__`` when no handlers were supplied).
        """
        logger.debug(f"Updating VisNetwork state with {nodes=}, {edges=}, {options=}, {events_handlers=}")
        if not nodes and not edges and not options and not events_handlers:
            return

        # Work on a copy and push it back in one update() call.
        state = self._state.copy()
        if nodes is not None:
            state.nodes = nodes
        if edges is not None:
            state.edges = edges
        if options is not None:
            state.options = options
        if events_handlers is not None:
            state.events_handlers = events_handlers

        self._state.update(state)

    def add_to_options(self, **kwargs):
        """Merge *kwargs* into the current options (shallow, top-level keys only).

        A keyword whose value is a dict replaces the whole existing sub-dict
        of the same name; nested keys are not merged.

        Returns:
            ``self``, so calls can be chained.
        """
        logger.debug(f"add_to_options: {kwargs=}")
        new_options = self._state.options.copy() | kwargs
        self._update_state(None, None, new_options)
        return self

    @staticmethod
    def _js_literal(value, **dumps_kwargs):
        """Serialize *value* as a JS literal that is safe inside an inline <script>.

        ``json.dumps`` output is valid JavaScript, but it leaves ``<`` untouched,
        so a string containing ``</script>`` would terminate the script element.
        ``<``, ``>`` and ``&`` can only occur inside JSON string literals, where
        replacing them with ``\\uXXXX`` escapes keeps the value identical.
        """
        text = json.dumps(value, **dumps_kwargs)
        return (
            text.replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )

    def _event_handlers_js(self):
        """Build the ``network.on(...)`` registrations for the stored handlers."""
        # Map Python event names to vis-network event names; unknown names
        # are passed through unchanged.
        event_name_map = {
            "select_node": "selectNode",
            "select": "select",
            "click": "click",
            "double_click": "doubleClick"
        }
        js = self._js_literal

        snippets = []
        for event_name, command_htmx_options in self._state.events_handlers.items():
            vis_event_name = event_name_map.get(event_name, event_name)
            # str() keeps the coercion the values previously got from f-string
            # interpolation; the JSON literal adds correct quoting/escaping.
            js_vis_event = js(str(vis_event_name))
            js_event = js(str(event_name))
            js_url = js(str(command_htmx_options['url']))
            js_target = js(str(command_htmx_options['target']))
            js_swap = js(str(command_htmx_options['swap']))
            snippets.append(f"""
                network.on({js_vis_event}, function(params) {{
                    const event_data = {{
                        event_name: {js_event},
                        nodes: params.nodes,
                        edges: params.edges,
                        pointer: params.pointer
                    }};
                    htmx.ajax('POST', {js_url}, {{
                        values: {{event_data: JSON.stringify(event_data)}},
                        target: {js_target},
                        swap: {js_swap}
                    }});
                }});
            """)
        return "".join(snippets)

    def render(self):
        """Return the container ``Div`` and the ``Script`` that initializes vis-network.

        Returns:
            A 2-tuple ``(Div, Script)``. The script looks the container up by
            this instance's id, builds the node and edge ``vis.DataSet`` objects,
            creates the ``vis.Network`` and registers the event handlers.
        """
        js = self._js_literal

        # Serialize nodes and edges to JSON.
        # This preserves all properties (color, shape, size, etc.) that are present.
        js_nodes = ",\n ".join(js(node) for node in self._state.nodes)
        js_edges = ",\n ".join(js(edge) for edge in self._state.edges)

        # Convert Python options to JS
        js_options = js(self._state.options, indent=2)

        js_container_id = js(str(self._id))
        event_handlers_js = self._event_handlers_js()

        return (
            Div(
                id=self._id,
                cls="mf-vis",
            ),

            # The script initializing Vis.js
            Script(f"""
            (function() {{
                const container = document.getElementById({js_container_id});
                const nodes = new vis.DataSet([
                    {js_nodes}
                ]);
                const edges = new vis.DataSet([
                    {js_edges}
                ]);
                const data = {{
                    nodes: nodes,
                    edges: edges
                }};
                const options = {js_options};
                const network = new vis.Network(container, data, options);
                {event_handlers_js}
            }})();
            """)
        )

    def __ft__(self):
        """FastHTML rendering hook; delegates to ``render()``."""
        return self.render()
|