# Sheerka-Old/src/core/utils.py
import ast
import importlib
import inspect
import os
import pkgutil
import re
from copy import deepcopy
# from pyparsing import *
from pyparsing import Literal, Word, nums, Combine, Optional, delimitedList, oneOf, alphas, Suppress
from core.global_symbols import CustomType
from core.tokenizer import TokenKind, Tokenizer, Token
# Color names understood by the console helpers below.
COLORS = {
    "black",
    "red",
    "green",
    "yellow",
    "blue",
    "magenta",
    "cyan",
    "white",
}
# ANSI SGR escape codes for each supported foreground color.
CONSOLE_COLORS_MAP = {
    "reset": "\u001b[0m",
    "black": "\u001b[30m",
    "red": "\u001b[31m",
    "green": "\u001b[32m",
    "yellow": "\u001b[33m",
    "blue": "\u001b[34m",
    "magenta": "\u001b[35m",
    "cyan": "\u001b[36m",
    "white": "\u001b[37m",
}
# Types treated as plain values (no attribute bag) by as_bag().
PRIMITIVES_TYPES = (str, bool, type(None), int, float, list, dict, set, bytes, tuple, type)
# pyparsing grammar for one ANSI escape sequence: ESC '[' digits(;digits)* letter
ESC = Literal('\x1b')
integer = Word(nums)
escapeSeq = Combine(ESC + '[' + Optional(delimitedList(integer, ';')) +
                    oneOf(list(alphas)))
# Terminal size probed through `stty size` (POSIX only); on failure
# (no tty, non-POSIX platform) fall back to a 50x80 default.
try:
    CONSOLE_ROWS, CONSOLE_COLUMNS = os.popen('stty size', 'r').read().split()
    CONSOLE_ROWS, CONSOLE_COLUMNS = int(CONSOLE_ROWS), int(CONSOLE_COLUMNS)
except ValueError:
    CONSOLE_ROWS, CONSOLE_COLUMNS = 50, 80
def no_color_str(text):
    """Return ``text`` as a string with every ANSI escape sequence stripped."""
    stripper = Suppress(escapeSeq)
    return stripper.transformString(str(text))
def sysarg_to_string(argv):
    """
    Join a list of command-line arguments into a single string,
    double-quoting any argument that contains a space.
    If the resulting string itself starts with a quote, the first and
    last characters are stripped (legacy behaviour, kept as-is).
    :param argv: list of argument strings (may be None or empty)
    :return: the joined command line, "" when there are no arguments
    """
    if not argv:
        return ""
    result = " ".join('"' + s + '"' if " " in s else s for s in argv)
    # Bug fix: guard before indexing -- argv made only of empty strings
    # produces result == "" and the original raised IndexError here.
    if result and result[0] in ('"', "'"):
        result = result[1:-1]  # strip quotes
    return result
def get_all_loaded_modules(prefix):
    """
    Names of the modules currently loaded in ``sys.modules``.
    :param prefix: if truthy, keep only names starting with this prefix
    :return: a list of module names (always a list -- the original
             returned a live dict-keys view when prefix was falsy,
             which changes under the caller's feet as imports happen)
    """
    import sys
    names = list(sys.modules.keys())  # snapshot: sys.modules can mutate during iteration
    if prefix:
        return [m for m in names if m.startswith(prefix)]
    return names
def get_class(qname):
    """
    Load a class (or any module attribute) from its fully qualified name.
    :param qname: dotted path, e.g. "package.module.ClassName"
    :return: the resolved object
    """
    pieces = qname.split('.')
    # __import__("a.b") returns the top-level package "a",
    # so walk the remaining attributes down from there.
    current = __import__(".".join(pieces[:-1]))
    for attr in pieces[1:]:
        current = getattr(current, attr)
    return current
def get_module(qname):
    """
    Import a module from its fully qualified dotted name.
    :param qname: e.g. "package.sub.module"
    :return: the module object itself (not the top-level package)
    """
    node = __import__(qname)  # returns the top-level package
    for part in qname.split('.')[1:]:
        node = getattr(node, part)
    return node
def new_object(kls, *args, **kwargs):
    """
    Instantiate a class given its fully qualified name.
    :param kls: dotted class name, resolved through get_class
    :param args: positional constructor arguments
    :param kwargs: keyword constructor arguments
    :return: the new instance
    """
    cls = get_class(kls)
    instance = cls(*args, **kwargs)
    return instance
