Files
MyManagingTools/src/core/utils.py

530 lines
13 KiB
Python

import ast
import base64
import cProfile
import functools
import hashlib
import importlib
import inspect
import pkgutil
import re
import time
import types
import uuid
from datetime import datetime
from enum import Enum
from io import BytesIO
from urllib.parse import urlparse
import pandas as pd
from constants import SESSION_USER_ID_KEY, NOT_LOGGED, NO_SESSION
# Types that is_primitive() treats as primitive values (note that NoneType is included).
PRIMITIVES = (str, bool, type(None), int, float)
def get_stream_digest(stream):
    """
    Compute the SHA256 hex digest of a binary stream.

    The stream is rewound to its beginning, then consumed in 4096-byte chunks
    until ``read`` returns ``b""``.

    :param stream: seekable binary stream exposing ``seek`` and ``read``
    :return: hexadecimal SHA256 digest of the whole stream content
    :rtype: str
    """
    digest = hashlib.sha256()
    stream.seek(0)
    while True:
        chunk = stream.read(4096)
        if chunk == b"":
            break
        digest.update(chunk)
    return digest.hexdigest()
def has_tag(obj, tag):
    """
    Tell whether ``obj`` is a plain ``dict`` that contains the key ``tag``.

    The type check is exact: instances of ``dict`` subclasses are rejected.

    :param obj: object to inspect
    :param tag: key to look for
    :return: True when ``obj`` is exactly a ``dict`` and ``tag`` is one of its keys
    """
    if type(obj) is not dict:
        return False
    return tag in obj
def is_primitive(obj):
    """
    Tell whether ``obj`` is an instance of one of the types listed in ``PRIMITIVES``.

    :param obj: object to test
    :return: True when ``obj`` is an instance of a type from ``PRIMITIVES``
    """
    primitive = isinstance(obj, PRIMITIVES)
    return primitive
def is_dictionary(obj):
    """
    Tell whether ``obj`` is a ``dict`` (subclasses included).

    :param obj: object to test
    :return: True when ``obj`` is an instance of ``dict``
    """
    matches = isinstance(obj, dict)
    return matches
def is_list(obj):
    """
    Tell whether ``obj`` is a ``list`` (subclasses included).

    :param obj: object to test
    :return: True when ``obj`` is an instance of ``list``
    """
    matches = isinstance(obj, list)
    return matches
def is_set(obj):
    """
    Tell whether ``obj`` is a ``set`` (subclasses included).

    A ``frozenset`` is not a ``set`` instance and therefore yields False.

    :param obj: object to test
    :return: True when ``obj`` is an instance of ``set``
    """
    matches = isinstance(obj, set)
    return matches
def is_tuple(obj):
    """
    Tell whether ``obj`` is a ``tuple`` (subclasses included).

    :param obj: object to test
    :return: True when ``obj`` is an instance of ``tuple``
    """
    matches = isinstance(obj, tuple)
    return matches
def is_enum(obj):
    """
    Tell whether ``obj`` is an ``Enum`` member.

    Enum classes themselves are not instances of ``Enum`` and yield False.

    :param obj: object to test
    :return: True when ``obj`` is an instance of ``Enum``
    """
    matches = isinstance(obj, Enum)
    return matches
def is_object(obj):
    """
    Return True if ``obj`` is a reference to an object instance.

    Classes, Python functions, builtin functions and generators are not
    considered object instances.

    :param obj: object to test
    :return: True unless ``obj`` is a type, a function, a builtin function or a generator
    """
    not_instances = (
        type,
        types.FunctionType,
        types.BuiltinFunctionType,
        types.GeneratorType,
    )
    if isinstance(obj, not_instances):
        return False
    return isinstance(obj, object)
def get_full_qualified_name(obj):
    """
    Return the fully qualified name of a class (including its module name).

    ``obj`` may be a class or an instance; for an instance the name of its class
    is returned. Names living in the ``builtins`` module (or without a module)
    are returned without a module prefix.

    :param obj: a class or an instance
    :return: ``"<module>.<ClassName>"``, or just ``"<ClassName>"`` for builtins
    :rtype: str
    """
    # isinstance() (rather than `obj.__class__ == type`) also recognises classes
    # built by a custom metaclass (Enum, ABC, ...); otherwise the metaclass name
    # would be reported instead of the class name.
    cls = obj if isinstance(obj, type) else obj.__class__
    module = cls.__module__
    if module is None or module == str.__module__:
        return cls.__name__  # Avoid reporting builtins
    return module + '.' + cls.__name__
