336 lines
11 KiB
Python
336 lines
11 KiB
Python
import dataclasses
|
|
from typing import Any
|
|
|
|
from fasthtml.components import *
|
|
from pandas import DataFrame
|
|
|
|
from components.BaseComponent import BaseComponent
|
|
from components.datagrid_new.components.DataGrid import DataGrid
|
|
from components.debugger.assets.icons import icon_expanded, icon_collapsed, icon_class
|
|
from components.debugger.commands import JsonViewerCommands
|
|
from components.debugger.constants import INDENT_SIZE, MAX_TEXT_LENGTH, NODE_OBJECT, NODES_KEYS_TO_NOT_EXPAND
|
|
from components_helpers import apply_boundaries
|
|
from core.serializer import TAG_OBJECT
|
|
from core.utils import get_unique_id
|
|
|
|
|
|
class FoldingMode:
    """Names of the two global folding states a JsonViewer can be in.

    The values are plain strings so they can be compared directly with the
    folding value carried by a fold command.
    """

    # Every container node is folded unless explicitly tracked as an exception.
    COLLAPSE = "collapse"

    # Every container node is unfolded unless explicitly tracked as an exception.
    EXPAND = "expand"
|
|
|
|
|
|
@dataclasses.dataclass
class Node:
    """Base class of every node of the tree built from the viewed data."""

    # The raw Python object this node was created from.
    value: Any
|
|
|
|
|
|
@dataclasses.dataclass
class ValueNode(Node):
    """Leaf node: wraps a value that is neither a list nor a dict."""

    # Optional rendering hint; None means "no special rendering".
    hint: str | None = dataclasses.field(default=None)
|
|
|
|
|
|
@dataclasses.dataclass
class ListNode(Node):
    """Container node created for a ``list`` value."""

    # Identifier used to look the node up and to address it in the rendered page.
    node_id: str

    # Nesting depth of the node (the root is created with level 0).
    level: int

    # One child node per item of the list, in list order.
    children: list[Node] = dataclasses.field(default_factory=list)
|
|
|
|
|
|
@dataclasses.dataclass
class DictNode(Node):
    """Container node created for a ``dict`` value."""

    # Identifier used to look the node up and to address it in the rendered page.
    node_id: str

    # Nesting depth of the node (the root is created with level 0).
    level: int

    # Child nodes, stored under the same keys as in the source dict.
    children: dict[str, Node] = dataclasses.field(default_factory=dict)
|
|
|
|
|
|
class JsonViewerHelper:
    """CSS class names and small predicates shared with rendering hooks.

    An instance of this class is handed to every hook predicate and hook
    renderer of a JsonViewer as their third argument.
    """

    # CSS classes, one per kind of rendered value.
    class_string = "mmt-jsonviewer-string"
    class_bool = "mmt-jsonviewer-bool"
    class_number = "mmt-jsonviewer-number"
    class_null = "mmt-jsonviewer-null"
    class_digest = "mmt-jsonviewer-digest"
    class_object = "mmt-jsonviewer-object"
    class_dataframe = "mmt-jsonviewer-dataframe"

    @staticmethod
    def is_sha256(_value):
        """Tell whether ``_value`` has the textual shape of a SHA-256 digest.

        Only the shape is checked: a ``str`` of exactly 64 hexadecimal
        characters (either case). Returns a ``bool``.
        """
        if not isinstance(_value, str):
            return False
        if len(_value) != 64:
            return False
        # Every character must be a hexadecimal digit.
        return set(_value).issubset("0123456789abcdefABCDEF")