def get_full_qualified_name(obj):
    """
    Fully qualified name (module included) of an object's class,
    or of the class itself when ``obj`` is already a class.
    Builtin types are reported without the module prefix.
    :param obj: an instance or a class
    :return: "module.ClassName", or just "ClassName" for builtins
    """
    target = obj if obj.__class__ == type else obj.__class__
    module = target.__module__
    builtin_module = str.__class__.__module__
    if module is None or module == builtin_module:
        return target.__name__  # Avoid reporting __builtin__
    return module + '.' + target.__name__
def get_classes(module_name):
    """
    Yield every class reachable as an attribute of the given module.
    :param module_name: fully qualified module name
    :return: generator of class objects
    """
    module = get_module(module_name)
    members = (getattr(module, attr) for attr in dir(module))
    yield from (member for member in members if inspect.isclass(member))
def get_classes_from_package(package_name):
    """
    Yield every class found in the direct sub-modules of a package.
    :param package_name: name of the package
    :return: generator of class objects
    """
    package = __import__(package_name)
    module_prefix = package.__name__ + "."
    for _, module_name, _ in pkgutil.iter_modules(package.__path__, module_prefix):
        yield from get_classes(module_name)
def import_module_and_sub_module(module_name):
    """
    Import a module/package, then import every module exactly one
    level below it.
    :param module_name: fully qualified module name
    :return: None
    """
    package = get_module(module_name)
    sub_names = [name for _, name, _ in pkgutil.iter_modules(package.__path__, module_name + ".")]
    for sub_name in sub_names:
        importlib.import_module(sub_name)
def get_sub_classes(package_name, base_class):
    """
    All (recursive) subclasses of ``base_class`` whose module belongs
    to ``package_name``.
    :param package_name: module-name prefix used to filter the result
    :param base_class: class object, or its fully qualified name
    :return: list of classes declared inside the package
    """
    if isinstance(base_class, str):
        module_name, _, class_name = base_class.rpartition('.')
        base_class = getattr(importlib.import_module(module_name), class_name)
    found = set(base_class.__subclasses__())
    for direct in base_class.__subclasses__():
        found.update(get_sub_classes(package_name, direct))
    # limit to the classes of the package
    return [cls for cls in found if cls.__module__.startswith(package_name)]
def remove_from_list(lst, to_remove_predicate):
    """
    Remove, in place, every element matching the predicate.
    :param lst: the list to mutate
    :param to_remove_predicate: callable returning True for doomed items
    :return: the same (mutated) list
    """
    doomed = [element for element in lst if to_remove_predicate(element)]
    for element in doomed:
        lst.remove(element)
    return lst
def remove_list_from_list(lst, to_remove):
    """
    Remove one occurrence of each item of ``to_remove`` from ``lst``,
    in place; items that are absent are silently skipped.
    https://stackoverflow.com/questions/2514961/remove-all-values-within-one-list-from-another-list/30353802
    explains that list comprehension is not the best approach.
    :return: the same (mutated) list
    """
    for unwanted in to_remove:
        try:
            lst.remove(unwanted)
        except ValueError:
            continue  # not present in lst -- nothing to do
    return lst
def make_unique(lst, get_id=None):
    """
    Return a list with duplicates removed, keeping the original order
    (first occurrence wins).
    >>> assert make_unique(["a", "a", "b", "c", "c"]) == ["a", "b", "c"]
    :param lst: input sequence
    :param get_id: optional key function used to identify duplicates
    :return: new list of unique items
    """
    identify = get_id if get_id is not None else (lambda item: item)
    seen = set()
    unique = []
    for item in lst:
        marker = identify(item)
        if marker not in seen:
            seen.add(marker)
            unique.append(item)
    return unique
def sheerka_product(a, b):
    """
    Cartesian-product-like combination where ``a`` is a list of lists
    and ``b`` is a plain list: every inner list of ``a`` is extended
    with every item of ``b`` (iterable items of ``b`` are spliced in
    element by element).
    An empty/None operand returns the other operand unchanged.
    """
    if not a:
        return b
    if not b:
        return a
    combined = []
    for right in b:
        for left in a:
            row = list(left)
            if hasattr(right, "__iter__"):
                row += right
            else:
                row += [right]
            combined.append(row)
    return combined