def importable_name(cls):
    """
    Build the fully qualified name of ``cls``, always prefixed by its module.

    Legacy Python 2 module names (``__builtin__`` and ``exceptions``) are
    reported as ``builtins``.

    :param cls: class to describe
    :return: ``"<module>.<qualified name>"``
    :rtype: str
    """
    # Prefer the qualified name (it includes enclosing classes) when available
    name = getattr(cls, '__qualname__', cls.__name__)
    module = cls.__module__
    if module in ('__builtin__', 'exceptions'):
        module = 'builtins'
    return f"{module}.{name}"
def get_class(qualified_class_name: str):
    """
    Dynamically load and return a class type from its fully qualified name.

    Note that the class is not instantiated.

    :param qualified_class_name: Fully qualified name of the class (e.g., 'some.module.ClassName').
    :return: The class object.
    :raises ValueError: If the name contains no '.' separating module and class.
    :raises ImportError: If the module cannot be imported.
    :raises AttributeError: If the class cannot be resolved in the module.
    """
    module_name, separator, class_name = qualified_class_name.rpartition(".")
    if not separator:
        # Same exception type as the former tuple-unpacking failure, with a usable message
        raise ValueError(
            f"'{qualified_class_name}' is not a fully qualified name (expected 'module.ClassName')."
        )

    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as e:
        # Chain the original error so the traceback shows which module was missing
        raise ImportError(
            f"Could not import module '{module_name}' for '{qualified_class_name}': {e}"
        ) from e

    try:
        return getattr(module, class_name)
    except AttributeError:
        raise AttributeError(f"Component '{class_name}' not found in '{module.__name__}'.") from None
def make_html_id(s: str | None) -> str | None:
"""
Creates a valid html id
:param s:
:return:
"""
if s is None:
return None
s = str(s).strip()
# Replace spaces and special characters with hyphens or remove them
s = re.sub(r'[^a-zA-Z0-9_-]', '-', s)
# Ensure the ID starts with a letter or underscore
if not re.match(r'^[a-zA-Z_]', s):
s = 'id_' + s # Add a prefix if it doesn't
# Collapse multiple consecutive hyphens into one
s = re.sub(r'-+', '-', s)
# Replace trailing hyphens with underscores
s = re.sub(r'-+$', '_', s)
return s
def snake_case_to_capitalized_words(s: str) -> str:
    """
    Rebuild a human-readable column title from a snake_case column id.

    >>> assert snake_case_to_capitalized_words("column_id") == "Column Id"
    >>> assert snake_case_to_capitalized_words("this_is_a_column_name") == "This Is A Column Name"

    :param s: snake_case identifier
    :return: the underscore-separated parts, each capitalized, joined by single spaces
    """
    return ' '.join(word.capitalize() for word in s.split('_'))
def make_safe_id(s: str | None):
    """
    Build a lowercase identifier without hyphens from ``s``.

    The value first goes through ``make_html_id``; every ``-`` is then replaced
    by ``_`` and the result is lowercased.

    :param s: value to convert; ``None`` is returned unchanged
    :return: the safe id, or ``None`` when ``s`` is ``None``
    """
    if s is None:
        return None

    html_id = make_html_id(s)
    return html_id.replace('-', '_').lower()
def update_elements(elts, updates: list[dict]):
    """
    Walk through elements (and their children) and apply attribute updates.

    Each update is a dict with the keys ``"id"``, ``"method"`` and ``"value"``:
    for every visited element whose ``attrs["id"]`` equals the update's ``"id"``,
    ``attrs[<method>]`` is set to ``<value>``. Elements are modified in place.

    :param elts: a single element or a list/tuple/set of elements; may be ``None``
    :param updates: the updates to apply
    :return: ``elts`` itself (``None`` when ``elts`` is ``None``)
    """

    def _apply(node):
        # Objects without `attrs` (e.g. plain text) cannot be updated
        if hasattr(node, 'attrs'):
            for spec in updates:
                if "id" not in node.attrs or not (node.attrs["id"] == spec["id"]):
                    continue
                node.attrs[spec["method"]] = spec["value"]

        if hasattr(node, "children"):
            for sub_node in node.children:
                _apply(sub_node)

    if elts is None:
        return None

    if isinstance(elts, (list, tuple, set)):
        roots = elts
    else:
        roots = [elts]

    for root in roots:
        _apply(root)

    return elts