|
|
|
|
|
|
class JsonViewer(BaseComponent):
    """Foldable tree view of nested lists, dicts and scalar values.

    The data is converted once into a tree of ``ListNode`` / ``DictNode`` /
    ``ValueNode`` objects. Container nodes receive an id and are registered in
    ``_nodes_by_id`` so that a single node can be re-rendered on demand
    (see ``render_node``).
    """

    def __init__(self, session, _id, owner, user_id, data, hooks=None, key=None, boundaries=None):
        """Build the viewer and the node tree for ``data``.

        :param session: forwarded to ``BaseComponent`` and to the ``DataGrid`` instances created here
        :param _id: component id; also the prefix of every node id
        :param owner: owning component (the debugger); must provide ``db_engine_headers``
        :param user_id: passed to the "open digest" command
        :param data: the value to display
        :param hooks: optional list of ``(predicate, renderer)`` tuples for custom rendering
        :param key: optional key used by ``__eq__`` / ``__hash__`` to compare two viewers
        :param boundaries: passed to ``apply_boundaries``; defaults to ``{"height": "600"}``
        """
        super().__init__(session, _id)
        self._key = key  # for comparison between two jsonviewer components
        self._owner = owner  # debugger component
        self.user_id = user_id
        self.data = data
        self._node_id = -1  # counter behind _get_next_id(); the first id ends with "-0"
        self._boundaries = boundaries if boundaries else {"height": "600"}
        self._commands = JsonViewerCommands(self)

        # How folding / unfolding works:
        # - every container node is in the state given by self._folding_mode
        #   (FoldingMode.COLLAPSE or FoldingMode.EXPAND)...
        # - ...except the nodes listed in self._nodes_to_track, which are in the
        #   opposite state.
        # Only the exceptions are stored, which keeps the memory usage low.
        self._folding_mode = FoldingMode.COLLAPSE
        self._nodes_to_track = set()  # ids of the nodes whose state is the opposite of _folding_mode

        self._nodes_by_id = {}  # node_id -> (key, node), for container nodes only

        self.node = self._create_node(None, data)

        # Hooks are used to define specific rendering.
        # They are tuples (predicate, renderer); both are called with (key, node, helper).
        self.hooks = hooks or []

        self._helper = JsonViewerHelper()

    def set_data(self, data):
        """Replace the displayed data and rebuild the node tree.

        The bookkeeping of the previous tree is dropped first: its node ids are
        never reused (the id counter keeps increasing), so keeping those entries
        would only retain the old nodes and their data in memory.
        """
        self.data = data
        self._nodes_by_id.clear()
        self._nodes_to_track.clear()
        # NOTE(review): _create_node() always tracks the top-level nodes; when the
        # current mode is FoldingMode.EXPAND this makes them *collapsed* - confirm intent.
        self.node = self._create_node(None, data)

    def set_node_folding(self, node_id, folding):
        """Record the folding state (a ``FoldingMode`` value) of one node.

        A node whose state equals the global mode is no longer an exception and
        is removed from the tracked set; otherwise it is added to it.
        """
        if folding == self._folding_mode:
            # discard() rather than remove(): the node may already be in the
            # default state (e.g. a repeated or stale request), which must not raise.
            self._nodes_to_track.discard(node_id)
        else:
            self._nodes_to_track.add(node_id)

    def render_node(self, node_id):
        """Render the registered container node ``node_id``.

        :raises KeyError: if ``node_id`` is not registered
        """
        key, node = self._nodes_by_id[node_id]
        return self._render_node(key, node)

    def set_folding_mode(self, folding_mode):
        """Set the global folding mode and forget every per-node exception."""
        self._folding_mode = folding_mode
        self._nodes_to_track.clear()

    def get_folding_mode(self):
        """Return the current global folding mode."""
        return self._folding_mode

    def get_owner(self):
        """Return the owner given at construction time."""
        return self._owner

    def open_digest(self, user_id: str, digest: str):
        """Delegate to the owner's ``db_engine_headers(user_id, digest)`` and return its result."""
        return self._owner.db_engine_headers(user_id, digest)

    def _create_node(self, key, data, level=0):
        """Recursively convert ``data`` into a node tree and return its root.

        Lists and dicts become ``ListNode`` / ``DictNode``: they get a fresh id and
        are registered in ``_nodes_by_id``. Any other value becomes a ``ValueNode``.

        :param key: key (dict) or index (list) under which ``data`` is stored; None for the root
        :param data: the value to convert
        :param level: nesting depth of ``data`` (0 for the root)
        """
        if not isinstance(data, (list, dict)):
            # Leaf: a value stored under TAG_OBJECT is flagged for "object" rendering.
            return ValueNode(data, NODE_OBJECT if key == TAG_OBJECT else None)

        node_id = self._get_next_id()
        # The first two levels are tracked by default, unless the key is blacklisted.
        if level <= 1 and key not in NODES_KEYS_TO_NOT_EXPAND:
            self._nodes_to_track.add(node_id)

        if isinstance(data, list):
            node = ListNode(data, node_id, level)
            self._nodes_by_id[node_id] = (key, node)
            for index, item in enumerate(data):
                node.children.append(self._create_node(index, item, level + 1))
        else:
            node = DictNode(data, node_id, level)
            self._nodes_by_id[node_id] = (key, node)
            for child_key, child_value in data.items():
                node.children[child_key] = self._create_node(child_key, child_value, level + 1)

        return node

    def _must_expand(self, node):
        """Tell whether ``node`` is currently unfolded.

        :return: None for a node that cannot be folded (not a list/dict node),
                 otherwise True when the node must be rendered expanded
        """
        if not isinstance(node, (ListNode, DictNode)):
            return None

        is_exception = node.node_id in self._nodes_to_track
        if self._folding_mode == FoldingMode.COLLAPSE:
            return is_exception
        return not is_exception

    def _mk_folding(self, node: Node, must_expand: bool | None):
        """Create the fold/unfold icon of ``node``; None when the node cannot be folded."""
        if must_expand is None:
            return None

        # Clicking the icon requests the opposite of the current state.
        target_state = FoldingMode.COLLAPSE if must_expand else FoldingMode.EXPAND
        return Span(icon_expanded if must_expand else icon_collapsed,
                    cls="icon-16-inline mmt-jsonviewer-folding",
                    style=f"margin-left: -{INDENT_SIZE}px;",
                    **self._commands.fold(node.node_id, target_state)
                    )

    def _get_next_id(self):
        """Return a new node id of the form ``"<component id>-<counter>"``."""
        self._node_id += 1
        return f"{self._id}-{self._node_id}"

    def _render_value(self, key, node, must_expand):
        """Render the value part of ``node`` (what follows ``key :``).

        Order of precedence: collapsed placeholder, then the first matching hook,
        then the default dict / list / leaf rendering.
        """
        if must_expand is False:
            # Collapsed container: clickable placeholder that expands the node.
            placeholder = "[...]" if isinstance(node, ListNode) else "{...}"
            return Span(placeholder,
                        id=node.node_id,
                        **self._commands.fold(node.node_id, FoldingMode.EXPAND))

        for predicate, renderer in self.hooks:
            if predicate(key, node, self._helper):
                return renderer(key, node, self._helper)

        if isinstance(node, DictNode):
            return self._render_dict(key, node)
        if isinstance(node, ListNode):
            return self._render_list(key, node)
        return self._render_leaf(node)

    def _render_leaf(self, node):
        """Default rendering of a leaf node, chosen from the type of its value."""
        data_tooltip = None
        htmx_params = {}
        icon = None
        value = node.value

        if isinstance(value, bool):  # order is important: bool is an int in Python!
            str_value = "true" if value else "false"
            data_class = "bool"
        elif isinstance(value, (int, float)):
            str_value = str(value)
            data_class = "number"
        elif value is None:
            str_value = "null"
            data_class = "null"
        elif self._helper.is_sha256(value):
            str_value = str(value)
            data_class = "digest"
            htmx_params = self._commands.open_digest(self.user_id, value)
        elif node.hint == NODE_OBJECT:
            # Only the last dotted component of the value is displayed.
            icon = icon_class
            str_value = value.split(".")[-1]
            data_class = "object"
        elif isinstance(value, DataFrame):
            grid = DataGrid(self._session)
            grid.init_from_dataframe(value)
            str_value = grid
            data_class = "dataframe"
        else:
            # Anything else is shown as a quoted string, truncated when too long;
            # the full text is then kept as a tooltip.
            as_str = str(value)
            if len(as_str) > MAX_TEXT_LENGTH:
                str_value = as_str[:MAX_TEXT_LENGTH] + "..."
                data_tooltip = as_str
            else:
                str_value = as_str
            str_value = self.add_quotes(str_value)
            data_class = "string"

        if data_tooltip is not None:
            cls = f"mmt-jsonviewer-{data_class} mmt-tooltip"
        else:
            cls = f"mmt-jsonviewer-{data_class}"

        if icon is not None:
            return Span(Span(icon, cls="icon-16-inline mr-1"),
                        Span(str_value, data_tooltip=data_tooltip, **htmx_params),
                        cls=cls)

        return Span(str_value, cls=cls, data_tooltip=data_tooltip, **htmx_params)

    def _render_dict(self, key, node: DictNode):
        """Render an expanded dict node: ``{``, one entry per child, ``}``."""
        entries = [self._render_node(child_key, child) for child_key, child in node.children.items()]
        return Span("{",
                    *entries,
                    Div("}"),
                    id=node.node_id)

    def _render_list(self, key, node: ListNode):
        """Render an expanded list node.

        A non-empty list whose items are all objects of one same class (and not
        handled by a hook) is rendered as a grid built from the items'
        ``__dict__``; any other list is rendered item by item.
        """

        def _all_the_same(_key, _node):
            """Tell whether the list can be rendered as a grid."""
            if len(_node.children) == 0:
                return False

            sample_node = _node.children[0]
            sample_value = sample_node.value

            if sample_value is None:
                return False

            type_ = type(sample_value)
            if type_ in (int, float, str, bool, list, dict, ValueNode):
                return False

            # The grid is built from each item's __dict__: values without one
            # (tuples, sets, datetimes, ...) must use the plain list rendering.
            if not hasattr(sample_value, "__dict__"):
                return False

            # a specific rendering is specified
            for predicate, renderer in self.hooks:
                if predicate(_key, sample_node, self._helper):
                    return False

            return all(type(item.value) is type_ for item in _node.children)

        def _render_as_grid(_node):
            """Render the items as a DataGrid with one row per item."""
            type_ = type(_node.children[0].value)
            icon = icon_class
            str_value = type_.__name__.split(".")[-1]

            rows = [child.value.__dict__ for child in _node.children]
            df = DataFrame(rows)
            dg = DataGrid(self._session)
            dg.init_from_dataframe(df)
            return Span(Span(Span(icon, cls="icon-16-inline mr-1"), Span(str_value), cls="mmt-jsonviewer-object"),
                        dg,
                        id=_node.node_id)

        def _render_as_list(_node):
            """Render ``[``, one entry per item, ``]``."""
            items = [self._render_node(index, item) for index, item in enumerate(_node.children)]
            return Span("[",
                        *items,
                        Div("]"),
                        )

        if _all_the_same(key, node):
            return _render_as_grid(node)
        return _render_as_list(node)

    def _render_node(self, key, node):
        """Render one node as an indented line: folding icon, ``key :`` label, value."""
        must_expand = self._must_expand(node)  # to be able to update the folding when the node is updated
        return Div(
            self._mk_folding(node, must_expand),
            Span(f'{key} : ') if key is not None else None,
            self._render_value(key, node, must_expand),

            style=f"margin-left: {INDENT_SIZE}px;",
            # Only container nodes carry an id.
            id=getattr(node, "node_id", None),
        )

    def __ft__(self):
        """Render the whole component: the root node wrapped in the viewer container."""
        return Div(
            Div(self._render_node(None, self.node),
                id=f"{self._id}-root",
                style="margin-left: 0px;"),
            cls="mmt-jsonviewer",
            id=f"{self._id}",
            **apply_boundaries(self._boundaries),
        )

    def __eq__(self, other):
        """Two viewers are equal when they have the same type and the same non-None key."""
        if type(other) is not type(self):
            return False
        return self._key is not None and self._key == other._key

    def __hash__(self):
        """Hash on the key when there is one, otherwise fall back to the parent class hash."""
        return hash(self._key) if self._key is not None else super().__hash__()

    @staticmethod
    def add_quotes(value: str):
        """Surround ``value`` with quotes, picking the quote style from its content.

        - both ``"`` and ``'`` present: double quotes, inner ``"`` escaped as ``\\"``
        - only ``"`` present: single quotes
        - otherwise: double quotes
        """
        has_double_quote = '"' in value
        if has_double_quote and "'" in value:
            # The escaping is done outside the f-string: a backslash inside a
            # replacement field is a SyntaxError before Python 3.12.
            escaped = value.replace('"', '\\"')
            return f'"{escaped}"'
        if has_double_quote:
            return f"'{value}'"
        return f'"{value}"'

    @staticmethod
    def create_component_id(session, prefix=None, suffix=None):
        """Return ``prefix`` followed by ``suffix``; a unique suffix is generated when none is given.

        ``session`` is not used here.
        NOTE(review): with the default ``prefix=None`` the id starts with the text "None" -
        callers are presumably expected to always pass a prefix; confirm.
        """
        if suffix is None:
            suffix = get_unique_id()

        return f"{prefix}{suffix}"
|