Reimplemented explain feature

This commit is contained in:
2020-06-04 18:43:15 +02:00
parent c498b394e3
commit d7573f095f
27 changed files with 1673 additions and 1161 deletions
+19 -1
View File
@@ -62,6 +62,8 @@ class BuiltinConcepts(Enum):
PRECEDENCE = "precedence" # use to set priority among concepts when parsing
ASSOCIATIVITY = "associativity" # use to set priority among concepts when parsing
NOT_INITIALIZED = "not initialized"
NOT_FOUND = "not found" # when the wanted resource is not found
FORMAT_INSTRUCTIONS = "format instructions" # to express how to print the concept
NODE = "node"
GENERIC_NODE = "generic node"
@@ -73,6 +75,21 @@ class BuiltinConcepts(Enum):
def __str__(self):
return "__" + self.name
def __eq__(self, other):
if id(self) == id(other):
return True
if isinstance(other, str):
return str(self) == other
if not isinstance(other, BuiltinConcepts):
return False
return self.value == other.value
def __hash__(self):
return hash(self.value)
BuiltinUnique = [
BuiltinConcepts.BEFORE_PARSING,
@@ -106,7 +123,8 @@ BuiltinErrors = [str(e) for e in {
BuiltinConcepts.NOT_A_SET,
BuiltinConcepts.WHERE_CLAUSE_FAILED,
BuiltinConcepts.CHICKEN_AND_EGG,
BuiltinConcepts.NOT_INITIALIZED
BuiltinConcepts.NOT_INITIALIZED,
BuiltinConcepts.NOT_FOUND
}]
"""
+13 -5
View File
@@ -278,10 +278,10 @@ class Concept:
def to_dict(self, props_to_use=None):
"""
Returns a dict representing 'self'
to_dict() is used for serializing the definition of the concept
You will not that it does not dump the actual values of the properties, nor the body
to_dict() is used for serializing the **definition** of the concept
If you need a dictionary version of the Concept, use to_bag()
It does not dump the actual values of the properties, nor the body
If you need a dictionary version of the Concept, use as_bag()
:return:
"""
@@ -369,7 +369,7 @@ class Concept:
:param concept_key: name of the behaviour
:return:
"""
return self.metadata.props[concept_key] if concept_key in self.metadata.props else None
return self.metadata.props.get(concept_key, None)
def set_value(self, name, value):
"""
@@ -426,7 +426,7 @@ class Concept:
def get_original_definition_hash(self):
return self.original_definition_hash
def to_bag(self):
def as_bag(self):
"""
Creates a dictionary with the useful properties of the concept
It quicker to implement than creating the actual property mechanism with @property
@@ -441,6 +441,14 @@ class Concept:
bag[prop] = getattr(self, prop)
return bag
def get_format_instructions(self):
from core.builtin_concepts import BuiltinConcepts
return self.get_prop(BuiltinConcepts.FORMAT_INSTRUCTIONS)
def set_format_instructions(self, instructions):
from core.builtin_concepts import BuiltinConcepts
self.set_prop(BuiltinConcepts.FORMAT_INSTRUCTIONS, instructions)
class Property:
"""
+15 -6
View File
@@ -50,9 +50,10 @@ class ExecutionContext:
self._parent = None
self._id = ExecutionContext.get_id(event.get_digest()) if event else None
self._tab = ""
self._bag = {} # other variables
self._start = 0
self._stop = 0
self._bag = {} # context variables
self._start = 0 # when the execution starts (to measure elapsed time)
self._stop = 0 # when the execution stops (to measure elapses time)
self._format_instructions = None # how to print the execution context
self.who = who # who is asking
self.event = event # what was the (original) trigger
@@ -65,11 +66,13 @@ class ExecutionContext:
self.global_hints = set() if global_hints is None else global_hints
self.global_errors = [] if global_errors is None else global_errors
self.inputs = {} # what was the parameters of the execution context
self.values = {} # what was produced by the execution context
self.obj = kwargs.pop("obj", None)
self.concepts = kwargs.pop("concepts", {})
self.obj = kwargs.pop("obj", None) # current obj we are working on
self.concepts = kwargs.pop("concepts", {}) # known concepts specific to this context
# update the other elements
for k, v in kwargs.items():
self._bag[k] = v
@@ -313,7 +316,7 @@ class ExecutionContext:
return None
return ret_val.status
def to_bag(self):
def as_bag(self):
"""
Creates a dictionary with the useful properties of the concept
It quicker to implement than creating the actual property mechanism with @property
@@ -338,3 +341,9 @@ class ExecutionContext:
value = value[:47] + "..."
to_str = f"ReturnValue(who={r.who}, status={r.status}, value={value})"
return to_str
def get_format_instructions(self):
return self._format_instructions
def set_format_instructions(self, instructions):
self._format_instructions = instructions
+43 -2
View File
@@ -83,6 +83,7 @@ class Sheerka(Concept):
"test": self.test,
"test_using_context": self.test_using_context
}
self.sheerka_pipeables = {}
@property
def resolved_concepts_by_first_keyword(self):
@@ -121,6 +122,15 @@ class Sheerka(Concept):
setattr(self, as_name, bound_method)
def add_pipeable(self, func_name, function):
"""
Adds a function that can bu used with pipe '|'
:param func_name:
:param function:
:return:
"""
self.sheerka_pipeables[func_name] = function
def initialize(self, root_folder: str = None, save_execution_context=True):
"""
Starting Sheerka
@@ -360,8 +370,11 @@ class Sheerka(Concept):
if self.cache_manager.is_dirty:
self.cache_manager.commit(execution_context)
if self.save_execution_context and self.load(self.name, "save_execution_context"):
self.sdp.save_result(execution_context)
try:
if self.save_execution_context and self.load(self.name, "save_execution_context"):
self.sdp.save_result(execution_context)
except Exception as ex:
self.log.error(f"Failed to save execution context. Reason: {ex}")
# # hack to save valid concept definition
# if not self.during_restore:
@@ -760,6 +773,9 @@ class Sheerka(Concept):
except IOError:
pass
def get_last_execution(self):
return self._last_execution
def test(self):
return f"I have access to Sheerka !"
@@ -841,6 +857,28 @@ class Sheerka(Concept):
@staticmethod
def init_logging(debug, loggers):
def add_coloring_to_emit_ansi(fn):
# add methods we need to the class
def new(*args):
levelno = args[1].levelno
if levelno >= 50:
color = '\x1b[31m' # red
elif levelno >= 40:
color = '\x1b[31m' # red
elif levelno >= 30:
color = '\x1b[33m' # yellow
elif levelno >= 20:
color = '\x1b[32m' # green
elif levelno >= 10:
color = '\x1b[35m' # pink
else:
color = '\x1b[0m' # normal
args[1].msg = color + str(args[1].msg) + '\x1b[0m' # normal
# print "after"
return fn(*args)
return new
core.sheerka_logger.init_config(loggers)
if debug:
log_format = "%(asctime)s"
@@ -853,3 +891,6 @@ class Sheerka(Concept):
log_level = logging.INFO
logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
logging.addLevelName(logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
# uncomment the following line to enable colors
#logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit)
@@ -24,8 +24,8 @@ class SheerkaComparisonManager(BaseService):
Manage partitioning of concepts
"""
NAME = "ComparisonManager"
COMPARISON_ENTRY = "Comparison"
RESOLVED_COMPARISON_ENTRY = "Resolved_Comparison"
COMPARISON_ENTRY = "ComparisonManager:Comparison"
RESOLVED_COMPARISON_ENTRY = "ComparisonManager:Resolved_Comparison"
def __init__(self, sheerka):
super().__init__(sheerka)
+2 -3
View File
@@ -137,16 +137,15 @@ class SheerkaExecute(BaseService):
"""
NAME = "Execute"
PARSERS_INPUTS_ENTRY = "ParserInput" # entry for admin or internal variables
PARSERS_INPUTS_ENTRY = "Execute:ParserInput" # entry for admin or internal variables
def __init__(self, sheerka):
super().__init__(sheerka)
self.pi_cache = None
self.pi_cache = Cache(default=lambda key: ParserInput(key), max_size=20)
def initialize(self):
self.sheerka.bind_service_method(self.execute)
self.pi_cache = Cache(default=lambda key: ParserInput(key), max_size=20)
self.sheerka.cache_manager.register_cache(self.PARSERS_INPUTS_ENTRY, self.pi_cache, False)
def get_parser_input(self, text, tokens=None):
+434
View File
@@ -0,0 +1,434 @@
# the principle and the Pipe class are taken from
# https://github.com/JulienPalard/Pipe
#
import builtins
import functools
import inspect
import itertools
import sys
from collections import deque
from cache.Cache import Cache
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, ConceptParts
from core.sheerka.services.sheerka_service import BaseService
from core.utils import as_bag
from printer.FormatInstructions import FormatInstructions
from sheerkapickle.utils import is_primitive
class PropDesc:
def __init__(self, class_name, props):
self.class_name = class_name
self.props = props
def __repr__(self):
return f"({self.class_name}{self.props})"
def __eq__(self, other):
if id(other) == id(self):
return True
if not isinstance(other, PropDesc):
return False
return self.class_name == other.class_name and sorted(self.props) == sorted(other.props)
class Pipe:
"""
Represent a Pipeable Element :
Described as :
first = Pipe(lambda iterable: next(iter(iterable)))
and used as :
print [1, 2, 3] | first
printing 1
Or represent a Pipeable Function :
It's a function returning a Pipe
Described as :
select = Pipe(lambda iterable, pred: (pred(x) for x in iterable))
and used as :
print [1, 2, 3] | select(lambda x: x * 2)
# 2, 4, 6
"""
def __init__(self, function, context=None):
self.context = context
if isinstance(function, Pipe):
self.function = function.function
self.need_context = function.need_context
else:
signature = inspect.signature(function)
if len(signature.parameters) > 0 and list(signature.parameters.keys())[0] == "context":
self.need_context = True
self.function = (lambda x: function(context, x)) if len(signature.parameters) == 2 else function
else:
self.need_context = False
self.function = function
functools.update_wrapper(self, function)
def __ror__(self, other):
if isinstance(other, Concept) and other.key == str(BuiltinConcepts.EXPLANATION):
other.set_value(ConceptParts.BODY, self.function(other.body))
return other
return self.function(other)
def __call__(self, *args, **kwargs):
if self.need_context:
return Pipe(lambda x: self.function(self.context, x, *args, **kwargs), self.context)
else:
return Pipe(lambda x: self.function(x, *args, **kwargs), self.context)
class SheerkaFilter(BaseService):
NAME = "Filter"
PREDICATES_ENTRY = "Filter:Predicates"
def __init__(self, sheerka):
super().__init__(sheerka)
self.cache = Cache(max_size=30)
def initialize(self):
# For a weird reason, when the attribute @Pipe is directly added to the function
# all following instances have the context property null
for k, v in SheerkaFilter.__dict__.items():
if k.startswith("pipe_"):
if isinstance(v, staticmethod):
self.sheerka.add_pipeable(k[5:], v.__func__)
else:
self.sheerka.add_pipeable(k[5:], v.__get__(self, self.__class__))
self.sheerka.cache_manager.register_cache(self.PREDICATES_ENTRY, self.cache, False, False)
def get_compiled(self, file_name, predicate):
"""
Returns the compiled version of the predicate
:param file_name:
:param predicate:
:return:
"""
compiled = self.cache.get(predicate)
if compiled is not None:
return compiled
compiled = compile(predicate, f"<{file_name}>", "eval")
self.cache.put(predicate, compiled)
return compiled
@staticmethod
def pipe_first(iterable):
"""
Return the first element of the list
:param iterable:
:return:
"""
return next(iter(iterable))
@staticmethod
def pipe_take(iterable, n):
"""
Take the n first element of a list
:param iterable:
:param n:
:return:
"""
for item in iterable:
if n > 0:
n -= 1
yield item
else:
return
@staticmethod
def pipe_props(iterable):
"""
Return the list of available properties of the iterable
:return:
"""
for item in iterable:
yield PropDesc(type(item).__name__, list(as_bag(item).keys()))
@staticmethod
def pipe_tail(iterable, qte):
"Yield qte of elements in the given iterable."
return deque(iterable, maxlen=qte)
@staticmethod
def pipe_skip(iterable, qte):
"Skip qte elements in the given iterable, then yield others."
for item in iterable:
if qte == 0:
yield item
else:
qte -= 1
@staticmethod
def pipe_dedup(iterable, key=lambda x: x):
"""Only yield unique items. Use a set to keep track of duplicate data."""
seen = set()
for item in iterable:
dupkey = key(item)
if dupkey not in seen:
seen.add(dupkey)
yield item
@staticmethod
def pipe_uniq(iterable, key=lambda x: x):
"""Deduplicate consecutive duplicate values."""
iterator = iter(iterable)
try:
prev = next(iterator)
except StopIteration:
return
yield prev
prevkey = key(prev)
for item in iterator:
itemkey = key(item)
if itemkey != prevkey:
yield item
prevkey = itemkey
@staticmethod
def pipe_all(iterable, pred):
"""Returns True if ALL elements in the given iterable are true for the
given pred function"""
return builtins.all(pred(x) for x in iterable)
@staticmethod
def pipe_any(iterable, pred):
"""Returns True if ANY element in the given iterable is True for the
given pred function"""
return builtins.any(pred(x) for x in iterable)
@staticmethod
def pipe_average(iterable):
"""Build the average for the given iterable, starting with 0.0 as seed
Will try a division by 0 if the iterable is empty...
"""
# warnings.warn(
# "pipe.average is deprecated, use statistics.mean instead.",
# DeprecationWarning,
# stacklevel=4,
# )
total = 0.0
qte = 0
for element in iterable:
total += element
qte += 1
return total / qte
@staticmethod
def pipe_count(iterable):
"Count the size of the given iterable, walking thrue it."
# warnings.warn(
# "pipe.count is deprecated, use the builtin len() instead.",
# DeprecationWarning,
# stacklevel=4,
# )
count = 0
for element in iterable:
count += 1
return count
@staticmethod
def pipe_max(iterable, **kwargs):
# warnings.warn(
# "pipe.max is deprecated, use the builtin max() instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return builtins.max(iterable, **kwargs)
@staticmethod
def pipe_min(iterable, **kwargs):
# warnings.warn(
# "pipe.min is deprecated, use the builtin min() instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return builtins.min(iterable, **kwargs)
@staticmethod
def pipe_as_dict(iterable):
# warnings.warn(
# "pipe.as_dict is deprecated, use dict(your | pipe) instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return dict(iterable)
@staticmethod
def pipe_as_set(iterable):
# warnings.warn(
# "pipe.as_set is deprecated, use set(your | pipe) instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return set(iterable)
@staticmethod
def pipe_permutations(iterable, r=None):
# permutations('ABCD', 2) --> AB AC AD BA BC BD CA CB CD DA DB DC
# permutations(range(3)) --> 012 021 102 120 201 210
for x in itertools.permutations(iterable, r):
yield x
# @staticmethod
# def pipe_netcat(to_send, host, port):
# with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
# s.connect((host, port))
# for data in to_send | traverse:
# s.send(data)
# while 1:
# data = s.recv(4096)
# if not data:
# break
# yield data
#
# @staticmethod
# def pipe_netwrite(to_send, host, port):
# warnings.warn("pipe.netwite is deprecated.", DeprecationWarning, stacklevel=4)
# with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
# s.connect((host, port))
# for data in to_send | SheerkaFilter.pipe_traverse:
# s.send(data)
@staticmethod
def pipe_traverse(args):
for arg in args:
try:
if isinstance(arg, str):
yield arg
else:
for i in arg | SheerkaFilter.pipe_traverse:
yield i
except TypeError:
# not iterable --- output leaf
yield arg
@staticmethod
def pipe_concat(iterable, separator=", "):
# warnings.warn(
# "pipe.concat is deprecated, use ', '.join(your | pipe) instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return separator.join(builtins.map(str, iterable))
@staticmethod
def pipe_as_list(iterable):
# warnings.warn(
# "pipe.as_list is deprecated, use list(your | pipe) instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return list(iterable)
@staticmethod
def pipe_as_tuple(iterable):
# warnings.warn(
# "pipe.as_tuple is deprecated, use tuple(your | pipe) instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return tuple(iterable)
@staticmethod
def pipe_tee(iterable):
for item in iterable:
sys.stdout.write(str(item) + "\n")
yield item
@staticmethod
def pipe_write(iterable, fname, glue="\n"):
with open(fname, "w") as f:
for item in iterable:
f.write(str(item) + glue)
@staticmethod
def pipe_add(x):
# warnings.warn(
# "pipe.add is deprecated, use sum(your | pipe) instead.",
# DeprecationWarning,
# stacklevel=4,
# )
return sum(x)
@staticmethod
def pipe_select(iterable, selector):
return builtins.map(selector, iterable)
@staticmethod
def pipe_format_l(iterable, template, when=None):
"""
Define a formatting when printing a list of items
:param iterable:
:param template:
:param when: format_l is set when the condition is verified
:return:
"""
for item in iterable:
if hasattr(item, "get_format_instructions"):
instructions = item.get_format_instructions() or FormatInstructions()
instructions.set_format_l(item, template)
item.set_format_instructions(instructions)
yield item
@staticmethod
def pipe_format_d(iterable, *props, when=None, **format_l):
"""
Define a formatting when printing the detail of an item
:param iterable:
:param props: list of properties to display
:param when: format_d is set when the condition is verified
:param format_l: custom formatting when printing the value of a property
:return:
"""
template = dict((p, "{" + p + "}") for p in props)
for k, v in format_l.items():
template[k] = v
for item in iterable:
if hasattr(item, "get_format_instructions"):
if len(template) == 0:
bag = as_bag(item)
template = dict((p, "{" + p + "}") for p in bag)
instructions = item.get_format_instructions() or FormatInstructions()
instructions.set_format_d(item, template)
item.set_format_instructions(instructions)
yield item
@staticmethod
def pipe_recurse(iterable, depth, prop_name="children", when=None):
"""
When printing an object that has sub properties,
indicate the depth of recursion to apply to a specific properties
Quick and dirty version because the prop name is not taken from the item (but set to 'children' by default)
:param iterable:
:param depth:
:param prop_name:
:param when: recurse is set when the condition is verified
:return:
"""
for item in iterable:
if hasattr(item, "get_format_instructions"):
instructions = item.get_format_instructions() or FormatInstructions()
instructions.set_recurse(prop_name, depth)
item.set_format_instructions(instructions)
yield item
def pipe_filter(self, iterable, predicate):
compiled = self.get_compiled("filter", predicate)
for item in iterable:
try:
context = {} if is_primitive(item) else as_bag(item)
context["self"] = item
if eval(compiled, context):
yield item
except NameError:
pass
@@ -11,6 +11,7 @@ class History:
self.event = event
self.result = result
self._status = None
self._format_instructions = None
def __str__(self):
msg = f"{self.event.get_digest()} {self.event.date.strftime('%d/%m/%Y %H:%M:%S')} : {self.event.message}"
@@ -42,6 +43,12 @@ class History:
self._status = self.result.get_status() if self.result else None
return self._status
def get_format_instructions(self):
return self._format_instructions
def set_format_instructions(self, instructions):
self._format_instructions = instructions
class SheerkaHistoryManager(BaseService):
NAME = "History"
@@ -0,0 +1,125 @@
from core.builtin_concepts import BuiltinConcepts
from core.sheerka.services.sheerka_service import BaseService
class SheerkaResultConcept(BaseService):
NAME = "Result"
def __init__(self, sheerka, page_size=30):
super().__init__(sheerka)
self.page_size = page_size
def initialize(self):
self.sheerka.bind_service_method(self.get_results_by_digest)
self.sheerka.bind_service_method(self.get_results_by_command)
self.sheerka.bind_service_method(self.get_last_results)
self.sheerka.bind_service_method(self.get_results)
def get_results_by_digest(self, context, digest, record_digest=True):
"""
Gets the entire execution tree for the given event digest
:param context:
:param digest:
:param record_digest:
:return:
"""
if digest is None:
return None
try:
result = self.sheerka.sdp.load_result(digest)
event = self.sheerka.sdp.load_event(digest)
if record_digest:
context.log(f"Recording digest '{digest}'")
self.sheerka.record(context, self.NAME, "digest", digest)
return self.sheerka.new(BuiltinConcepts.EXPLANATION,
digest=event.get_digest(),
command=event.message,
body=self.as_list(result))
except FileNotFoundError as ex:
context.log_error(f"Digest {digest} is not found.", self.NAME, ex)
return self.sheerka.new(BuiltinConcepts.NOT_FOUND, body={"digest": digest})
def get_results_by_command(self, context, command, record_digest=True):
"""
Get the result of the command that starts with command
:param context:
:param command:
:param record_digest:
:return:
"""
if command is None:
return None
start = 0
consumed = 0
while True:
for event in self.sheerka.sdp.load_events(self.page_size, start):
consumed += 1
if event.message.startswith(command):
return self.get_results_by_digest(context, event.get_digest(), record_digest)
if consumed < self.page_size:
break
start += self.page_size
consumed = 0
return self.sheerka.new(BuiltinConcepts.NOT_FOUND, body={"command": command})
def get_last_results(self, context, record_digest=True):
"""
Gets the results of the last command
:param context:
:param record_digest:
:return:
"""
start = 0
page_size = 2
consumed = 0
while True:
for event in self.sheerka.sdp.load_events(page_size, start):
consumed += 1
if self.sheerka.sdp.has_result(event.get_digest()):
return self.get_results_by_digest(context, event.get_digest(), record_digest)
if consumed < page_size:
break
if page_size < 100:
page_size *= 2
start += page_size
consumed = 0
return self.sheerka.new(BuiltinConcepts.NOT_FOUND, body={"query": "last"})
def get_results(self, context):
"""
Use the last digest saved to get the execution results
:param context:
:return:
"""
digest = self.sheerka.load(self.NAME, "digest")
if digest is None:
context.log("No recorded digest found.")
return None
return self.get_results_by_digest(context, digest, False)
@staticmethod
def as_list(execution_context):
def _yield_result(lst):
for e in lst:
yield e
if e.children:
yield from _yield_result(e.children)
return _yield_result([execution_context])
@@ -10,7 +10,7 @@ GROUP_PREFIX = 'All_'
class SheerkaSetsManager(BaseService):
NAME = "SetsManager"
CONCEPTS_GROUPS_ENTRY = "Concepts_Groups"
CONCEPTS_GROUPS_ENTRY = "SetsManager:Concepts_Groups"
def __init__(self, sheerka):
super().__init__(sheerka)
@@ -21,7 +21,7 @@ class Variable(ServiceObj):
class SheerkaVariableManager(BaseService):
NAME = "VariableManager"
VARIABLES_ENTRY = "Variables" # entry for admin or internal variables
VARIABLES_ENTRY = "VariableManager:Variables" # entry for admin or internal variables
def __init__(self, sheerka):
super().__init__(sheerka)
+15
View File
@@ -424,3 +424,18 @@ def tokens_index(tokens, sub_tokens, skip=0):
skip -= 1
raise ValueError(f"sub tokens '{sub_tokens}' not found")
def as_bag(obj):
"""
Get the properties of an object (static and dynamic)
:param obj:
:return:
"""
if hasattr(obj, "as_bag"):
bag = obj.as_bag()
else:
bag = {prop: getattr(obj, prop) for prop in dir(obj) if not prop.startswith("_")}
bag["self"] = obj
return bag
-150
View File
@@ -1,150 +0,0 @@
from typing import List
from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
from core.sheerka.ExecutionContext import ExecutionContext
from evaluators.BaseEvaluator import OneReturnValueEvaluator
from parsers.ExplainParser import ExplanationNode, FilterNode, RecurseDefNode, FormatLNode, FormatDNode
from parsers.ExpressionParser import ExpressionVisitor, IsaNode
from printer.SheerkaPrinter import FormatInstructions
class ExplainExpressionVisitor(ExpressionVisitor):
def __init__(self):
self.instructions = FormatInstructions()
def visit_RecurseDefNode(self, expr_node):
self.instructions.set_recurse("children", expr_node.depth)
def visit_FormatLNode(self, expr_node):
self.instructions.set_format_l(ExecutionContext, expr_node.template)
class ExplainEvaluator(OneReturnValueEvaluator):
NAME = "Explain"
def __init__(self):
super().__init__(self.NAME, [BuiltinConcepts.EVALUATION], 60)
def get_event_digest(self, sheerka, explanation_node):
if explanation_node.digest and sheerka.sdp.has_result(explanation_node.digest):
return explanation_node.digest
if not explanation_node.digest and not explanation_node.record_digest:
# use a previous digest if found
digest = sheerka.load(self.name, "digest")
if digest is not None:
return digest
start = 0
while True:
events = list(sheerka.sdp.load_events(5, start))
if not events:
break
for event in events:
if not sheerka.sdp.has_result(event.get_digest()):
continue
if not explanation_node.digest or explanation_node.digest == event.message:
# maybe explanation_node.digest is not a real digest, but the command we want to explain
return event.get_digest()
start += 5
if start > 20:
break
return None
@staticmethod
def get_execution_result(sheerka, digest):
if digest is None:
# the test is done here to ease the unit tests
return None
return [sheerka.sdp.load_result(digest)]
@staticmethod
def get_instructions(filter_node: FilterNode):
instructions = FormatInstructions()
for directive in filter_node.directives:
if isinstance(directive, RecurseDefNode):
instructions.set_recurse("children", directive.depth)
elif isinstance(directive, FormatLNode):
instructions.set_format_l(ExecutionContext, directive.template)
elif isinstance(directive, FormatDNode):
instructions.add_format_d(IsaNode(ExecutionContext), directive.properties)
return instructions
@staticmethod
def get_title(filter_node):
return "<title>"
def matches(self, context, return_value):
if not return_value.status:
return False
if not isinstance(return_value.value, ParserResultConcept):
return False
return isinstance(return_value.value.value, ExplanationNode)
def eval(self, context, return_value):
sheerka = context.sheerka
explanation_node = return_value.value.value
if explanation_node.digest and not explanation_node.record_digest:
context.log(f"Deleting recorded digest")
sheerka.delete(context, self.name, "digest")
digest = self.get_event_digest(sheerka, explanation_node)
executions_results = self.get_execution_result(sheerka, digest)
if executions_results is None and not digest:
res = sheerka.new(BuiltinConcepts.ERROR, body=f"No result found (digest={explanation_node.digest})")
else:
# record the digest if needed
if explanation_node.record_digest:
context.log(f"Recording digest '{digest}'")
sheerka.record(context, self.name, "digest", digest)
filter_nodes = explanation_node.expr.filters
global_instructions = self.get_instructions(filter_nodes[0])
if len(filter_nodes) == 1:
filtered = [[]]
self.filter(executions_results, filter_nodes, filtered)
res = sheerka.new(BuiltinConcepts.EXPLANATION,
digest=digest,
command=explanation_node.command,
title="<all>",
body=filtered[0],
instructions=global_instructions)
else:
res = []
filter_nodes = filter_nodes[1:] # remove the first filter_node (which always returns True)
filtered = []
for i in range(len(filter_nodes)):
filtered.append([])
self.filter(executions_results, filter_nodes, filtered)
for i, filter_node in enumerate(filter_nodes):
instructions = global_instructions.clone().merge(self.get_instructions(filter_node))
res.append(sheerka.new(BuiltinConcepts.EXPLANATION,
digest=digest,
command=explanation_node.command,
title=self.get_title(filter_node),
body=filtered[i],
instructions=instructions))
if len(res) == 1:
res = res[0]
return sheerka.ret(self.name, not sheerka.isinstance(res, BuiltinConcepts.ERROR), res, parents=[return_value])
def filter(self, executions_results, filter_nodes: List[FilterNode], res):
for execution_result in executions_results:
for i, filter_node in enumerate(filter_nodes):
if filter_node.expr.eval(execution_result):
res[i].append(execution_result)
if execution_result.children:
self.filter(execution_result.children, filter_nodes, res)
return res
+15 -9
View File
@@ -1,8 +1,10 @@
import ast
import copy
import traceback
from functools import partial, update_wrapper
import core.ast.nodes
from core.sheerka.services.SheerkaFilter import Pipe
import core.utils
from core.ast.visitors import UnreferencedNamesVisitor
from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
@@ -86,22 +88,22 @@ class PythonEvaluator(OneReturnValueEvaluator):
return sheerka.ret(self.name, False, error, parents=[return_value])
def get_globals(self, context, node):
my_locals = {
my_globals = {
"Concept": core.concept.Concept,
"BuiltinConcepts": core.builtin_concepts.BuiltinConcepts,
}
# has to tbe the first, to allow override
method_from_sheerka = self.update_globals_with_sheerka_methods(my_locals, context)
# has to be the first, to allow override
method_from_sheerka = self.update_globals_with_sheerka_methods(my_globals, context)
self.update_globals_with_context(my_locals, context)
self.update_globals_with_node(my_locals, context, node)
self.update_globals_with_context(my_globals, context)
self.update_globals_with_node(my_globals, context, node)
if self.locals: # when exta values are given. Add them
my_locals.update(self.locals)
if self.locals: # when extra values are given. Add them
my_globals.update(self.locals)
my_locals["sheerka"] = Expando(method_from_sheerka) # it's the last, so I cannot be overridden
return my_locals
my_globals["sheerka"] = Expando(method_from_sheerka) # it's the last, so I cannot be overridden
return my_globals
@staticmethod
def update_globals_with_sheerka_methods(my_locals, context):
@@ -118,6 +120,10 @@ class PythonEvaluator(OneReturnValueEvaluator):
for method_name, method in methods_from_sheerka.items():
my_locals[method_name] = method
# Add pipeable functions
for func_name, function in context.sheerka.sheerka_pipeables.items():
my_locals[func_name] = Pipe(function, context)
return methods_from_sheerka # to allow access using prefix "sheerka."
def update_globals_with_context(self, my_locals, context):
-361
View File
@@ -1,361 +0,0 @@
from dataclasses import dataclass, field
from typing import List, Dict
from core.builtin_concepts import BuiltinConcepts
from core.tokenizer import LexerError, Token
from parsers.BaseParser import Node, UnexpectedTokenErrorNode, BaseSplitIterParser, UnexpectedEof, ErrorNode
from parsers.ExpressionParser import ExprNode, TrueNode, PropertyEqualsNode, PropertyContainsNode, OrNode, AndNode
@dataclass()
class ValueErrorNode(ErrorNode):
"""
When the value parse has an incorrect type or value
"""
message: str
token: Token # token when the error is detected
@dataclass()
class MultipleDigestError(ErrorNode):
message: str
token: Token
@dataclass()
class ExplanationNode(Node):
digest: str # digest of the event to explain
command: str # original explain command
expr: ExprNode = None
record_digest: bool = False
@dataclass
class FilterNode(ExprNode):
"""
Wraps predicates
"""
expr: ExprNode
directives: List[ExprNode] = field(default_factory=list)
def eval(self, obj):
return self.expr.eval(obj)
@dataclass
class RecurseDefNode(ExprNode):
"""
It is used to defined the depth of the recursion
"""
depth: int
@dataclass
class FormatLNode(ExprNode):
"""
Define the template to use for ExecutionContext when printed in line
"""
template: str
@dataclass
class FormatDNode(ExprNode):
"""
Defines the properties to display, and their format
"""
properties: Dict[str, str]
@dataclass
class UnionNode(ExprNode):
"""
Define the template to use for ExecutionContext when printed in line
"""
filters: List[FilterNode]
def eval(self, obj):
if len(self.filters) == 0:
return False
if len(self.filters) == 0:
return self.filters[0].eval(obj)
res = False
for f in self.filters[1:]:
res |= f.eval(obj)
return res
class ExplainParser(BaseSplitIterParser):
def __init__(self, **kwargs):
super().__init__("Explain", 81, none_on_eof=True)
def parse_explain(self):
token = self.get_token()
if token is None:
return BuiltinConcepts.IS_EMPTY
if token.value != 'explain':
self.add_error(UnexpectedTokenErrorNode("", token, ["explain"]))
return BuiltinConcepts.NOT_FOR_ME
digest = ""
record_digest = False
expr_node = UnionNode([FilterNode(TrueNode(), [])])
self.next_token()
while True:
# no need to continue when error
if self.has_error:
return None
token = self.get_token()
if token is None:
break
if token.value == "-f" or token.value == "--filter":
self.next_token()
expr_node.filters.append(self.parse_filter())
elif token.value in ("-r", "--recurse"):
self.next_token()
expr_node.filters[-1].directives.append(self.parse_recurse())
elif token.value == "--format_l":
self.next_token()
expr_node.filters[-1].directives.append(self.parse_format_l())
elif token.value == "--format_d":
self.next_token()
expr_node.filters[-1].directives.append(self.parse_format_d())
elif token.value in ("-d", "--digest"):
self.next_token()
digest = self.parse_digest(digest)
record_digest = True
elif token.value.startswith("-"):
self.add_error(UnexpectedTokenErrorNode("", token, []))
else:
digest = self.parse_digest(digest)
return ExplanationNode(digest, self.text, expr=expr_node, record_digest=record_digest)
def parse_digest(self, digest):
token = self.get_token()
if token is None or token.value.startswith("-"):
return ""
if digest != "":
self.add_error(MultipleDigestError("Too many digest", token))
return None
digest = token.value
self.next_token()
return digest
def parse_filter(self):
node = self.parse_or()
if node is None:
return None
return FilterNode(node)
def parse_or(self):
parts = []
node = self.parse_and()
if node is None:
return None
parts.append(node)
while True:
token = self.get_token()
if token is None or token.value != "or":
break
self.next_token()
node = self.parse_and()
if node is None:
return None
else:
parts.append(node)
return parts[0] if len(parts) == 1 else OrNode(*parts)
def parse_and(self):
parts = []
node = self.parse_predicate()
if node is None:
return None
parts.append(node)
while True:
token = self.get_token()
if token is None or token.value != "and":
break
self.next_token()
node = self.parse_predicate()
if node is None:
return None
else:
parts.append(node)
return parts[0] if len(parts) == 1 else AndNode(*parts)
def parse_predicate(self):
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing filter"))
return None
if token.value == "(":
self.next_token()
expr = self.parse_or()
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Missing right parenthesis"))
return None
if token.value != ")":
self.add_error(UnexpectedTokenErrorNode("Parenthesis mismatch", token, [")"]))
return None
self.next_token()
else:
expr = self.parse_property_predicate()
return expr
def parse_recurse(self):
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing recurse"))
return None
try:
depth = int(token.value)
self.next_token()
return RecurseDefNode(depth)
except ValueError:
self.add_error(ValueErrorNode(f"'{token.value}' is not an integer", token))
return None
def parse_format_l(self):
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing format_l"))
return None
if token.value.startswith("-"):
self.add_error(UnexpectedTokenErrorNode("parsing format_l", token, ["<property name>"]))
return None
template = token.value
self.next_token()
return FormatLNode(template)
def parse_format_d(self):
props = {}
while TrueNode:
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing format_d"))
return None
if token.value.startswith("-"):
self.add_error(UnexpectedTokenErrorNode("parsing format_d", token, ["<property name>"]))
return None
parts = token.value.split(':')
if len(parts) == 1:
props[token.value] = "{" + token.value + "}"
else:
props[parts[0]] = parts[1]
self.next_token()
token = self.get_token()
if token is None or token.value.startswith("-"):
break
elif token.value == ",":
self.next_token()
else:
self.add_error(UnexpectedTokenErrorNode("parsing format_d", token, ["<eof>", ","]))
return FormatDNode(props)
def parse_property_predicate(self):
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing predicate"))
return None
prop_name = token.value
if prop_name.startswith("-"):
self.add_error(UnexpectedTokenErrorNode("while parsing predicate", token, ["<property_name>"]))
return None
self.next_token()
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing predicate"))
return None
operand = token.value
if operand not in ("=", "=="):
self.add_error(UnexpectedTokenErrorNode("Unexpected token when parsing predicate", token, ['=', "=="]))
return None
self.next_token()
token = self.get_token()
if token is None:
self.add_error(UnexpectedEof("Unexpected EOF while parsing filter"))
return None
self.next_token()
prop_value = token.value
return PropertyEqualsNode(prop_name, prop_value) if operand == "==" else \
PropertyContainsNode(prop_name, prop_value)
def parse(self, context, parser_input):
"""
parser_input can be string, but text can also be an list of tokens
:param context:
:param parser_input:
:return:
"""
context.log(f"Parsing '{parser_input}'", self.name)
sheerka = context.sheerka
if not isinstance(parser_input, str):
return sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.NOT_FOR_ME, reason=parser_input))
explanation_node = None
try:
self.reset_parser(context, parser_input)
self.next_token()
explanation_node = self.parse_explain()
except LexerError as e:
self.add_error(e, False)
if self.has_error or not isinstance(explanation_node, ExplanationNode):
if explanation_node in (BuiltinConcepts.NOT_FOR_ME, BuiltinConcepts.IS_EMPTY):
error_body = sheerka.new(
BuiltinConcepts.NOT_FOR_ME,
body=parser_input,
reason=self.error_sink if self.has_error else BuiltinConcepts.IS_EMPTY)
else:
error_body = sheerka.new(
BuiltinConcepts.ERROR,
body=self.error_sink)
ret = sheerka.ret(self.name, False, error_body)
else:
ret = sheerka.ret(self.name, True,
sheerka.new(
BuiltinConcepts.PARSER_RESULT,
parser=self,
source=parser_input,
body=explanation_node))
self.log_result(context, parser_input, ret)
return ret
+29 -10
View File
@@ -5,7 +5,6 @@ from typing import Dict
from core.concept import Concept
from core.utils import get_full_qualified_name
from parsers.ExpressionParser import ExprNode
class FormatDetailType(Enum):
@@ -18,12 +17,21 @@ class FormatDetailDesc:
"""
class that describes how to print the details
"""
predicate: ExprNode # the detail will be printed if the predicate is matched
format_type: FormatDetailType
properties: Dict[str, str] # name of the property, format to use
def __post_init__(self):
if isinstance(self.properties, list):
self.properties = {p: "{" + p + "}" for p in self.properties}
class FormatInstructions:
"""
This class instructs how a concept should be displayed
In order to be processed, it has to be placed under the property FORMAT_INSTRUCTION of the concept
Note that the format instructions can be modified by rules (when there will be rules !)
"""
def __init__(self, tab_indent=None, tab=None, no_color=None):
self._tab_indent = 2
self._tab = ""
@@ -31,12 +39,13 @@ class FormatInstructions:
self.recursive_props = {} # which property that does in recursion and what depth
self.format_l = {} # what format to use when printing obj line by line
self.format_d = [] # list of FormatDetailDesc
self.format_d = {} # What to use when printing obj details
# keep track of the modifications
# keep track of the modifications in order to merge
self.modified = set()
self.recursive_props_modified = set()
self.format_l_modified = set()
self.format_d_modified = set()
if tab_indent is not None:
self.tab_indent = tab_indent
@@ -89,10 +98,16 @@ class FormatInstructions:
self.format_l_modified.add(key)
return self
def add_format_d(self, predicate, properties, format_type=FormatDetailType.Props_In_Line):
if isinstance(properties, list):
properties = dict((p, "{" + p + " }") for p in properties)
self.format_d.append(FormatDetailDesc(predicate, format_type, properties))
def set_format_d(self, obj, properties, format_type=FormatDetailType.Props_In_Line):
"""
Defines how to print the detail of an object
:param properties:
:param format_type:
:return:
"""
key = self.get_obj_key(obj)
self.format_d[key] = FormatDetailDesc(format_type, properties)
self.format_d_modified.add(key)
return self
def clone(self):
@@ -100,6 +115,9 @@ class FormatInstructions:
return clone
def merge(self, other):
if other is None:
return self
for prop in other.modified:
setattr(self, prop, getattr(other, prop))
@@ -109,12 +127,13 @@ class FormatInstructions:
for key in other.format_l_modified:
self.set_format_l(key, other.format_l[key])
self.format_d.extend(other.format_d)
for key in other.format_d_modified:
self.set_format_d(key, other.format_d[key].properties, other.format_d[key].format_type)
return self
@staticmethod
def get_obj_key(obj):
return obj.id if isinstance(obj, Concept) else \
return f"c:{obj.id}:" if isinstance(obj, Concept) else \
obj if isinstance(obj, str) else \
get_full_qualified_name(obj)
+169 -24
View File
@@ -1,6 +1,16 @@
from dataclasses import dataclass
from core.utils import as_bag
from printer.FormatInstructions import FormatDetailDesc, FormatDetailType, FormatInstructions
@dataclass
class BraceToken:
start: int
end: int
colon: int
class Formatter:
def __init__(self):
@@ -10,22 +20,34 @@ class Formatter:
def reset_formats(self):
self.custom_l_formats = {}
self.custom_d_formats = []
self.custom_d_formats = {}
def register_format_l(self, obj, template):
key = FormatInstructions.get_obj_key(obj)
self.custom_l_formats[key] = template
return self
def register_format_d(self, predicate, properties, format_type=FormatDetailType.Props_In_Line):
if isinstance(properties, list):
properties = dict([(p, "{" + p + "}") for p in properties])
self.custom_d_formats.append(FormatDetailDesc(predicate, format_type, properties))
def register_format_d(self, obj, properties=None, format_type=FormatDetailType.Props_In_Line):
key = FormatInstructions.get_obj_key(obj)
if properties is None:
if isinstance(obj, str):
raise Exception("I need the instance of the obj to compute the properties")
bag = as_bag(obj)
properties = dict((p, "{" + p + "}") for p in bag)
self.custom_d_formats[key] = FormatDetailDesc(format_type, properties)
return self
def compute_format_l(self, custom_formats_override, key):
if custom_formats_override and key in custom_formats_override:
custom_template = custom_formats_override[key]
def compute_format_l(self, format_l_override, key):
"""
merge format_l_override and default format_l
:param format_l_override:
:param key:
:return:
"""
if format_l_override and key in format_l_override:
custom_template = format_l_override[key]
if custom_template in ("+", "\\+", "+\\"):
return custom_template
elif custom_template.startswith("+"):
@@ -45,28 +67,50 @@ class Formatter:
else:
return None
def compute_format_d(self, custom_formats_override):
if custom_formats_override and not self.custom_d_formats:
return custom_formats_override
if self.custom_d_formats and not custom_formats_override:
def compute_format_d(self, format_d_override, key):
"""
merge format_d_override and default format_d
:param format_d_override:
:param key:
:return:
"""
if format_d_override and key in format_d_override:
return format_d_override
elif self.custom_d_formats and key in self.custom_d_formats:
return self.custom_d_formats
if self.custom_d_formats and custom_formats_override:
return self.custom_d_formats + custom_formats_override
return []
else:
return None
def format_l(self, obj, custom_formats_override=None):
"""
Get the one-line representation of obj
:param obj:
:param custom_formats_override:
:return:
"""
key = FormatInstructions.get_obj_key(obj)
format_l = self.compute_format_l(custom_formats_override, key)
return self.to_string(obj, format_l) if format_l else str(obj)
return self.to_string_l(obj, format_l) if format_l else str(obj)
def format_d(self, obj, format_d_desc: FormatDetailDesc):
def format_d(self, obj, format_d_override=None):
"""
Get the detail representation of an object
:param obj: object to format
:param format_d_override: dictionary of format_d templates
:return: Formatted representation or None
"""
key = FormatInstructions.get_obj_key(obj)
format_d = self.compute_format_d(format_d_override, key)
if not format_d:
return None
format_d_desc = format_d[key]
max_prop_length = self.get_properties_max_length(format_d_desc.properties.keys())
res = ""
for prop, template in format_d_desc.properties.items():
if res:
res += "\n"
#value = getattr(obj, prop) if hasattr(obj, prop) else "*Undefined*"
res += prop.ljust(max_prop_length) + ": " + self.to_string(obj, template)
res += prop.ljust(max_prop_length) + ": " + self.to_string_d(obj, template, max_prop_length + 2)
return res
@@ -75,9 +119,110 @@ class Formatter:
return max((len(p) for p in properties))
@staticmethod
def to_string(obj, template):
def to_string_l(obj, template):
try:
bag = obj.to_bag() if hasattr(obj, "to_bag") else obj.__dict__
return template.format(**bag)
except KeyError:
return "*Undefined*"
bag = as_bag(obj)
return eval("f'" + template + "'", bag)
except KeyError as kerr:
return f"*Undefined {kerr}*"
except NameError as nerr:
return f"*{nerr}*"
def to_string_d(self, obj, template, tab):
def format_obj(o, _tab, override=None):
if o == obj:
return o
res = self.format_d(o, override)
if res is not None:
return res
if isinstance(o, str):
return f'"{o}"' if "'" in o else f"'{o}'"
if isinstance(o, dict):
if len(o) == 0:
return "{}"
res = "{"
max_prop_length = self.get_properties_max_length([f"'{k}'" for k in o.keys()])
for prop in o:
if res != "{":
res += "\n" + (" " * (_tab + 1))
details = str(format_obj(o[prop], _tab + max_prop_length + 3, override))
res += f"'{prop}'".ljust(max_prop_length) + ": " + details
return res + "}"
if hasattr(o, "__iter__"):
res = "[" if isinstance(o, list) else "{" if isinstance(o, set) else "("
for item in o:
if res not in ("[", "(", "{"):
res += "," + "\n" + (" " * (_tab + 1))
res += str(format_obj(item, _tab + 1, override))
return res + ("]" if isinstance(o, list) else "}" if isinstance(o, set) else ")")
return o
template = self.inject_format_obj(template, tab)
try:
bag = as_bag(obj)
bag["format_obj"] = format_obj
return eval("f'" + template + "'", bag)
except KeyError as kerr:
return f"*Undefined {kerr}*"
except NameError as nerr:
return f"*{nerr}*"
@staticmethod
def braces(template):
if template is None:
return
i = 0
start = -1
colon = -1
while i < len(template):
c = template[i]
if c == "{":
if i + 1 < len(template) and template[i + 1] == "{":
i += 1
else:
start = i
elif start != -1 and c == ":":
colon = i
elif start != -1 and c == "}":
if i + 1 < len(template) and template[i + 1] == "}":
i += 1
else:
yield BraceToken(start, i, colon)
start = -1
colon = -1
i += 1
@staticmethod
def inject_format_obj(template, tab):
if template is None:
return None
if template == "":
return ""
res = ""
previous = 0
for brace_token in iter(Formatter.braces(template)):
res += template[previous:brace_token.start]
res += "{format_obj("
if brace_token.colon != -1:
res += template[brace_token.start + 1: brace_token.colon]
res += f", {tab})"
res += template[brace_token.colon: brace_token.end + 1]
else:
res += template[brace_token.start + 1: brace_token.end]
res += f", {tab})"
res += template[brace_token.end]
previous = brace_token.end + 1
res += template[previous: len(template)]
return res
+56 -16
View File
@@ -1,4 +1,5 @@
import click
import types
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from printer.FormatInstructions import FormatInstructions, FormatDetailType
@@ -28,7 +29,6 @@ class SheerkaPrinter:
def __init__(self, sheerka):
self.sheerka = sheerka
self.formatter = Formatter()
self.formatter.register_format_l(EXECUTION_CONTEXT_CLASS, "[{id:3}] %tab%{desc} ({status})")
self.custom_concepts_printers = None
self.reset()
@@ -38,6 +38,10 @@ class SheerkaPrinter:
str(BuiltinConcepts.RETURN_VALUE): self.print_return_value,
}
self.formatter.reset_formats()
self.formatter.register_format_l(EXECUTION_CONTEXT_CLASS, "[{id:3}] %tab%{desc} ({status})")
self.formatter.register_format_l(SyntaxError,
'%red%{self.__class__.__name__}: {msg}\\n{text}\\n{"^": >{offset}}%reset%')
self.formatter.register_format_l(Exception, "%red%{self}%reset%")
def register_custom_printer(self, concept, custom_format):
key = concept.key if isinstance(concept, Concept) else concept
@@ -47,12 +51,22 @@ class SheerkaPrinter:
def register_format_l(self, obj, template):
self.formatter.register_format_l(obj, template)
def register_format_d(self, predicate, properties, format_type=FormatDetailType.Props_In_Line):
self.formatter.register_format_d(predicate, properties, format_type)
def register_format_d(self, obj, properties=None, format_type=FormatDetailType.Props_In_Line):
self.formatter.register_format_d(obj, properties, format_type)
def print(self, to_print, instructions=None):
"""
Print using SheerkaPrinter.out
:param to_print:
:param instructions: FormatInstructions
:return:
"""
instructions = instructions or FormatInstructions()
self.fp(instructions, to_print)
try:
self.fp(instructions, to_print)
except Exception as ex:
self.fp(instructions, ex)
return
def fp(self, instructions, item):
"""
@@ -61,11 +75,12 @@ class SheerkaPrinter:
:param item:
:return:
"""
if isinstance(item, (list, tuple)):
for i in item:
self.fp(instructions, i)
return
elif isinstance(item, str):
# first, get the merged instructions
instructions = self.merge_instructions(instructions, item)
# We can only print string
if isinstance(item, str):
for color in COLORS:
item = item.replace("%" + color + "%", "" if instructions.no_color else COLORS[color])
if "%tab%" in item:
@@ -74,23 +89,48 @@ class SheerkaPrinter:
self.out(instructions.tab + item)
return
# if list or generator, print one by one
elif hasattr(item, "__iter__"):
for i in item:
self.fp(instructions, i)
return
# Custom print required
elif isinstance(item, Concept) and item.key in self.custom_concepts_printers:
self.custom_concepts_printers[item.key](self, instructions, item)
# get the format per line and print
else:
self.fp(instructions, self.formatter.format_l(item, instructions.format_l))
# print details
format_d = self.formatter.compute_format_d(instructions.format_d)
for format_d_desc in reversed(format_d):
if format_d_desc.predicate.eval(item):
self.fp(instructions, self.formatter.format_d(item, format_d_desc))
break
format_d = self.formatter.format_d(item, instructions.format_d)
if format_d:
self.fp(instructions, format_d)
if instructions.recursive_props:
for k, v in instructions.recursive_props.items():
if hasattr(item, k) and v > 0 and (value := getattr(item, k)) != BuiltinConcepts.NOT_INITIALIZED:
self.fp(instructions.recurse(k), value)
@staticmethod
def merge_instructions(instructions, obj):
"""
Merge the format instruction coming from the context with the one from obj
Note that if obj is not a Concept, there is not instruction to merge
:param instructions:
:param obj:
:return:
"""
if not hasattr(obj, "get_format_instructions"):
return instructions
obj_instructions = obj.get_format_instructions()
if obj_instructions is None:
return instructions
return instructions.clone().merge(obj_instructions)
@staticmethod
def print_explanation(printer, instructions, item):
explanation_instructions = instructions.clone().merge(item.instructions)
@@ -102,7 +142,7 @@ class SheerkaPrinter:
if printer.sheerka.isinstance(item.body, BuiltinConcepts.EXPLANATION):
return printer.fp(instructions, item.body)
if isinstance(item.body, (list, tuple)):
if isinstance(item.body, (list, tuple, types.GeneratorType)):
return printer.fp(instructions, item.body)
status = item.status
+3
View File
@@ -174,6 +174,9 @@ class SheerkaDataProviderDictionaryIO(SheerkaDataProviderIO):
stream.close = on_close(self, file_path, stream)(stream.close)
return stream
if file_path not in self.cache:
raise FileNotFoundError(file_path)
return io.BytesIO(self.cache[file_path]) if "b" in mode else io.StringIO(self.cache[file_path])
def reset(self):