def get_sheets_names(file_content):
    """
    Return the sheet names of an Excel workbook given as raw bytes.

    This is a best-effort helper: any failure while opening or parsing the
    content yields an empty list instead of an exception.

    :param file_content: raw bytes of the Excel file
    :return: list of sheet names, or ``[]`` when the content cannot be read as Excel
    :rtype: list
    """
    try:
        # The context manager closes the workbook/reader once the names are read
        with pd.ExcelFile(BytesIO(file_content)) as excel_file:
            return list(excel_file.sheet_names)
    except Exception:
        # Deliberately broad: unreadable content simply has no sheets
        return []
def to_bool(value: str):
if isinstance(value, bool):
return value
if value is None:
return False
if not isinstance(value, str):
raise NotImplemented("Cannot convert to bool")
return value.lower() in ("yes", "true", "t", "1")
def from_bool(value: bool):
    """
    Render a truth value as the string ``"true"`` or ``"false"``.

    :param value: value evaluated for truthiness
    :return: ``"true"`` when ``value`` is truthy, ``"false"`` otherwise
    """
    if value:
        return "true"
    return "false"
def append_once(lst: list, elt):
    """
    Append ``elt`` to ``lst`` unless the list already contains it.

    The list is modified in place; nothing is returned.

    :param lst: list to update
    :param elt: element to add when missing
    """
    if elt not in lst:
        lst.append(elt)
def find_classes_in_modules(modules, base_class_name):
    """
    Search modules, and recursively their submodules, for classes that directly
    inherit from a base class identified by its name.

    A class is reported once per direct base whose ``__name__`` equals
    ``base_class_name``, under the name of the module where it was found
    (classes merely imported into a module are reported for that module too).
    Modules that fail to import or to inspect are skipped silently.

    :param modules: List of top-level module names (e.g., ["core.settings_objects", "another.module"])
    :param base_class_name: Name of the base class to search for (e.g., "BaseSettingObj")
    :return: list of ``"<module name>.<class name>"`` strings
    """
    matches = []

    def _scan(_module_name):
        """Collect matching classes of one module, then descend into its submodules."""
        try:
            module = importlib.import_module(_module_name)

            for name, candidate in inspect.getmembers(module, inspect.isclass):
                # Only direct bases are compared, and only by name
                matches.extend(
                    f"{_module_name}.{name}"
                    for parent in candidate.__bases__
                    if parent.__name__ == base_class_name
                )

            # Only packages expose __path__, i.e. can have submodules
            if hasattr(module, "__path__"):
                for submodule_info in pkgutil.iter_modules(module.__path__):
                    _scan(f"{_module_name}.{submodule_info.name}")
        except Exception:
            # Best effort: a module that cannot be imported/inspected is ignored
            pass

    for module_name in modules:
        _scan(module_name)

    return matches
def instantiate_class(qualified_class_name):
    """
    Instantiate a class given its fully qualified name.

    The name is split on its last dot into a module path and a class name, the
    module is imported at runtime, the class is fetched from it and called
    without arguments.

    No exception is propagated: any failure (malformed name, missing module,
    missing class, constructor error) is reported on standard output with
    ``print`` and ``None`` is returned.

    :param qualified_class_name: Full dot-separated path to the class to be instantiated.
                                 Example: 'module.submodule.ClassName'
    :type qualified_class_name: str
    :return: A new instance of the class, or ``None`` when instantiation failed.
    :rtype: object
    """
    try:
        module_path, cls_name = qualified_class_name.rsplit(".", 1)
        target = getattr(importlib.import_module(module_path), cls_name)
        # The class is called without arguments
        return target()
    except Exception as e:
        print(f"Failed to instantiate {qualified_class_name}: {e}")
        return None