def dict_product(a, b):
    """
    Cartesian-product-like merge of two lists of dictionaries: every
    dict of ``a`` is merged with every dict of ``b`` (values from
    ``b`` win on key collision).
    >>> a = [{"a": "a", "b":"b", "c":"c"}]
    >>> b = [{"d":"d1"}, {"d":"d2"}]
    >>>
    >>> assert dict_product(a, b) == [{"a": "a", "b":"b", "c":"c", "d":"d1"}, {"a": "a", "b":"b", "c":"c", "d":"d2"}]
    An empty/None operand returns the other operand unchanged.
    :param a: list of dicts (or None)
    :param b: list of dicts (or None)
    :return: list of merged dicts
    """
    if not a:
        return b
    if not b:
        return a
    return [{**left, **right} for left in a for right in b]
def get_n_clones(obj, n):
    """
    List containing ``obj`` itself followed by ``n - 1`` clones of it.
    Note: the original object is always included, so n <= 1 yields [obj].
    :param obj: object exposing a ``clone()`` method
    :param n: total number of instances wanted
    :return: list of objects
    """
    return [obj] + [obj.clone() for _ in range(n - 1)]
def obj_product(list_of_objs, new_items, add_item):
    """
    Product-like expansion: every object is cloned once per item of
    ``new_items`` and ``add_item(clone, item)`` is applied pairwise.
    :param list_of_objs: objects exposing ``clone()``
    :param new_items: items to distribute over the clones
    :param add_item: callable(instance, item) mutating the instance
    :return: the expanded list (or the input when it is None/empty)
    """
    if not list_of_objs:
        return list_of_objs
    expanded = []
    for original in list_of_objs:
        clones = get_n_clones(original, len(new_items))
        expanded += clones
        for clone, item in zip(clones, new_items):
            add_item(clone, item)
    return expanded
def strip_quotes(text):
    """
    Drop the first and last characters when ``text`` starts with a
    quote (single or double); non-strings are returned untouched.
    Note: the closing character is removed without checking that it
    matches the opening quote.
    """
    if not isinstance(text, str) or text == "":
        return text
    return text[1:-1] if text[0] in ("'", '"') else text
def strip_tokens(tokens, strip_eof=False):
    """
    Slice of ``tokens`` without the leading/trailing whitespace and
    newline tokens (and without trailing EOF tokens when ``strip_eof``).
    :param tokens: sequence of tokens, or None
    :param strip_eof: also trim EOF tokens at the end
    :return: trimmed slice, [] when only blanks, None when tokens is None
    """
    if tokens is None:
        return None
    blanks = (TokenKind.WHITESPACE, TokenKind.NEWLINE)
    count = len(tokens)
    first = 0
    while first < count and tokens[first].type in blanks:
        first += 1
    if first == count:
        return []
    trailing = blanks + (TokenKind.EOF,) if strip_eof else blanks
    last = count - 1
    while last > 0 and tokens[last].type in trailing:
        last -= 1
    return tokens[first:last + 1]
def index_tokens(tokens, value):
    """
    Index of the first token whose ``value`` equals ``value``.
    >>> assert index_tokens(Tokenizer("xxx=yyy"), "=") == 1
    >>> assert index_tokens(Tokenizer("xxx = yyy"), "=") == 2
    >>> assert index_tokens(Tokenizer("yyy"), "=") == -1
    >>> assert index_tokens(Tokenizer("xxx = yyy"), " = ") == -1  # " = " is not valid token
    :param tokens: iterable of tokens (may be None or empty)
    :param value: token value to look for
    :return: index of the match, -1 when absent
    """
    if not tokens:
        return -1
    return next((idx for idx, token in enumerate(tokens) if token.value == value), -1)
def escape_char(text, to_escape):
    """
    Prefix every character of ``text`` found in ``to_escape`` with a
    backslash.
    :param text: input string
    :param to_escape: collection of characters needing escape
    :return: the escaped string
    """
    return "".join("\\" + ch if ch in to_escape else ch for ch in text)
