First Working version. I can add table
This commit is contained in:
380
src/core/utils.py
Normal file
380
src/core/utils.py
Normal file
@@ -0,0 +1,380 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import importlib
|
||||
import inspect
|
||||
import pkgutil
|
||||
import re
|
||||
import types
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from io import BytesIO
|
||||
|
||||
import pandas as pd
|
||||
|
||||
PRIMITIVES = (str, bool, type(None), int, float)
|
||||
|
||||
|
||||
def get_stream_digest(stream):
|
||||
"""
|
||||
Compute a SHA256 from a stream
|
||||
:param stream:
|
||||
:type stream:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
sha256_hash = hashlib.sha256()
|
||||
stream.seek(0)
|
||||
for byte_block in iter(lambda: stream.read(4096), b""):
|
||||
sha256_hash.update(byte_block)
|
||||
|
||||
return sha256_hash.hexdigest()
|
||||
|
||||
|
||||
def has_tag(obj, tag):
|
||||
"""
|
||||
|
||||
:param obj:
|
||||
:param tag:
|
||||
:return:
|
||||
"""
|
||||
return type(obj) is dict and tag in obj
|
||||
|
||||
|
||||
def is_primitive(obj):
|
||||
"""
|
||||
|
||||
:param obj:
|
||||
:return:
|
||||
"""
|
||||
return isinstance(obj, PRIMITIVES)
|
||||
|
||||
|
||||
def is_dictionary(obj):
|
||||
"""
|
||||
|
||||
:param obj:
|
||||
:return:
|
||||
"""
|
||||
return isinstance(obj, dict)
|
||||
|
||||
|
||||
def is_list(obj):
|
||||
"""
|
||||
|
||||
:param obj:
|
||||
:return:
|
||||
"""
|
||||
return isinstance(obj, list)
|
||||
|
||||
|
||||
def is_set(obj):
|
||||
"""
|
||||
|
||||
:param obj:
|
||||
:return:
|
||||
"""
|
||||
return isinstance(obj, set)
|
||||
|
||||
|
||||
def is_tuple(obj):
|
||||
"""
|
||||
|
||||
:param obj:
|
||||
:return:
|
||||
"""
|
||||
return isinstance(obj, tuple)
|
||||
|
||||
|
||||
def is_enum(obj):
|
||||
return isinstance(obj, Enum)
|
||||
|
||||
|
||||
def is_object(obj):
|
||||
"""Returns True is obj is a reference to an object instance."""
|
||||
|
||||
return (isinstance(obj, object) and
|
||||
not isinstance(obj, (type,
|
||||
types.FunctionType,
|
||||
types.BuiltinFunctionType,
|
||||
types.GeneratorType)))
|
||||
|
||||
|
||||
def get_full_qualified_name(obj):
|
||||
"""
|
||||
Returns the full qualified name of a class (including its module name )
|
||||
:param obj:
|
||||
:return:
|
||||
"""
|
||||
if obj.__class__ == type:
|
||||
module = obj.__module__
|
||||
if module is None or module == str.__class__.__module__:
|
||||
return obj.__name__ # Avoid reporting __builtin__
|
||||
else:
|
||||
return module + '.' + obj.__name__
|
||||
else:
|
||||
module = obj.__class__.__module__
|
||||
if module is None or module == str.__class__.__module__:
|
||||
return obj.__class__.__name__ # Avoid reporting __builtin__
|
||||
else:
|
||||
return module + '.' + obj.__class__.__name__
|
||||
|
||||
|
||||
def importable_name(cls):
|
||||
"""
|
||||
Fully qualified name (prefixed by builtin when needed)
|
||||
"""
|
||||
# Use the fully-qualified name if available (Python >= 3.3)
|
||||
name = getattr(cls, '__qualname__', cls.__name__)
|
||||
|
||||
# manage python 2
|
||||
lookup = dict(__builtin__='builtins', exceptions='builtins')
|
||||
module = lookup.get(cls.__module__, cls.__module__)
|
||||
|
||||
return f"{module}.{name}"
|
||||
|
||||
|
||||
def get_class(qualified_class_name: str):
|
||||
"""
|
||||
Dynamically loads and returns a class type from its fully qualified name.
|
||||
Note that the class is not instantiated.
|
||||
|
||||
:param qualified_class_name: Fully qualified name of the class (e.g., 'some.module.ClassName').
|
||||
:return: The class object.
|
||||
:raises ImportError: If the module cannot be imported.
|
||||
:raises AttributeError: If the class cannot be resolved in the module.
|
||||
"""
|
||||
module_name, class_name = qualified_class_name.rsplit(".", 1)
|
||||
|
||||
try:
|
||||
module = importlib.import_module(module_name)
|
||||
except ModuleNotFoundError as e:
|
||||
raise ImportError(f"Could not import module '{module_name}' for '{qualified_class_name}': {e}")
|
||||
|
||||
if not hasattr(module, class_name):
|
||||
raise AttributeError(f"Component '{class_name}' not found in '{module.__name__}'.")
|
||||
|
||||
return getattr(module, class_name)
|
||||
|
||||
|
||||
def make_html_id(s: str | None) -> str | None:
|
||||
"""
|
||||
Creates a valid html id
|
||||
:param s:
|
||||
:return:
|
||||
"""
|
||||
if s is None:
|
||||
return None
|
||||
|
||||
s = str(s).strip()
|
||||
# Replace spaces and special characters with hyphens or remove them
|
||||
s = re.sub(r'[^a-zA-Z0-9_-]', '-', s)
|
||||
|
||||
# Ensure the ID starts with a letter or underscore
|
||||
if not re.match(r'^[a-zA-Z_]', s):
|
||||
s = 'id_' + s # Add a prefix if it doesn't
|
||||
|
||||
# Collapse multiple consecutive hyphens into one
|
||||
s = re.sub(r'-+', '-', s)
|
||||
|
||||
# Replace trailing hyphens with underscores
|
||||
s = re.sub(r'-+$', '_', s)
|
||||
|
||||
return s
|
||||
|
||||
|
||||
def snake_case_to_capitalized_words(s: str) -> str:
|
||||
"""
|
||||
Try to (re)create the column title from the column id
|
||||
>>> assert snake_case_to_capitalized_words("column_id") == "Column Id"
|
||||
>>> assert snake_case_to_capitalized_words("this_is_a_column_name") == "This Is A Column Name"
|
||||
:param s:
|
||||
:return:
|
||||
"""
|
||||
parts = s.split('_')
|
||||
capitalized_parts = [part.capitalize() for part in parts]
|
||||
|
||||
# Join the capitalized parts with spaces
|
||||
transformed_name = ' '.join(capitalized_parts)
|
||||
|
||||
return transformed_name
|
||||
|
||||
|
||||
def make_column_id(s: str | None):
|
||||
if s is None:
|
||||
return None
|
||||
|
||||
res = re.sub('-', '_', make_html_id(s)) # replace '-' by '_'
|
||||
return res.lower() # no uppercase
|
||||
|
||||
|
||||
def update_elements(elts, updates: list[dict]):
|
||||
"""
|
||||
walk through elements and update them if needed
|
||||
:param elts:
|
||||
:param updates:
|
||||
:return:
|
||||
"""
|
||||
|
||||
def _update_elt(_elt):
|
||||
if hasattr(_elt, 'attrs'):
|
||||
for blue_print in updates:
|
||||
if "id" in _elt.attrs and _elt.attrs["id"] == blue_print["id"]:
|
||||
method = blue_print["method"]
|
||||
_elt.attrs[method] = blue_print["value"]
|
||||
|
||||
if hasattr(_elt, "children"):
|
||||
for child in _elt.children:
|
||||
_update_elt(child)
|
||||
|
||||
if elts is None:
|
||||
return None
|
||||
|
||||
to_use = elts if isinstance(elts, (list, tuple, set)) else [elts]
|
||||
for elt in to_use:
|
||||
_update_elt(elt)
|
||||
|
||||
return elts
|
||||
|
||||
|
||||
def get_sheets_names(file_content):
|
||||
try:
|
||||
excel_file = pd.ExcelFile(BytesIO(file_content))
|
||||
sheet_names = excel_file.sheet_names
|
||||
except Exception:
|
||||
sheet_names = []
|
||||
|
||||
return sheet_names
|
||||
|
||||
|
||||
def to_bool(value: str):
|
||||
if isinstance(value, bool):
|
||||
return value
|
||||
|
||||
if value is None:
|
||||
return False
|
||||
|
||||
if not isinstance(value, str):
|
||||
raise NotImplemented("Cannot convert to bool")
|
||||
|
||||
return value.lower() in ("yes", "true", "t", "1")
|
||||
|
||||
|
||||
def from_bool(value: bool):
|
||||
return "true" if value else "false"
|
||||
|
||||
|
||||
def append_once(lst: list, elt):
|
||||
if elt in lst:
|
||||
return
|
||||
|
||||
lst.append(elt)
|
||||
|
||||
|
||||
def find_classes_in_modules(modules, base_class_name):
|
||||
"""
|
||||
Recursively search for all classes in the given list of modules (and their submodules)
|
||||
that inherit from a specified base class.
|
||||
|
||||
:param modules: List of top-level module names (e.g., ["core.settings_objects", "another.module"])
|
||||
:param base_class_name: Name of the base class to search for (e.g., "BaseSettingObj")
|
||||
"""
|
||||
# List to store matching classes
|
||||
derived_classes = []
|
||||
|
||||
def inspect_module(_module_name):
|
||||
"""Recursively inspect a module and its submodules for matching classes."""
|
||||
try:
|
||||
# Import the module dynamically
|
||||
module = importlib.import_module(_module_name)
|
||||
|
||||
# Iterate over all objects in the module
|
||||
for name, obj in inspect.getmembers(module, inspect.isclass):
|
||||
# Check if the class inherits from the specified base class
|
||||
for base in obj.__bases__:
|
||||
if base.__name__ == base_class_name:
|
||||
derived_classes.append(f"{_module_name}.{name}")
|
||||
|
||||
# Recursively inspect submodules
|
||||
if hasattr(module, "__path__"): # Check if the module has submodules
|
||||
for submodule_info in pkgutil.iter_modules(module.__path__):
|
||||
inspect_module(f"{_module_name}.{submodule_info.name}")
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Start inspecting from the top-level modules
|
||||
for module_name in modules:
|
||||
inspect_module(module_name)
|
||||
|
||||
return derived_classes
|
||||
|
||||
|
||||
def instantiate_class(qualified_class_name):
|
||||
"""
|
||||
Dynamically instantiates a class provided its full module path. The function takes
|
||||
the fully-qualified class path, imports the corresponding module at runtime,
|
||||
retrieves the class from the module, and instantiates it. Any exceptions during
|
||||
this process are caught and logged.
|
||||
|
||||
:param qualified_class_name: Full dot-separated path to the class to be instantiated.
|
||||
Example: 'module.submodule.ClassName'
|
||||
:type qualified_class_name: str
|
||||
:return: An instance of the dynamically instantiated class.
|
||||
:rtype: object
|
||||
:raises ValueError: If the class path fails to split correctly into module and
|
||||
class parts.
|
||||
:raises ModuleNotFoundError: If the specified module cannot be imported.
|
||||
:raises AttributeError: If the specified class does not exist in the module.
|
||||
:raises TypeError: For errors in class instantiation process.
|
||||
"""
|
||||
try:
|
||||
# Split module and class name
|
||||
module_name, class_name = qualified_class_name.rsplit(".", 1)
|
||||
|
||||
# Dynamically import the module
|
||||
module = importlib.import_module(module_name)
|
||||
|
||||
# Get the class from the module
|
||||
cls = getattr(module, class_name)
|
||||
|
||||
# Instantiate the class (pass arguments here if required)
|
||||
return cls()
|
||||
except Exception as e:
|
||||
print(f"Failed to instantiate {qualified_class_name}: {e}")
|
||||
|
||||
|
||||
def get_unique_id(prefix: str = None):
|
||||
suffix = base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b'=').decode('ascii')
|
||||
if prefix is None:
|
||||
return suffix
|
||||
else:
|
||||
return f"{prefix}_{suffix}"
|
||||
|
||||
|
||||
def merge_classes(*args):
|
||||
all_elements = []
|
||||
for element in args:
|
||||
if element is None or element == '':
|
||||
continue
|
||||
|
||||
if isinstance(element, (tuple, list, set)):
|
||||
all_elements.extend(element)
|
||||
|
||||
elif isinstance(element, dict):
|
||||
if "cls" in element:
|
||||
all_elements.append(element.pop("cls"))
|
||||
elif "class" in element:
|
||||
all_elements.append(element.pop("class"))
|
||||
|
||||
elif isinstance(element, str):
|
||||
all_elements.append(element)
|
||||
|
||||
else:
|
||||
raise ValueError(f"Cannot merge {element} of type {type(element)}")
|
||||
|
||||
if all_elements:
|
||||
# Remove duplicates while preserving order
|
||||
unique_elements = list(dict.fromkeys(all_elements))
|
||||
return " ".join(unique_elements)
|
||||
else:
|
||||
return None
|
||||
Reference in New Issue
Block a user