def get_unique_id(prefix: str = None):
    """
    Generate a short unique identifier from a random UUID.

    The 16 bytes of a ``uuid4`` are encoded with URL-safe base64 and the ``=``
    padding is removed.

    :param prefix: optional prefix; when given the result is ``"<prefix>_<id>"``
    :return: the generated identifier
    :rtype: str
    """
    raw = uuid.uuid4().bytes
    suffix = base64.urlsafe_b64encode(raw).rstrip(b'=').decode('ascii')
    return suffix if prefix is None else f"{prefix}_{suffix}"
def merge_classes(*args):
    """
    Merge several class specifications into one space-separated string.

    Each argument may be:

    * a string, used as is;
    * a list, tuple or set, whose items are merged the same way;
    * a dict, from which the ``"cls"`` entry (or, failing that, the ``"class"``
      entry) is **removed** (``pop``) and merged; the dict is mutated;
    * ``None`` or ``''``, ignored wherever they appear (including inside a
      collection or as the value taken from a dict).

    Duplicates are removed while keeping the order of first appearance.

    :param args: the class specifications to merge
    :return: the merged classes, or ``None`` when there is nothing to merge
    :raises ValueError: if a value of an unsupported type is encountered
    """
    collected = []

    def _collect(value):
        """Add the class name(s) carried by ``value`` to ``collected``."""
        if value is None or value == '':
            return

        if isinstance(value, str):
            collected.append(value)
        elif isinstance(value, (tuple, list, set)):
            for item in value:
                _collect(item)
        elif isinstance(value, dict):
            # The entry is popped so it is not left in the caller's dict
            if "cls" in value:
                _collect(value.pop("cls"))
            elif "class" in value:
                _collect(value.pop("class"))
        else:
            raise ValueError(f"Cannot merge {value} of type {type(value)}")

    for element in args:
        _collect(element)

    if not collected:
        return None

    # Remove duplicates while preserving order
    return " ".join(dict.fromkeys(collected))
def get_user_id(session: dict | None):
    """
    Extract the user id stored in a session.

    :param session: session mapping, or ``None`` when there is no session
    :return: ``NO_SESSION`` when ``session`` is ``None``; otherwise the value stored
             under ``SESSION_USER_ID_KEY`` (``NOT_LOGGED`` when absent) converted to ``str``
    """
    if session is None:
        return NO_SESSION

    user_id = session.get(SESSION_USER_ID_KEY, NOT_LOGGED)
    return str(user_id)
def split_host_port(url):
    """
    Split a URL into host and port components.

    Credentials (``user:password@``) are ignored and IPv6 literals
    (``http://[::1]:8080``) are supported; their host is returned without the
    surrounding brackets. The case of the host is preserved.

    Args:
        url (str): The full URL to split

    Returns:
        tuple: (host, port) where port is an integer. When the URL carries no
        port, 80 is used for ``http``, 443 for ``https`` and None otherwise.

    Raises:
        ValueError: If the port is not a valid port number.
    """
    parsed_url = urlparse(url)

    # netloc is "[userinfo@]host[:port]"; drop the credentials first
    host_port = parsed_url.netloc.rpartition('@')[2]

    if host_port.startswith('['):
        # IPv6 literal: the host is what stands between the brackets
        host = host_port[1:host_port.index(']')]
    else:
        host = host_port.partition(':')[0]

    # urlparse knows how to isolate the port (IPv6, credentials, empty port)
    port = parsed_url.port
    if port is None:
        # Use default ports based on scheme if port is not specified
        if parsed_url.scheme == 'http':
            port = 80
        elif parsed_url.scheme == 'https':
            port = 443

    return host, port