def decode_enum(enum_repr: str):
    """
    Try to turn a "module.ClassName.MEMBER" string back into the enum
    member; return None whenever the string cannot be decoded.
    :param enum_repr: dotted representation, e.g. "module.Color.RED"
    :return: the enum member, or None
    """
    if not (enum_repr and isinstance(enum_repr, str)):
        return None
    try:
        idx = enum_repr.rindex(".")
        # A trailing dot means there is no member name at all.
        # (The original compared idx to len(enum_repr), which rindex
        # can never return, so the guard was dead code.)
        if idx == len(enum_repr) - 1:
            return None
        cls = get_class(enum_repr[:idx])
        return cls[enum_repr[idx + 1:]]
    except (ValueError, TypeError, KeyError, AttributeError, ImportError):
        # ValueError: no "." in the string; KeyError: unknown member;
        # AttributeError/ImportError: the class could not be resolved.
        return None
def str_concept(t, drop_name=None, prefix="c:"):
    """
    Textual form "<prefix><key>|<id>:" of a concept identifier.
    ``t`` is either a (key, id) tuple, a rule object (when prefix is
    "r:") or a concept object. Missing parts are left out; when both
    key and id are None the result is the empty string.
    >>> assert str_concept(("key", "id")) == "c:key|id:"
    >>> assert str_concept((None, "id")) == "c:|id:"
    >>> assert str_concept(("key", None)) == "c:key:"
    >>> assert str_concept((None, None)) == ""
    >>> assert str_concept(("key", "id"), prefix='r:') == "r:key|id:"
    :param t: tuple or concept/rule object
    :param drop_name: True if we only want the id (and not the key)
    :param prefix: "c:" for concepts, "r:" for rules
    :return: the textual identifier, or "" when empty
    """
    if isinstance(t, tuple):
        name, id_ = t[0], t[1]
    elif prefix == "r:":
        name, id_ = t.metadata.name, t.id
    else:
        name, id_ = t.key or t.name, t.id
    if name is None and id_ is None:
        return ""
    pieces = [prefix]
    if name is not None and not drop_name:
        pieces.append(name)
    if id_:
        pieces.append("|" + id_)
    pieces.append(":")
    return "".join(pieces)
def unstr_concept(concept_repr, prefix='c:'):
    """
    Inverse of str_concept: parse "<prefix><key>|<id>:" back into a
    (key, id) pair. Empty parts come back as None; any string that
    does not match the expected shape yields (None, None).
    >>> assert unstr_concept("c:key|id:") == ("key", "id")
    >>> assert unstr_concept("c:|id:") == (None, "id")
    >>> assert unstr_concept("c:key|:") == ("key", None)
    >>> assert unstr_concept("r:key|id:", prefix='r:') == ("key", "id")
    :param concept_repr: textual identifier
    :param prefix: expected leading marker ("c:" or "r:")
    :return: (key, id), each possibly None
    """
    well_formed = (concept_repr and
                   isinstance(concept_repr, str) and
                   concept_repr.startswith(prefix) and
                   concept_repr.endswith(":"))
    if not well_formed:
        return None, None
    body = concept_repr[2:]  # both supported prefixes are 2 chars long
    if not body:
        return None, None
    # the key runs up to the first ":" or "|" (one of them is
    # guaranteed to exist because the string ends with ":")
    cut = min(i for i, ch in enumerate(body) if ch in (":", "|"))
    key, separator = body[:cut], body[cut]
    if separator == ":":
        return (key or None), None
    rest = body[cut + 1:]
    id_ = rest[:rest.index(":")]
    return (key or None), (id_ or None)
def encode_concept(t, wrapper="C"):
    """
    Encode a (key, id) pair as a valid Python identifier that
    decode_concept can parse back.
    Non-alphanumeric characters of the key are replaced by "0" and
    missing parts are encoded with the sentinel "00None00".
    >>> assert encode_concept(("key", "id")) == "__C__KEY_key__ID_id__C__"
    >>> assert encode_concept((None, "id")) == "__C__KEY_00None00__ID_id__C__"
    >>> assert encode_concept(("key", None)) == "__C__KEY_key__ID_00None00__C__"
    :param t: (key, id) tuple or a concept object
    :param wrapper: "C" for concepts, "R" for rules
    :return: the encoded identifier
    """
    if isinstance(t, tuple):
        key, id_ = t[0], t[1]
    else:
        key, id_ = t.key, t.id
    none_marker = "00None00"
    if key:
        key_part = "".join(ch if ch.isalnum() else "0" for ch in key)
    else:
        key_part = none_marker
    id_part = id_ or none_marker
    return f"__{wrapper}__KEY_{key_part}__ID_{id_part}__{wrapper}__"
