Added first version of DebugManager. Implemented draft of the rule engine

2020-11-20 13:41:45 +01:00
parent cd066881b4
commit 315f8ea09b
156 changed files with 8388 additions and 2852 deletions
+187 -13
@@ -1,15 +1,54 @@
import ast
import importlib
import inspect
import pkgutil
import re
from core.tokenizer import TokenKind
from cache.Cache import Cache
from core.ast_helpers import ast_to_props
from core.tokenizer import TokenKind, Tokenizer
default_debug_name = "*default*"
debug_activated = set()
COLORS = {
"black",
"red",
"green",
"yellow",
"blue",
"magenta",
"cyan",
"white",
}
CONSOLE_COLORS_MAP = {
"reset": "\u001b[0m",
"black": "\u001b[30m",
"red": "\u001b[31m",
"green": "\u001b[32m",
"yellow": "\u001b[33m",
"blue": "\u001b[34m",
"magenta": "\u001b[35m",
"cyan": "\u001b[36m",
"white": "\u001b[37m",
}
PRIMITIVES_TYPES = (str, bool, type(None), int, float, list, dict, set, bytes, tuple, type)
expressions_cache = Cache()
def my_debug(*args, check_started=None):
"""
Write one line per arg to 'debug.txt'
:param args:
:param check_started:
True : first check that start_debug() was called
<name> : first check that start_debug(name) was called
list of <names> : first check that start_debug(name) was called for every name
:return:
"""
if check_started and default_debug_name not in debug_activated:
return
@@ -30,14 +69,14 @@ def my_debug(*args, check_started=None):
f.write(f"{arg}\n")
def start_debug(msg=None, debug_name=default_debug_name):
def start_debug(debug_name=default_debug_name, msg=None):
debug_activated.add(debug_name)
if msg:
with open("debug.txt", "a") as f:
f.write(f"{msg}\n")
def stop_debug(msg=None, debug_name=default_debug_name):
def stop_debug(debug_name=default_debug_name, msg=None):
if msg:
with open("debug.txt", "a") as f:
f.write(f"{msg}\n")
@@ -357,6 +396,26 @@ def strip_tokens(tokens, strip_eof=False):
return tokens[start: end + 1]
def index_tokens(tokens, value):
"""
Return the index of the first token whose value equals 'value', or -1 if no token matches
>>> assert index_tokens(Tokenizer("xxx=yyy"), "=") == 1
>>> assert index_tokens(Tokenizer("xxx = yyy"), "=") == 2
>>> assert index_tokens(Tokenizer("yyy"), "=") == -1
>>> assert index_tokens(Tokenizer("xxx = yyy"), " = ") == -1 # " = " is not valid token
:param tokens:
:param value:
:return:
"""
if not tokens:
return -1
for i, t in enumerate(tokens):
if t.value == value:
return i
return -1
def escape_char(text, to_escape):
res = ""
@@ -392,7 +451,7 @@ def decode_enum(enum_repr: str):
return None
def str_concept(t, drop_name=None):
def str_concept(t, drop_name=None, prefix="c:"):
"""
The key and id identifiers of a concept are stored in a tuple;
return the key and the id, separated by a pipe, wrapped in the prefix and a trailing colon
@@ -404,25 +463,29 @@ def str_concept(t, drop_name=None):
>>> assert str_concept((None, None)) == ""
>>> assert str_concept(Concept(name="foo", id="bar")) == "c:foo|bar:"
>>> assert str_concept(Concept(name="foo", id="bar"), drop_name=True) == "c:|bar:"
>>> assert str_concept(("key", "id"), prefix='r:') == "r:key|id:"
:param t:
:param drop_name: True if we only want the id (and not the key)
:param prefix:
:return:
"""
if isinstance(t, tuple):
name, id_ = t[0], t[1]
elif prefix == "r:":
name, id_ = t.metadata.name, t.id
else:
name, id_ = t.key, t.id
if name is None and id_ is None:
return ""
result = 'c:' if (name is None or drop_name) else "c:" + name
result = prefix if (name is None or drop_name) else prefix + name
if id_:
result += "|" + id_
return result + ":"
def unstr_concept(concept_repr):
def unstr_concept(concept_repr, prefix='c:'):
"""
If concept_repr has the form 'c:key|id:',
return the key and the id
@@ -430,6 +493,7 @@ def unstr_concept(concept_repr):
>>> assert unstr_concept("c:key|id:") == ("key", "id")
>>> assert unstr_concept("c:|id:") == ("None", "id")
>>> assert unstr_concept("c:key|:") == ("key", "None")
>>> assert unstr_concept("r:key|id:", prefix='r:') == ("key", "id")
>>> # Otherwise, return (None,None)
:param concept_repr:
@@ -437,7 +501,7 @@ def unstr_concept(concept_repr):
"""
if not (concept_repr and
isinstance(concept_repr, str) and
concept_repr.startswith("c:") and
concept_repr.startswith(prefix) and
concept_repr.endswith(":")):
return None, None
@@ -470,7 +534,7 @@ def unstr_concept(concept_repr):
return key if key != "" else None, id if id != "" else None
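# A minimal round-trip sketch: with prefix="r:", the same pair of helpers
# serializes rule references exactly like concepts.
assert unstr_concept(str_concept(("rule", "42"), prefix="r:"), prefix="r:") == ("rule", "42")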
def encode_concept(t):
def encode_concept(t, wrapper="C"):
"""
Given a tuple of concept id, concept id
Create a valid Python identifier that can be parsed back
@@ -480,24 +544,27 @@ def encode_concept(t):
>>> assert encode_concept(("key", None)) == "__C__KEY_key__ID_00None00__C__"
:param t:
:param wrapper:
:return:
"""
key, id_ = (t[0], t[1]) if isinstance(t, tuple) else (t.key, t.id)
prefix = "__C"
sanitized_key = "".join(c if c.isalnum() else "0" for c in key) if key else "00None00"
return prefix + f"__KEY_{sanitized_key}__ID_{id_ or '00None00'}__C__"
return f"__{wrapper}__KEY_{sanitized_key}__ID_{id_ or '00None00'}__{wrapper}__"
decode_regex = re.compile(r"__KEY_(\w+)__ID_(\w+)__C__")
concept_decode_regex = re.compile(r"__KEY_(\w+)__ID_(\w+)__C__") # it is compiled only once
rule_decode_regex = re.compile(r"__KEY_(\w+)__ID_(\w+)__R__") # it is compiled only once
def decode_concept(text):
def decode_concept(text, wrapper="C"):
"""
Decode what was encoded by encode_concept
:param text:
:param wrapper:
:return:
"""
decode_regex = concept_decode_regex if wrapper == "C" else rule_decode_regex
m = decode_regex.search(text)
lookup = {"00None00": None}
if m:
@@ -539,7 +606,114 @@ def as_bag(obj):
if hasattr(obj, "as_bag"):
bag = obj.as_bag()
else:
bag = {prop: getattr(obj, prop) for prop in dir(obj) if not prop.startswith("_")}
bag = {} if type(obj) in PRIMITIVES_TYPES else {prop: getattr(obj, prop)
for prop in dir(obj) if not prop.startswith("_")}
bag["self"] = obj
return bag
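# Primitives get an empty bag apart from the "self" entry, e.g.:
assert as_bag(3) == {"self": 3}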
def flatten_all_children(item, get_children):
"""
Yield the current item and all its children, recursively (depth-first, parent first)
:param item:
:param get_children: lambda to get the children
:return:
"""
def inner_get_all_children(inner_item):
yield inner_item
for child in get_children(inner_item):
yield from inner_get_all_children(child)
return inner_get_all_children(item)
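# A small sketch with a dict tree (shape made up for illustration): each node
# appears before its children.
_tree = {"v": 1, "kids": [{"v": 2, "kids": []}, {"v": 3, "kids": []}]}
assert [n["v"] for n in flatten_all_children(_tree, lambda n: n["kids"])] == [1, 2, 3]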
def evaluate_expression(expr, bag):
"""
Try to evaluate expr in the context of bag
:param expr:
:param bag:
:return:
"""
if expr is None or expr.strip() == "":
return None
if expr in bag:
return bag[expr]
props_definitions = expressions_cache.get(expr)
if props_definitions is None:
_ast = ast.parse(expr, mode="eval")
props_definitions = []
ast_to_props(props_definitions, _ast.body, None)
props_definitions.reverse()
expressions_cache.put(expr, props_definitions)
return evaluate_object(bag, props_definitions)
def evaluate_object(bag, properties):
"""
Evaluate a chain of properties against an object bag
Works together with evaluate_expression
:param bag:
:param properties: List of ast_helpers.PropDef
:return:
"""
for prop in properties:
try:
obj = bag[prop.prop]
except KeyError:
try:
obj = bag["self"][prop.prop]
except Exception:
raise NameError(prop.prop)
if obj is None:
return None
if prop.index is not None:
obj = obj[prop.index]
bag = as_bag(obj)
return obj
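# A minimal sketch; _PropDef is a stand-in with the two fields evaluate_object
# reads (the real PropDef lives in core.ast_helpers). This walks the same chain
# evaluate_expression would build for "b[1]".
from collections import namedtuple
from types import SimpleNamespace
_PropDef = namedtuple("_PropDef", ["prop", "index"])
assert evaluate_object(as_bag(SimpleNamespace(b=[10, 20])), [_PropDef("b", 1)]) == 20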
def get_text_from_tokens(tokens, custom_switcher=None, tracker=None):
"""
Rebuild the source code from the list of tokens
:param tokens: list of tokens
:param custom_switcher: overrides the returned text for some token kinds
:param tracker: maps each custom-switched output string back to its original token
:return:
"""
if tokens is None:
return ""
res = ""
if not hasattr(tokens, "__iter__"):
tokens = [tokens]
switcher = {
# TokenKind.CONCEPT: lambda t: core.utils.str_concept(t.value),
}
if custom_switcher:
switcher.update(custom_switcher)
for token in tokens:
value = switcher.get(token.type, lambda t: t.str_value)(token)
res += value
if tracker is not None and custom_switcher and token.type in custom_switcher:
tracker[value] = token
return res
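# A sketch with stand-in tokens (real ones come from core.tokenizer): the
# custom switcher rewrites one kind, and the tracker maps output text back to
# its source token.
from types import SimpleNamespace as _Tok
_toks = [_Tok(type="NAME", str_value="x"), _Tok(type="OP", str_value="="),
_Tok(type="NUMBER", str_value="1")]
_tracker = {}
assert get_text_from_tokens(_toks, {"NAME": lambda t: t.str_value.upper()},
tracker=_tracker) == "X=1"
assert _tracker["X"] is _toks[0]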
def dump_ast(node):
dump = ast.dump(node)
for to_remove in [", ctx=Load()", ", kind=None", ", type_ignores=[]"]:
dump = dump.replace(to_remove, "")
return dump
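# Illustrative, on current CPython: the stripped fields are presentation noise,
# so two structurally equal nodes dump to the same string.
assert dump_ast(ast.parse("a.b", mode="eval")) == "Expression(body=Attribute(value=Name(id='a'), attr='b'))"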