def timed(func):
    """
    Decorator that prints how long each call of ``func`` took.

    The duration is measured with ``time.perf_counter`` and printed as
    ``[PERF] <name> took <seconds> sec``. When the call has positional
    arguments, the name is prefixed with the first argument's name if it is a
    class, or with the name of its class otherwise. Nothing is printed when
    ``func`` raises.

    :param func: the callable to measure
    :return: the wrapping function (metadata preserved with ``functools.wraps``)
    """

    def _owner_name(call_args):
        """Guess a class name from the first positional argument, if any."""
        if not call_args:
            return None
        first = call_args[0]
        if inspect.isclass(first):
            return first.__name__  # class method
        if hasattr(first, "__class__"):
            return first.__class__.__name__  # instance method
        return None

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()

        class_name = _owner_name(args)
        if class_name:
            print(f"[PERF] {class_name}.{func.__name__} took {end - start:.4f} sec")
        else:
            print(f"[PERF] {func.__name__} took {end - start:.4f} sec")

        return result

    return wrapper
def profile_function(func):
    """
    Decorator that profiles each call of ``func`` with ``cProfile``.

    After a successful call the statistics are dumped to a ``.prof`` file whose
    name is ``[<class name>_]<function name>_<YYYYmmdd_HHMMSS>.prof`` (a relative
    path, hence resolved against the current working directory) and a
    ``[PROFILE]`` line is printed. The class name is taken from the first
    positional argument: its own name if it is a class, the name of its class
    otherwise. When ``func`` raises, the profiler is disabled and the exception
    propagates without any file being written.

    :param func: the callable to profile
    :return: the wrapping function (metadata preserved with ``functools.wraps``)
    """

    def _owner_name(call_args):
        """Guess a class name from the first positional argument, if any."""
        if not call_args:
            return None
        first = call_args[0]
        if inspect.isclass(first):
            return first.__name__  # class method
        if hasattr(first, "__class__"):
            return first.__class__.__name__  # instance method
        return None

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        try:
            profiler.enable()
            result = func(*args, **kwargs)
        finally:
            # Always stop profiling, even when func raises
            profiler.disable()

        class_name = _owner_name(args)

        # The timestamp has a one-second resolution
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if class_name:
            filename = f"{class_name}_{func.__name__}_{timestamp}.prof"
        else:
            filename = f"{func.__name__}_{timestamp}.prof"

        profiler.dump_stats(filename)
        print(f"[PROFILE] Profiling data saved to {filename}")

        return result

    return wrapper
class UnreferencedNamesVisitor(ast.NodeVisitor):
    """
    Collect the symbols that a piece of AST refers to.

    Every ``ast.Name`` reached by the traversal is recorded, as well as the
    names of the keyword arguments used in calls. Some parts of the tree are
    skipped on purpose by the ``visit_*`` overrides below: the target and the
    iterable of ``for`` loops, and the callee expression of calls.
    """

    def __init__(self):
        # Names collected so far; never reset, so it accumulates across get_names() calls
        self.names = set()

    def get_names(self, node):
        """
        Visit ``node`` and return the collected names.

        :param node: root of the AST to inspect
        :return: the set of names collected by this visitor (the internal set, not a copy)
        :rtype: set
        """
        self.visit(node)
        return self.names

    def visit_Name(self, node):
        """Record the identifier of a ``Name`` node."""
        self.names.add(node.id)

    def visit_For(self, node: ast.For):
        """Visit only the body and the ``else`` clause of a ``for`` loop."""
        self.visit_selected(node, ["body", "orelse"])

    def visit_selected(self, node, to_visit):
        """
        Visit only the given fields of ``node``.

        This works like ``generic_visit`` restricted to the fields listed in
        ``to_visit``.

        :param node: the node whose fields must be visited
        :param to_visit: names of the fields to visit
        """
        for field in to_visit:
            value = getattr(node, field)
            if isinstance(value, list):
                for item in value:
                    if isinstance(item, ast.AST):
                        self.visit(item)
            elif isinstance(value, ast.AST):
                self.visit(value)

    def visit_Call(self, node: ast.Call):
        """Visit the positional and keyword arguments of a call, not its callee."""
        self.visit_selected(node, ["args", "keywords"])

    def visit_keyword(self, node: ast.keyword):
        """
        Record the name of a keyword argument passed in a call, then visit its value.

        ex: in ``fun(positional, key=value)`` the name ``key`` is recorded.
        For a ``**mapping`` argument, ``node.arg`` is ``None``: there is no
        name to record, only the value is visited.

        :param node: the keyword node
        :type node: ast.keyword
        """
        if node.arg is not None:
            self.names.add(node.arg)
        self.visit_selected(node, ["value"])