# Patterns matching identifiers produced by encode_concept; compiled once at import time.
concept_decode_regex = re.compile(r"__KEY_(\w+)__ID_(\w+)__C__")  # concepts (wrapper "C")
rule_decode_regex = re.compile(r"__KEY_(\w+)__ID_(\w+)__R__")  # rules (wrapper "R")
def decode_concept(text, wrapper="C"):
    """
    Decode the (key, id) pair embedded by encode_concept.
    The sentinel "00None00" is mapped back to a real None.
    :param text: text containing an encoded identifier
    :param wrapper: "C" for concepts, anything else for rules
    :return: (key, id) tuple, or (None, None) when nothing matches
    """
    pattern = concept_decode_regex if wrapper == "C" else rule_decode_regex
    match = pattern.search(text)
    if not match:
        return None, None

    def _revive(part):
        # map the None sentinel back to a real None
        return None if part == "00None00" else part
    return _revive(match.group(1)), _revive(match.group(2))
def tokens_index(tokens, sub_tokens, skip=0, start_from_end=False):
    """
    Start position of the ``sub_tokens`` sequence inside ``tokens``
    (comparison is on token values; EOF tokens of ``sub_tokens`` are
    ignored).
    :param tokens: tokens to search in
    :param sub_tokens: sub sequence to look for
    :param skip: number of matches to skip before returning
    :param start_from_end: scan candidate positions from the end
    :return: start index of the match
    :raises ValueError: when no (remaining) match exists
    """
    wanted = [tok.value for tok in sub_tokens if tok.type != TokenKind.EOF]
    candidates = range(len(tokens) - len(wanted) + 1)
    if start_from_end:
        candidates = reversed(candidates)
    remaining = skip
    for start in candidates:
        matched = all(tokens[start + offset].value == wanted[offset]
                      for offset in range(len(wanted)))
        if matched:
            if remaining == 0:
                return start
            remaining -= 1
    raise ValueError(f"sub tokens '{sub_tokens}' not found")
def as_bag(obj, forced_properties=None):
    """
    Dictionary of the object's public properties plus the object
    itself under the "self" key.
    :param obj: any object
    :param forced_properties: explicit list of attribute names to read
                              instead of discovering them
    :return: dict of property name -> value, plus "self"
    """
    if forced_properties:
        bag = {name: getattr(obj, name) for name in forced_properties}
    elif hasattr(obj, "as_bag"):
        bag = obj.as_bag()
    elif type(obj) in PRIMITIVES_TYPES:
        # primitives carry no interesting attributes
        bag = {}
    else:
        bag = {name: getattr(obj, name)
               for name in dir(obj) if not name.startswith("_")}
    bag["self"] = obj
    return bag
def flatten_all_children(item, get_children):
    """
    Generator over ``item`` and all its descendants, depth-first,
    each parent yielded before its children.
    :param item: root of the hierarchy
    :param get_children: callable returning the children of a node
    :return: generator yielding every node
    """
    def _walk(node):
        yield node
        for child in get_children(node):
            yield from _walk(child)
    return _walk(item)
def flatten(list_of_lists):
    """
    Flatten one nesting level: concatenate all the sub-lists into a
    single list.
    """
    flat = []
    for sub in list_of_lists:
        flat.extend(sub)
    return flat
def get_text_from_tokens(tokens, custom_switcher=None, tracker=None):
    """
    Rebuild source text from a list of tokens.
    :param tokens: list of tokens, a single token, or None
    :param custom_switcher: dict token-type -> callable(token) -> str,
                            overriding the default rendering
    :param tracker: optional dict filled with rendered-value -> token
                    for every token handled by ``custom_switcher``
    :return: the reconstructed source string ("" for None)
    """
    if tokens is None:
        return ""
    if not hasattr(tokens, "__iter__"):
        tokens = [tokens]
    switcher = {}
    if custom_switcher:
        switcher.update(custom_switcher)
    parts = []
    for token in tokens:
        value = switcher.get(token.type, lambda t: t.str_value)(token)
        parts.append(value)
        # Bug fix: guard custom_switcher -- the original raised
        # "argument of type 'NoneType' is not iterable" when a tracker
        # was passed without a custom_switcher.
        if tracker is not None and custom_switcher and token.type in custom_switcher:
            tracker[value] = token
    return "".join(parts)
def tokens_are_matching(tokens1, tokens2, skip_tokens=True):
    """
    True when both token streams carry the same (type, value) pairs,
    optionally ignoring whitespace tokens.
    :param tokens1: first iterable of tokens
    :param tokens2: second iterable of tokens
    :param skip_tokens: skip WHITESPACE tokens before comparing
    :return: bool
    """
    def get_next(it):
        # exhausted streams are padded with a synthetic EOF token
        try:
            return next(it)
        except StopIteration:
            return Token(TokenKind.EOF, "", -1, -1, -1)
    iter1 = iter(tokens1)
    iter2 = iter(tokens2)
    while True:
        t1 = get_next(iter1)
        t2 = get_next(iter2)
        if skip_tokens:
            # Bug fix: use get_next here too -- a stream ending with a
            # whitespace token used to leak StopIteration to the caller
            # through the bare next() calls.
            if t1.type == TokenKind.WHITESPACE:
                t1 = get_next(iter1)
            if t2.type == TokenKind.WHITESPACE:
                t2 = get_next(iter2)
        if t1.type == TokenKind.EOF and t2.type == TokenKind.EOF:
            return True
        if t1.type != t2.type or t1.value != t2.value:
            return False
def dump_ast(node):
    """
    Compact textual dump of an AST node: the ast.dump output with the
    noisy default fields stripped out.
    """
    noise = (", ctx=Load()", ", kind=None", ", type_ignores=[]")
    text = ast.dump(node)
    for fragment in noise:
        text = text.replace(fragment, "")
    return text
def sheerka_deepcopy(obj):
    """
    Internal implementation of deepcopy that can handle Concept circular references
    Containers are copied recursively, Concept instances go through a
    dedicated copy routine, and anything else falls back to copy.deepcopy.
    :param obj: object to copy
    :return: the deep copy
    """
    # maps id(concept) -> its finished copy, or a sentinel string while
    # the copy is still in progress
    already_seen = {}
    def copy_concept(c):
        # Copy one Concept, caching the result so the same source object
        # is copied only once.
        id_c = id(c)
        if id_c in already_seen:
            ref = already_seen[id_c]
            if ref == '_##_REF_##_':
                # sentinel still in place: we re-entered while this very
                # concept was being copied -> true circular reference
                raise Exception("Circular Ref not managed yet!")
            else:
                return ref
        # mark as "copy in progress" before recursing into the metadata
        already_seen[id_c] = '_##_REF_##_'
        cls = type(c)
        instance = cls()
        # update the metadata
        for prop_name, prop_value in vars(c.get_metadata()).items():
            if prop_name != "props":
                setattr(instance.get_metadata(), prop_name, prop_value)
            else:
                # "props" may itself contain concepts -> deep copy it
                setattr(instance.get_metadata(), prop_name, sheerka_deepcopy(prop_value))
        # update the values
        for prop_name, prop_value in c.values().items():
            setattr(instance, prop_name, prop_value)
        already_seen[id_c] = instance
        return instance
    # imported here (not at module level), presumably to break an import
    # cycle with core.concept -- TODO confirm
    from core.concept import Concept
    if isinstance(obj, CustomType):
        # CustomType objects are returned as-is, never copied
        return obj
    elif isinstance(obj, dict):
        res = {sheerka_deepcopy(k): sheerka_deepcopy(v) for k, v in obj.items()}
        return res
    elif isinstance(obj, list):
        return [sheerka_deepcopy(item) for item in obj]
    elif isinstance(obj, set):
        return {sheerka_deepcopy(item) for item in obj}
    elif isinstance(obj, tuple):
        return tuple((sheerka_deepcopy(item) for item in obj))
    elif isinstance(obj, Concept):
        return copy_concept(obj)
    else:
        return deepcopy(obj)
def escape_str(x):
    """
    Debugger-style representation: strings come back wrapped in single
    quotes, anything else is returned unchanged.
    :param x: any value
    :return: quoted string, or ``x`` itself
    """
    return "'" + x + "'" if isinstance(x, str) else x
class NextIdManager:
    """
    Hands out consecutive integer ids, starting at 0.
    """

    def __init__(self):
        # last id handed out; -1 so the first call returns 0
        self.id = -1

    def get_next_id(self):
        """Increment the counter and return the new id."""
        self.id += 1
        return self.id