Refactored sheerka class: splitted to use sub handlers. Refactored unit tests to use classes.

This commit is contained in:
2020-01-22 17:49:28 +01:00
parent 821614a6c4
commit c489a38ebc
120 changed files with 7947 additions and 8190 deletions
View File
View File
+152
View File
@@ -0,0 +1,152 @@
from core.builtin_concepts import BuiltinConcepts, ListConcept
from core.concept import Concept
import ast
import core.utils
import logging
log = logging.getLogger(__name__)
class NodeParent:
"""
Class that represent the ancestor of a Node
For example, the 'For' nodes has three fields (target, iter and body)
So, for a node under For.iter
node -> For
field -> iter
"""
def __init__(self, node, field):
self.node = node
self.field = field
def __repr__(self):
if self.node is None:
return None
if self.field is None:
return self.node.get_node_type()
return self.node.get_node_type() + "." + self.field
def __eq__(self, other):
# I can compare with type for simplification
if isinstance(other, tuple):
return self.node.get_node_type() == other[0] and self.field == other[1]
# normal equals implementation
if not isinstance(other, NodeParent):
return False
return self.node.get_node_type() == other.node.get_node_type() and self.field == other.field
def __hash__(self):
return hash((self.node.get_node_type(), self.field))
class NodeConcept(Concept):
def __init__(self, key, node_type, parent: NodeParent):
super().__init__(key, True, False, key)
self.parent = parent
self.node_type = node_type
def get_node_type(self):
return self.node_type
class GenericNodeConcept(NodeConcept):
def __init__(self, node_type, parent):
super().__init__(BuiltinConcepts.GENERIC_NODE, node_type, parent)
def __repr__(self):
return "Generic:" + self.node_type
def get_node_type(self):
return self.node_type
def get_value(self):
if self.node_type == "Name":
return self.get_prop("id")
if self.node_type == "arg":
return self.get_prop("arg")
return self.body
class IdentifierNodeConcept(NodeConcept):
def __init__(self, parent, name):
super().__init__(BuiltinConcepts.IDENTIFIER_NODE, "Name", parent)
self.body = name
class CallNodeConcept(NodeConcept):
def __init__(self, parent=None):
super().__init__(BuiltinConcepts.IDENTIFIER_NODE, "Call", parent)
def get_args_names(self, sheerka):
return sheerka.get_values(self.get_prop("args"))
def python_to_concept(python_node):
"""
Transform Python AST node into concept nodes
for better usage
:param python_node:
:return:
"""
def _transform(node, parent):
node_type = node.__class__.__name__
concept = GenericNodeConcept(node_type, parent).init_key()
for field in node._fields:
if not hasattr(node, field):
continue
value = getattr(node, field)
concept.def_prop(field)
if isinstance(value, list):
lst = ListConcept().init_key()
for i in value:
lst.append(_transform(i, NodeParent(concept, field)))
concept.set_prop(field, lst)
elif isinstance(value, ast.AST):
concept.set_prop(field, _transform(value, NodeParent(concept, field)))
else:
concept.set_prop(field, value)
concept.metadata.is_evaluated = True
return concept
return _transform(python_node, None)
def concept_to_python(concept_node):
"""
Transform back concept_node to Python AST node
:param concept_node:
:return:
"""
def _transform(node):
node_type = node.get_node_type()
ast_object = core.utils.new_object("_ast." + node_type)
for field in node.props:
if field not in ast_object._fields:
continue
value = node.get_prop(field)
if isinstance(value, list) or isinstance(value, Concept) and value.key == str(BuiltinConcepts.LIST):
lst = []
for i in value:
lst.append(_transform(i))
setattr(ast_object, field, lst)
elif isinstance(value, NodeConcept):
setattr(ast_object, field, _transform(value))
else:
setattr(ast_object, field, value)
return ast_object
res = _transform(concept_node)
return res
+130
View File
@@ -0,0 +1,130 @@
from core.ast.nodes import GenericNodeConcept, NodeConcept
from core.builtin_concepts import ListConcept
class ConceptNodeVisitor:
"""
Base class to visit NodeConcept
It is insolently inspired by python AST.Visitor class
"""
def visit(self, node):
"""Visit a node."""
name = node.node_type if isinstance(node, GenericNodeConcept) else node.name
name = str(name).capitalize()
method = 'visit_' + name
visitor = getattr(self, method, self.generic_visit)
return visitor(node)
def generic_visit(self, node):
"""Called if no explicit visitor function exists for a node."""
for field, value in iter_props(node):
if isinstance(value, ListConcept):
for item in value:
if isinstance(item, NodeConcept):
self.visit(item)
elif isinstance(value, NodeConcept):
self.visit(value)
def visit_Constant(self, node):
value = node.get_prop("value")
type_name = _const_node_type_names.get(type(value))
if type_name is None:
for cls, name in _const_node_type_names.items():
if isinstance(value, cls):
type_name = name
break
if type_name is not None:
method = 'visit_' + type_name
try:
visitor = getattr(self, method)
except AttributeError:
pass
else:
import warnings
warnings.warn(f"{method} is deprecated; add visit_Constant",
PendingDeprecationWarning, 2)
return visitor(node)
return self.generic_visit(node)
class UnreferencedNamesVisitor(ConceptNodeVisitor):
def __init__(self, sheerka):
self.names = set()
self.sheerka = sheerka
def visit_Name(self, node):
parents = get_parents(node)
if ("For", "target") in parents: # variable used by the 'for' iteration
return
if ("Call", "func") in parents: # name of the function
return
if ("Assign", "targets") in parents: # variable which is assigned
return
if self.can_be_discarded(self.sheerka.value(node), parents):
return
self.names.add(self.sheerka.value(node))
def can_be_discarded(self, variable_name, parents):
for node in (parent.node for parent in parents):
if node is None:
return False
if node.get_node_type() == "For" and self.sheerka.value(node.get_prop("target")) == variable_name:
# variable used by the loop
return True
if node.get_node_type() == "FunctionDef":
# variable defined as a function parameter
args = node.get_prop("args")
args_values = list(self.sheerka.get_values(args.get_prop("args")))
if variable_name in args_values:
return True
return False
class ExtractPredicateVisitor(ConceptNodeVisitor):
def __init__(self, variable_name):
self.predicates = []
self.variable_name = variable_name
def get_parents(node):
if node.parent is None:
return []
res = []
while True:
if node.parent is None:
break
res.append(node.parent)
node = node.parent.node
return res
def iter_props(node):
for p in node.props:
yield p, node.props[p].value
_const_node_type_names = {
bool: 'NameConstant', # should be before int
type(None): 'NameConstant',
int: 'Num',
float: 'Num',
complex: 'Num',
str: 'Str',
bytes: 'Bytes',
type(...): 'Ellipsis',
}
+387
View File
@@ -0,0 +1,387 @@
from enum import Enum
from core.concept import Concept, ConceptParts
class BuiltinConcepts(Enum):
"""
List of builtin concepts that do no need any specific implementation
Please note that the value of the enum is informal. It is not used in the system
For example, the concept 'NODE' DOES NOT have the key, the id or whatever 200
The key if the name of the concept
The id is a sequential number given just before the concept is saved in sdp
The values of the enum is not used the code
"""
SHEERKA = "sheerka"
BEFORE_PARSING = "before parsing" # activated before evaluation by the parsers
PARSING = "parsing" # activated during the parsing. It contains the text to parse
AFTER_PARSING = "after parsing" # after parsing
BEFORE_EVALUATION = "before evaluation" # before evaluation
EVALUATION = "evaluation" # activated when the parsing process seems to be finished
AFTER_EVALUATION = "after evaluation" # activated when the parsing process seems to be finished
BEFORE_RENDERING = "before rendering" # activate before the output is rendered
RENDERING = "rendering" # rendering the response from sheerka
AFTER_RENDERING = "after rendering" # rendering the response from sheerka
USER_INPUT = "user input" # represent an input from an user
SUCCESS = "success"
ERROR = "error"
UNKNOWN_CONCEPT = "unknown concept" # the request concept is not recognized
CANNOT_RESOLVE_CONCEPT = "cannot resolve concept" # when too many concepts with the same name
RETURN_VALUE = "return value" # a value is returned
CONCEPT_TOO_LONG = "concept too long" # concept cannot be processed by exactConcept parser
NEW_CONCEPT = "new concept" # when a new concept is added
UNKNOWN_PROPERTY = "unknown property" # when requesting for a unknown property
PARSER_RESULT = "parser result"
TOO_MANY_SUCCESS = "too many success" # when expecting a limited number of successful return value
TOO_MANY_ERRORS = "too many errors" # when expecting a limited number of successful return value
NOT_FOR_ME = "not for me" # a parser recognize that the entry is not meant for it
IS_EMPTY = "is empty" # when a set is empty
INVALID_RETURN_VALUE = "invalid return value" # the return value of an evaluator is not correct
CONCEPT_ALREADY_DEFINED = "concept already defined" # when you try to add the same concept twice
NOP = "no operation" # no operation concept. Does nothing
CONCEPT_EVAL_ERROR = "concept evaluation error" # cannot evaluate a property or metadata of a concept
ENUMERATION = "enum" # represents a list or a set
LIST = "list" # represents a list
CONCEPT_ALREADY_IN_SET = "concept already in set"
EVALUATOR_PRE_PROCESS = "evaluator pre process" # used modify / tweak behaviour of evaluators
CONCEPT_EVAL_REQUESTED = "concept eval requested"
REDUCE_REQUESTED = "reduce requested" # remove meaningless error when possible
NOT_A_SET = "not a set" # the concept has no entry in sets
NODE = "node"
GENERIC_NODE = "generic node"
IDENTIFIER_NODE = "identifier node"
def __repr__(self):
return "__" + self.name
def __str__(self):
return "__" + self.name
BuiltinUnique = [
BuiltinConcepts.BEFORE_PARSING,
BuiltinConcepts.PARSING,
BuiltinConcepts.AFTER_PARSING,
BuiltinConcepts.BEFORE_EVALUATION,
BuiltinConcepts.EVALUATION,
BuiltinConcepts.AFTER_EVALUATION,
BuiltinConcepts.BEFORE_RENDERING,
BuiltinConcepts.RENDERING,
BuiltinConcepts.AFTER_RENDERING,
BuiltinConcepts.SUCCESS,
BuiltinConcepts.NOP,
BuiltinConcepts.CONCEPT_EVAL_REQUESTED,
BuiltinConcepts.REDUCE_REQUESTED,
]
BuiltinErrors = [str(e) for e in {
BuiltinConcepts.ERROR,
BuiltinConcepts.UNKNOWN_CONCEPT,
BuiltinConcepts.CANNOT_RESOLVE_CONCEPT,
BuiltinConcepts.CONCEPT_TOO_LONG,
BuiltinConcepts.UNKNOWN_PROPERTY,
BuiltinConcepts.TOO_MANY_SUCCESS,
BuiltinConcepts.TOO_MANY_ERRORS,
BuiltinConcepts.INVALID_RETURN_VALUE,
BuiltinConcepts.CONCEPT_ALREADY_DEFINED,
BuiltinConcepts.CONCEPT_EVAL_ERROR,
BuiltinConcepts.CONCEPT_ALREADY_IN_SET,
BuiltinConcepts.NOT_A_SET,
}]
"""
Some concepts have a specific implementation
It's mainly to ease the usage
"""
class UserInputConcept(Concept):
def __init__(self, text=None, user_name=None):
super().__init__(BuiltinConcepts.USER_INPUT, True, False, BuiltinConcepts.USER_INPUT)
self.set_metadata_value(ConceptParts.BODY, text)
self.set_prop("user_name", user_name)
self.metadata.is_evaluated = True
@property
def text(self):
return self.body
@property
def user_name(self):
return self.props["user_name"].value
def __repr__(self):
return f"({self.id}){self.name}: '{self.body}'"
class ErrorConcept(Concept):
def __init__(self, error=None):
super().__init__(BuiltinConcepts.ERROR, True, False, BuiltinConcepts.ERROR)
self.set_metadata_value(ConceptParts.BODY, error)
self.metadata.is_evaluated = True
def __repr__(self):
return f"({self.id}){self.name}: {self.body}"
class UnknownConcept(Concept):
def __init__(self, metadata=None):
super().__init__(BuiltinConcepts.UNKNOWN_CONCEPT, True, False, BuiltinConcepts.UNKNOWN_CONCEPT)
self.set_metadata_value(ConceptParts.BODY, metadata)
self.metadata.is_evaluated = True
def __repr__(self):
return f"({self.id}){self.name}: {self.body}"
class ReturnValueConcept(Concept):
"""
This class represents the result of a data flow processing
It's the main input for the evaluators
"""
def __init__(self, who=None, status=None, value=None, message=None, parents=None):
super().__init__(BuiltinConcepts.RETURN_VALUE, True, False, BuiltinConcepts.RETURN_VALUE)
self.set_metadata_value(ConceptParts.BODY, value)
self.set_prop("who", who)
self.set_prop("status", status)
self.set_prop("message", message)
self.set_prop("parents", parents)
self.metadata.is_evaluated = True
@property
def who(self):
return self.props["who"].value
@who.setter
def who(self, value):
self.set_prop("who", value)
@property
def status(self):
return self.props["status"].value
@status.setter
def status(self, value):
self.set_prop("status", value)
@property
def value(self):
return self.body
@value.setter
def value(self, value):
self.set_metadata_value(ConceptParts.BODY, value)
@property
def message(self):
return self.props["message"].value
@message.setter
def message(self, value):
self.set_prop("message", value)
@property
def parents(self):
return self.props["parents"].value
@parents.setter
def parents(self, value):
self.set_prop("parents", value)
def __repr__(self):
return f"ReturnValue(who={self.who}, status={self.status}, value={self.value}, message={self.message})"
def __eq__(self, other):
if not isinstance(other, ReturnValueConcept):
return False
return self.who == other.who and \
self.status == other.status and \
self.value == other.value and \
self.message == other.message
def __hash__(self):
if hasattr(self.value, "__iter__") and not isinstance(self.value, str):
value_hash = hash(tuple(self.value))
else:
value_hash = hash(self.value)
return hash((self.who, self.status, value_hash))
class UnknownPropertyConcept(Concept):
"""
This error is raised when, during sheerka.new(), an unknown property is asked
"""
def __init__(self, property_name=None, concept=None):
super().__init__(BuiltinConcepts.UNKNOWN_PROPERTY, True, False, BuiltinConcepts.UNKNOWN_PROPERTY)
self.set_metadata_value(ConceptParts.BODY, property_name)
self.set_prop("concept", concept)
self.metadata.is_evaluated = True
def __repr__(self):
return f"UnknownProperty(property={self.property_name}, concept={self.concept})"
@property
def concept(self):
return self.props["concept"].value
@property
def property_name(self):
return self.body
class ParserResultConcept(Concept):
"""
Result of a parsing
"""
def __init__(self, parser=None, source=None, value=None, try_parsed=None):
super().__init__(BuiltinConcepts.PARSER_RESULT, True, False, BuiltinConcepts.PARSER_RESULT)
self.set_metadata_value(ConceptParts.BODY, value)
self.set_prop("parser", parser)
self.set_prop("source", source)
self.set_prop("try_parsed", try_parsed) # in case of error, what was found before the error
self.metadata.is_evaluated = True
def __repr__(self):
text = f"ParserResult(parser={self.props['parser'].value}"
source = self.props['source'].value
text += f", source='{source}')" if source else f", body='{self.body}')"
return text
def __eq__(self, other):
if not isinstance(other, ParserResultConcept):
return False
return self.source == other.source and \
self.parser == other.parser and \
self.body == other.body and \
self.try_parsed == other.try_parsed
def __hash__(self):
return hash(self.metadata.name)
@property
def value(self):
return self.body
@property
def try_parsed(self):
return self.props["try_parsed"].value
@property
def source(self):
return self.props["source"].value
@property
def parser(self):
return self.props["parser"].value
class InvalidReturnValueConcept(Concept):
"""
Error returned when an evaluator is not correctly coded
The accepted return value are
ReturnValueConcept, list of ReturnValueConcept or None
"""
def __init__(self, return_value=None, evaluator=None):
super().__init__(
BuiltinConcepts.INVALID_RETURN_VALUE,
True,
False,
BuiltinConcepts.INVALID_RETURN_VALUE)
self.set_metadata_value(ConceptParts.BODY, return_value)
self.set_prop("evaluator", evaluator)
self.metadata.is_evaluated = True
class ConceptEvalError(Concept):
def __init__(self, error=None, concept=None, property_name=None):
super().__init__(BuiltinConcepts.CONCEPT_EVAL_ERROR,
True,
False,
BuiltinConcepts.CONCEPT_EVAL_ERROR)
self.set_metadata_value(ConceptParts.BODY, error)
self.set_prop("concept", concept)
self.set_prop("property_name", property_name)
self.metadata.is_evaluated = True
def __repr__(self):
return f"ConceptEvalError(error={self.error}, concept={self.concept}, property={self.property_name})"
@property
def error(self):
return self.body
@property
def concept(self):
return self.props["concept"].value
@property
def property_name(self):
return self.props["property_name"].value
class EnumerationConcept(Concept):
def __init__(self, iteration=None):
super().__init__(BuiltinConcepts.ENUMERATION, True, False, BuiltinConcepts.ENUMERATION)
self.set_metadata_value(ConceptParts.BODY, iteration)
self.metadata.is_evaluated = True
def __iter__(self):
return iter(self.body)
class ListConcept(Concept):
def __init__(self, items=None):
super().__init__(BuiltinConcepts.LIST, True, False, BuiltinConcepts.LIST)
self.set_metadata_value(ConceptParts.BODY, items or [])
self.metadata.is_evaluated = True
def append(self, obj):
self.body.append(obj)
def __len__(self):
return len(self.body)
def __getitem__(self, key):
return self.body[key]
def __setitem__(self, key, value):
self.body[key] = value
def __iter__(self):
return iter(self.body)
def __contains__(self, item):
return item in self.body
class ConceptAlreadyInSet(Concept):
def __init__(self, concept=None, concept_set=None):
super().__init__(BuiltinConcepts.CONCEPT_ALREADY_IN_SET,
True,
False,
BuiltinConcepts.CONCEPT_ALREADY_IN_SET)
self.set_metadata_value(ConceptParts.BODY, concept)
self.set_prop("concept_set", concept_set)
self.metadata.is_evaluated = True
def __repr__(self):
return f"ConceptAlreadyInSet(concept={self.concept}, concept_set={self.concept_set})"
@property
def concept(self):
return self.body
@property
def concept_set(self):
return self.props["concept_set"].value
+214
View File
@@ -0,0 +1,214 @@
import ast
import logging
import core.ast.nodes
from core.ast.nodes import CallNodeConcept, GenericNodeConcept
from core.ast.visitors import UnreferencedNamesVisitor
from core.builtin_concepts import BuiltinConcepts
def is_same_success(sheerka, return_values):
"""
Returns True if all returns values are successful and have the same value
:param sheerka:
:param return_values:
:return:
"""
assert isinstance(return_values, list)
if not return_values[0].status:
return False
reference = sheerka.value(return_values[0].value)
for return_value in return_values[1:]:
if not return_value.status:
return False
actual = sheerka.value(return_value.value)
if actual != reference:
return False
return True
def expect_one(context, return_values, logger=None):
"""
Checks if there is at least one success return value
If there is more than one, check if it's the same value
:param context:
:param return_values:
:param logger:
:return:
"""
if not isinstance(return_values, list):
return return_values
sheerka = context.sheerka
if len(return_values) == 0:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
parents=return_values)
successful_results = [item for item in return_values if item.status]
number_of_successful = len(successful_results)
# total_items = len(return_values)
# remove errors when a winner is found
if number_of_successful == 1:
return sheerka.ret(
context.who,
True,
successful_results[0].body,
parents=return_values)
# too many winners, which one to choose ?
if number_of_successful > 1:
if is_same_success(sheerka, successful_results):
return sheerka.ret(
context.who,
True,
successful_results[0].value,
parents=return_values)
else:
if logger and logger.isEnabledFor(logging.DEBUG):
context.log(logger, f"Too many successful results found by expect_one()", context.who)
for s in successful_results:
context.log(logger, f"-> {s}", context.who)
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.TOO_MANY_SUCCESS, body=successful_results),
parents=return_values)
# only errors, i cannot help you
if logger and logger.isEnabledFor(logging.DEBUG):
context.log(logger, f"Too many errors found by expect_one()", context.who)
for s in successful_results:
context.log(logger, f"-> {s}", context.who)
if len(return_values) == 1:
return sheerka.ret(
context.who,
False,
return_values[0],
parents=return_values)
else:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
parents=return_values)
def get_names(sheerka, concept_node):
"""
Finds all the names referenced by the concept_node
:param sheerka:
:param concept_node:
:return:
"""
unreferenced_names_visitor = UnreferencedNamesVisitor(sheerka)
unreferenced_names_visitor.visit(concept_node)
return list(unreferenced_names_visitor.names)
def extract_predicates(sheerka, expression, variables_to_include, variables_to_exclude):
"""
from a given expression and a variable (or list of variables)
tries to find out all the predicates referencing the(se) variable(s), and the(se) variable(s) solely
for example
exp : isinstance(a, int) and isinstance(b, str)
will return 'isinstance(a, int)' if variable_name == 'a'
:param sheerka:
:param expression:
:param variables_to_include:
:param variables_to_exclude:
:return: list of predicates
"""
if len(variables_to_include) == 0:
return []
def _get_predicates(_nodes):
_predicates = []
for _node in _nodes:
python_node = ast.Expression(body=core.ast.nodes.concept_to_python(_node))
python_node = ast.fix_missing_locations(python_node)
_predicates.append(python_node)
return _predicates
if isinstance(expression, str):
node = ast.parse(expression, mode="eval")
else:
return NotImplementedError()
concept_node = core.ast.nodes.python_to_concept(node)
main_op = concept_node.get_prop("body")
return _get_predicates(_extract_predicates(sheerka, main_op, variables_to_include, variables_to_exclude))
def _extract_predicates(sheerka, node, variables_to_include, variables_to_exclude):
predicates = []
def _matches(_names, to_include, to_exclude):
_res = None
for n in _names:
if n in to_include and _res is None:
_res = True
if n in to_exclude:
_res = False
return _res
if node.node_type == "Compare":
if node.get_prop("left").node_type == "Name":
"""Simple case of one comparison"""
comparison_name = sheerka.value(node.get_prop("left"))
if comparison_name in variables_to_include and comparison_name not in variables_to_exclude:
predicates.append(node)
else:
"""The left part is an expression"""
res = _extract_predicates(sheerka, node.get_prop("left"), variables_to_include, variables_to_exclude)
if len(res) > 0:
predicates.append(node)
elif node.node_type == "Call":
"""Simple case predicate"""
call_node = node if isinstance(node, CallNodeConcept) else CallNodeConcept().update_from(node)
args = list(call_node.get_args_names(sheerka))
if _matches(args, variables_to_include, variables_to_exclude):
predicates.append(node)
elif node.node_type == "UnaryOp" and node.get_prop("op").node_type == "Not":
"""Simple case of negation"""
res = _extract_predicates(sheerka, node.get_prop("operand"), variables_to_include, variables_to_exclude)
if len(res) > 0:
predicates.append(node)
elif node.node_type == "BinOp":
names = get_names(sheerka, node)
if _matches(names, variables_to_include, variables_to_exclude):
predicates.append(node)
elif node.node_type == "BoolOp":
all_op = True
temp_res = []
for op in node.get_prop("values"):
res = _extract_predicates(sheerka, op, variables_to_include, variables_to_exclude)
if len(res) == 0:
all_op = False
else:
temp_res.extend(res)
if all_op:
predicates.append(node)
else:
for res in temp_res:
predicates.append(res)
return predicates
+405
View File
@@ -0,0 +1,405 @@
import hashlib
from collections import namedtuple
from dataclasses import dataclass, field
from enum import Enum
from core.sheerka_logger import get_logger
import core.utils
from core.tokenizer import Tokenizer, TokenKind
PROPERTIES_FOR_DIGEST = ("name", "key",
"definition", "definition_type",
"is_builtin", "is_unique",
"where", "pre", "post", "body",
"desc", "props")
PROPERTIES_TO_SERIALIZE = PROPERTIES_FOR_DIGEST + tuple(["id"])
PROPERTIES_FOR_NEW = ("where", "pre", "post", "body", "desc")
VARIABLE_PREFIX = "__var__"
class ConceptParts(Enum):
"""
Lists metadata that can contains some code
"""
WHERE = "where"
PRE = "pre"
POST = "post"
BODY = "body"
@staticmethod
def get_parts():
return set(item.value for item in ConceptParts)
@dataclass
class ConceptMetadata:
name: str
is_builtin: bool
is_unique: bool
key: str # name od the concept, where prop are replaced. to ease search
body: str # main method, can also be the value of the concept
where: str # condition to recognize variables in name
pre: str # list of pre conditions before calling the main function
post: str # list of post conditions after calling the main function
definition: str # regex used to define the concept
definition_type: str # definition can be done with something else than regex
desc: str # possible description for the concept
id: str # unique identifier for a concept. The id will never be modified (but the key can)
props: list # list properties, with their default values
is_evaluated: bool = False # True is the concept is evaluated by sheerka.eval_concept()
simplec = namedtuple("concept", "name body") # for simple concept (tests purposes only)
class Concept:
"""
Default concept object
A concept is a the base object of our universe
Everything is a concept
"""
def __init__(self, name=None,
is_builtin=False,
is_unique=False,
key=None,
body=None,
where=None,
pre=None,
post=None,
definition=None,
definition_type=None,
desc=None,
id=None,
props=None):
metadata = ConceptMetadata(
str(name) if name else None,
is_builtin,
is_unique,
str(key) if key else None,
body,
where,
pre,
post,
definition,
definition_type,
desc,
id,
props or []
)
self.metadata = metadata
self.compiled = {} # cached ast for the where, pre, post and body parts
self.values = {} # values of metadata once resolved
self.props = {} # resolved properties of this concept
self.bnf = None
self.log = get_logger("core." + self.__class__.__name__)
self.init_log = get_logger("init.core." + self.__class__.__name__)
def __repr__(self):
return f"({self.metadata.id}){self.metadata.name}"
def __eq__(self, other):
if isinstance(other, simplec):
return self.name == other.name and self.body == other.body
if id(self) == id(other):
return True
if not isinstance(other, Concept):
return False
# check the metadata
for prop in PROPERTIES_TO_SERIALIZE:
# print(prop) # use full to know which id does not match
my_value = getattr(self.metadata, prop)
other_value = getattr(other.metadata, prop)
if isinstance(my_value, Concept) and isinstance(other_value, Concept):
# need to check if circular references
if id(self) == id(other):
continue
sub_value = getattr(other_value.metadata, prop)
while isinstance(sub_value, Concept):
if id(self) == id(sub_value):
return False # circular reference
sub_value = getattr(sub_value.metadata, prop)
if my_value != other_value:
return False
else:
if my_value != other_value:
return False
# checks the values
if len(self.values) != len(other.values):
return False
for metadata in self.values:
if self.get_metadata_value(metadata) != other.get_metadata_value(metadata):
return False
if len(self.props) != len(other.props):
return False
for prop in self.props:
if self.get_prop(prop) != other.get_prop(prop):
return False
return True
def __hash__(self):
return hash(self.metadata.name)
def __getattr__(self, item):
# I have this complicated implementation because of the usage of Pickle
if 'props' in vars(self) and item in self.props:
return self.props[item].value
name = self.name if 'metadata' in vars(self) else 'Concept'
raise AttributeError(f"'{name}' concept has no attribute '{item}'")
def def_prop(self, prop_name: str, default_value=None):
"""
Adds a property to the metadata
:param prop_name:
:param default_value:
:return:
"""
assert default_value is None or isinstance(default_value, str) # default properties will have to be evaluated
self.metadata.props.append((prop_name, default_value))
self.props[prop_name] = Property(prop_name, None) # do not set the default value
# why not setting props to the default values ?
# Because it may not be the real values, as metadata.props need to be evaluated
return self
def def_prop_by_index(self, index: int, value):
"""
Re-assign a value to a property (mainly used by ExactConceptParser)
:param index:
:param value:
:return:
"""
assert value is None or isinstance(value, str) # default properties will have to be evaluated
prop = self.metadata.props[index]
self.metadata.props[index] = (prop[0], value)
return self
@property
def name(self):
return self.metadata.name
@property
def id(self):
return self.metadata.id
@property
def key(self):
return self.metadata.key
def init_key(self, tokens=None):
"""
Create the key for this concept.
Must be called only when the concept if fully initialized
The method is not called set_key to make sure that no other class set the key by mistake
:param tokens:
:return:
"""
if self.metadata.key is not None:
return self
if tokens is None:
tokens = list(Tokenizer(self.metadata.name))
variables = [p[0] for p in self.metadata.props] if len(core.utils.strip_tokens(tokens, True)) > 1 else []
key = ""
first = True
for token in tokens:
if token.type == TokenKind.EOF:
break
if token.type == TokenKind.WHITESPACE:
continue
if not first:
key += " " # spaces are normalized
if token.value in variables:
key += VARIABLE_PREFIX + str(variables.index(token.value))
else:
key += token.value[1:-1] if token.type == TokenKind.STRING else token.value
first = False
self.metadata.key = key
return self
@property
def body(self):
return self.values[ConceptParts.BODY] if ConceptParts.BODY in self.values else None
def add_codes(self, codes):
"""
Gets the ASTs for 'where', 'pre', 'post' and 'body'
There ASTs are know when the concept is freshly parsed.
So the values are kept in cache.
For concepts loaded from sdp, these ASTs must be created again
TODO : Seems to be a service method. Can be put somewhere else
:param codes:
:return:
"""
if codes is None:
return
for key in codes:
self.compiled[key] = codes[key]
return self
def get_digest(self):
"""
Returns the digest of the event
:return: hexa form of the sha256
"""
return hashlib.sha256(f"Concept:{self.to_dict(PROPERTIES_FOR_DIGEST)}".encode("utf-8")).hexdigest()
def to_dict(self, props_to_use=None):
"""
Returns a dict representing 'self'
:return:
"""
props_to_use = props_to_use or PROPERTIES_TO_SERIALIZE
props_as_dict = dict((prop, getattr(self.metadata, prop)) for prop in props_to_use)
return props_as_dict
def from_dict(self, as_dict):
"""
Initializes 'self' from a dict
:param as_dict:
:return:
"""
for prop in PROPERTIES_TO_SERIALIZE:
if prop in as_dict:
if prop == "props":
for name, value in as_dict[prop]:
self.def_prop(name, value)
else:
setattr(self.metadata, prop, as_dict[prop])
return self
def update_from(self, other):
"""
Update self using the properties of another concept
This method is to mimic the class to instance pattern
'other' is the class, the template, and 'self' is a new instance
:param other:
:return:
"""
if other is None:
return self
if id(other) == id(self):
return self
# update metadata
self.from_dict(other.to_dict())
# update values
for k, v in other.values.items():
self.values[k] = v
# update properties
for k, v in other.props.items():
self.set_prop(k, v.value)
return self
def set_prop(self, prop_name: str, prop_value):
"""Directly sets a value to a property"""
self.props[prop_name] = Property(prop_name, prop_value)
return self
def get_prop(self, prop_name: str):
return self.props[prop_name].value
def set_metadata_value(self, metadata: ConceptParts, value):
"""
Set the resolved value of a metadata (not the metadata itself)
:param metadata:
:param value:
:return:
"""
self.values[metadata] = value
def get_metadata_value(self, metadata: ConceptParts):
"""
Gets the resolved value of a metadata
:param metadata:
:return:
"""
return self.values[metadata]
def auto_init(self):
"""
Sometimes (for tests purposes)
You don't need the full process of evaluation to to get the values of the concept
Directly use the values of the metadata
:return:
"""
if self.metadata.is_evaluated:
return self
for metadata in ConceptParts:
value = getattr(self.metadata, metadata.value)
if value is not None:
self.values[metadata] = value
for prop, value in self.metadata.props:
self.set_prop(prop, value)
self.metadata.is_evaluated = True
return self
class Property:
"""
Defines the variables of a concept
It as its specific class, because from experience,
property management is more complex than a key/value pair
"""
def __init__(self, name, value):
self.name = name
self.value = value
def __repr__(self):
return f"{self.name}={self.value}"
def __eq__(self, other):
if not isinstance(other, Property):
return False
return self.name == other.name and self.value == other.value
def __hash__(self):
return hash((self.name, self.value))
@dataclass()
class DoNotResolve:
"""
This class is used to that the metadata (or the prop) of the concept must not be evaluated
thru sheerka.execute
For example, if you want to set a value to the BODY that will not change when
when the concept will be evaluated,
set concept.compiled[BODY] to DoNotResolve(value)
"""
value: object
+203
View File
@@ -0,0 +1,203 @@
import logging
import time
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from sdp.sheerkaDataProvider import Event
DEBUG_TAB_SIZE = 4
class ExecutionContext:
"""
To keep track of the execution of a request
"""
ids = {}
@staticmethod
def get_id(event_digest):
if event_digest in ExecutionContext.ids:
ExecutionContext.ids[event_digest] += 1
else:
ExecutionContext.ids[event_digest] = 0
return ExecutionContext.ids[event_digest]
def __init__(self,
who,
event: Event,
sheerka,
desc: str = None,
**kwargs):
self._parent = None
self._id = ExecutionContext.get_id(event.get_digest())
self._tab = ""
self._bag = {} # other variables
self._start = 0
self._stop = 0
self.who = who # who is asking
self.event = event # what was the (original) trigger
self.sheerka = sheerka # sheerka
self.desc = desc # human description of what is going on
self.children = []
self.preprocess = None
self.inputs = {} # what was the parameters of the execution context
self.values = {} # what was produced by the execution context
self.obj = kwargs.pop("obj", None)
self.concepts = kwargs.pop("concepts", {})
# update the other elements
for k, v in kwargs.items():
self._bag[k] = v
@property
def elapsed(self):
if self._start == 0:
return 0
return (self._stop if self._stop > 0 else time.time_ns()) - self._start
@property
def elapsed_str(self):
nano_sec = self.elapsed
dt = nano_sec / 1e6
return f"{dt} ms" if dt < 1000 else f"{dt / 1000} s"
@property
def id(self):
return self._id
def __getattr__(self, item):
if item in self._bag:
return self._bag[item]
raise AttributeError(f"'ExecutionContext' object has no attribute '{item}'")
def __enter__(self):
self._start = time.time_ns()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop = time.time_ns()
def __repr__(self):
msg = f"ExecutionContext(who={self.who}, id={self._id}"
if self.desc:
msg += f", desc='{self.desc}'"
msg += ")"
return msg
def add_preprocess(self, name, **kwargs):
preprocess = self.sheerka.new(BuiltinConcepts.EVALUATOR_PRE_PROCESS)
preprocess.set_prop("name", name)
for k, v in kwargs.items():
preprocess.set_prop(k, v)
if not self.preprocess:
self.preprocess = set()
self.preprocess.add(preprocess)
return self
def add_inputs(self, **kwargs):
for k, v in kwargs.items():
self.inputs[k] = v
return self
def add_values(self, **kwargs):
for k, v in kwargs.items():
self.values[k] = v
return self
def get_concept(self, key):
# search in obj
if isinstance(self.obj, Concept):
if self.obj.key == key:
return self.obj
for prop in self.obj.props:
if prop == key:
value = self.obj.props[prop].value
if isinstance(value, Concept):
return value
# search in concepts
if self.concepts:
for k, c in self.concepts.items():
if k == key:
return c
return self.sheerka.get(key)
def new_concept(self, key, **kwargs):
# search in obj
if self.obj:
if self.obj.key == key:
return self.sheerka.new_from_template(self.obj, key, **kwargs)
for prop in self.obj.props:
if prop == key:
value = self.obj.props[prop].value
if isinstance(value, Concept):
return self.sheerka.new_from_template(value, key, **kwargs)
else:
return value
if self.concepts:
for k, c in self.concepts.items():
if k == key:
return self.sheerka.new_from_template(c, key, **kwargs)
return self.sheerka.new(key, **kwargs)
def push(self, who=None, desc=None, **kwargs):
who = who or self.who
_kwargs = {"obj": self.obj, "concepts": self.concepts}
_kwargs.update(self._bag)
_kwargs.update(kwargs)
new = ExecutionContext(
who,
self.event,
self.sheerka,
desc,
**_kwargs,
)
new._parent = self
new._tab = self._tab + " " * DEBUG_TAB_SIZE
new.preprocess = self.preprocess
self.children.append(new)
return new
def log_new(self, logger):
logger.debug(f"[{self._id:2}]" + self._tab + str(self))
def log(self, logger, message, who=None):
logger.debug(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))
def log_error(self, logger, message, who=None):
logger.exception(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))
def log_result(self, logger, return_values):
if not logger.isEnabledFor(logging.DEBUG):
return
if len(return_values) == 0:
logger.debug(self._tab + "No return value")
for r in return_values:
to_str = self.return_value_to_str(r)
logger.debug(f"[{self._id:2}]" + self._tab + "-> " + to_str)
def to_dict(self):
from core.sheerka_transform import SheerkaTransform
st = SheerkaTransform(self.sheerka)
return st.to_dict(self)
@staticmethod
def return_value_to_str(r):
value = str(r.value)
if len(value) > 50:
value = value[:47] + "..."
to_str = f"ReturnValue(who={r.who}, status={r.status}, value={value})"
return to_str
+600
View File
@@ -0,0 +1,600 @@
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept, BuiltinErrors, BuiltinUnique, \
UnknownConcept
from core.concept import Concept, ConceptParts, PROPERTIES_FOR_NEW
from core.sheerka.ExecutionContext import ExecutionContext
from core.sheerka.SheerkaCreateNewConcept import SheerkaCreateNewConcept
from core.sheerka.SheerkaDump import SheerkaDump
from core.sheerka.SheerkaEvaluateConcept import SheerkaEvaluateConcept
from core.sheerka.SheerkaExecute import SheerkaExecute
from core.sheerka.SheerkaSetsManager import SheerkaSetsManager
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event
import core.utils
import core.builtin_helpers
from core.sheerka_logger import console_handler
import logging
# CONCEPT_EVALUATION_STEPS = [
# BuiltinConcepts.BEFORE_EVALUATION,
# BuiltinConcepts.EVALUATION,
# BuiltinConcepts.AFTER_EVALUATION]
CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser"
class Sheerka(Concept):
"""
Main controller for the project
"""
CONCEPTS_ENTRY = "All_Concepts" # to store all the concepts
CONCEPTS_BY_ID_ENTRY = "Concepts_By_ID"
CONCEPTS_DEFINITIONS_ENTRY = "Concepts_Definitions" # to store definitions (bnf) of concepts
BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts" # sequential key for builtin concepts
USER_CONCEPTS_KEYS = "User_Concepts" # sequential key for user defined concepts
def __init__(self, skip_builtins_in_db=False, debug=False, loggers=None):
self.init_logging(debug, loggers)
super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
self.log.debug("Starting Sheerka.")
# cache of the most used concepts
# Note that these are only templates
# They are used as a footprint for instantiation
# Except of source when the concept is supposed to be unique
# key is the key of the concept (not the name or the id)
self.cache_by_key = {}
self.cache_by_id = {}
# cache for concept definitions,
# Primarily used for unit test that does not have access to sdp
self.concepts_definition_cache = {}
#
# cache for concepts grammars
# a grammar is a resolved BNF
self.concepts_grammars = {}
# a concept can be instantiated
# ex: File is a concept, but File('foo.txt') is an instance
# TODO: manage contexts
self.instances = []
# List of the known rules by the system
# ex: hello => say('hello')
self.rules = []
self.sdp: SheerkaDataProvider = None # SheerkaDataProvider
self.builtin_cache = {} # cache for builtin concepts
self.parsers = {} # cache for builtin parsers
self.evaluators = [] # cache for builtin evaluators
self.evaluators_prefix: str = None
self.parsers_prefix: str = None
self.skip_builtins_in_db = skip_builtins_in_db
self.execute_handler = SheerkaExecute(self)
self.create_new_concept_handler = SheerkaCreateNewConcept(self)
self.dump_handler = SheerkaDump(self)
self.sets_handler = SheerkaSetsManager(self)
self.evaluate_concept_handler = SheerkaEvaluateConcept(self)
def initialize(self, root_folder: str = None):
"""
Starting Sheerka
Loads the current configuration
Notes that when it's the first time, it also create the needed working folders
:param root_folder: root configuration folder
:return: ReturnValue(Success or Error)
"""
try:
self.sdp = SheerkaDataProvider(root_folder)
if self.sdp.first_time:
self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)
event = Event("Initializing Sheerka.")
self.sdp.save_event(event)
exec_context = ExecutionContext(self.key, event, self)
self.initialize_builtin_concepts()
self.initialize_builtin_parsers()
self.initialize_builtin_evaluators()
self.initialize_concepts_definitions(exec_context)
except IOError as e:
return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e)
return ReturnValueConcept(self, True, self)
def initialize_builtin_concepts(self):
"""
Initializes the builtin concepts
:return: None
"""
self.init_log.debug("Initializing builtin concepts")
builtins_classes = self.get_builtins_classes_as_dict()
# this all initialization of the builtins seems to be little bit complicated
# why do we need to update it from DB ?
for key in BuiltinConcepts:
concept = self if key == BuiltinConcepts.SHEERKA \
else builtins_classes[str(key)]() if str(key) in builtins_classes \
else Concept(key, True, False, key)
if key in BuiltinUnique:
concept.metadata.is_unique = True
concept.metadata.is_evaluated = True
if not concept.metadata.is_unique and str(key) in builtins_classes:
self.builtin_cache[key] = builtins_classes[str(key)]
if not self.skip_builtins_in_db:
from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.metadata.key)
if from_db is None:
self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.")
self.set_id_if_needed(concept, True)
self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
else:
self.init_log.debug(f"Found concept '{from_db}' in db. Updating.")
concept.update_from(from_db)
self.add_in_cache(concept)
def initialize_builtin_parsers(self):
"""
Init the parsers
:return:
"""
core.utils.init_package_import("parsers")
base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
for parser in core.utils.get_sub_classes("parsers", base_class):
if parser.__module__ == base_class.__module__:
continue
self.init_log.debug(f"Adding builtin parser '{parser.__name__}'")
self.parsers[core.utils.get_full_qualified_name(parser)] = parser
def initialize_builtin_evaluators(self):
"""
Init the evaluators
:return:
"""
core.utils.init_package_import("evaluators")
for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.OneReturnValueEvaluator"):
self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
self.evaluators.append(evaluator)
for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.AllReturnValuesEvaluator"):
self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
self.evaluators.append(evaluator)
def initialize_concepts_definitions(self, execution_context):
self.init_log.debug("Initializing concepts definitions")
definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False)
if definitions is None:
self.init_log.debug("No BNF defined")
return
lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
ret_val = lexer_parser.initialize(execution_context, definitions)
if not ret_val.status:
self.init_log.error("Failed to initialize concepts definitions " + str(ret_val.body))
return
self.concepts_grammars = lexer_parser.concepts_grammars
def reset_cache(self, filter_to_use=None):
"""
reset the different cache that exists
:param filter_to_use:
:return:
"""
if filter_to_use is None:
self.cache_by_key = {}
self.cache_by_id = {}
else:
raise NotImplementedError()
return self
def evaluate_user_input(self, text: str, user_name="kodjo"):
"""
Note to KSI: If you try to add execution context to this function,
You may end in an infinite loop
:param text:
:param user_name:
:return:
"""
self.log.debug(f"Processing user input '{text}', {user_name=}.")
event = Event(text, user_name)
evt_digest = self.sdp.save_event(event)
self.log.debug(f"{evt_digest=}")
with ExecutionContext(self.key, event, self, f"Evaluating '{text}'") as execution_context:
user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name))
reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED))
steps = [
BuiltinConcepts.BEFORE_PARSING,
BuiltinConcepts.PARSING,
BuiltinConcepts.AFTER_PARSING,
BuiltinConcepts.BEFORE_EVALUATION,
BuiltinConcepts.EVALUATION,
BuiltinConcepts.AFTER_EVALUATION
]
ret = self.execute(execution_context, [user_input, reduce_requested], steps)
execution_context.add_values(return_values=ret)
if not self.skip_builtins_in_db:
self.sdp.save_result(execution_context)
return ret
def execute(self, execution_context, return_values, execution_steps, logger=None):
"""
Executes process for all initial contexts
:param execution_context:
:param return_values:
:param execution_steps:
:param logger: logger to use (if not directly called by sheerka)
:return:
"""
return self.execute_handler.execute(execution_context, return_values, execution_steps, logger)
def set_id_if_needed(self, obj: Concept, is_builtin: bool):
"""
Set the key for the concept if needed
For test purpose only !!!!!
:param obj:
:param is_builtin:
:return:
"""
if obj.metadata.id is not None:
return
entry = self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS
obj.metadata.id = self.sdp.get_next_key(entry)
self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.")
def create_new_concept(self, context, concept: Concept, logger=None):
"""
Adds a new concept to the system
:param context:
:param concept: DefConceptNode
:param logger
:return: digest of the new concept
"""
return self.create_new_concept_handler.create_new_concept(context, concept, logger)
def add_concept_to_set(self, context, concept, concept_set, logger=None):
"""
Add an entry in sdp to tell that concept isa concept_set
:param context:
:param concept:
:param concept_set:
:param logger:
:return:
"""
return self.sets_handler.add_concept_to_set(context, concept, concept_set, logger)
def get_set_elements(self, concept):
"""
Concept is supposed to be a set
Returns all elements if the set
:param concept:
:return:
"""
return self.sets_handler.get_set_elements(concept)
def evaluate_concept(self, context, concept: Concept, logger=None):
"""
Evaluation a concept
It means that if the where clause is True, will evaluate the body
:param context:
:param concept:
:param logger:
:return: value of the evaluation or error
"""
return self.evaluate_concept_handler.evaluate_concept(context, concept, logger)
def add_in_cache(self, concept: Concept):
"""
Adds a concept template in cache.
The cache is used as a proxy before looking at sdp
:param concept:
:return:
"""
# sanity check
if concept.key is None:
concept.init_key()
if concept.key is None:
raise KeyError()
self.cache_by_key[concept.key] = concept
if concept.id:
self.cache_by_id[concept.id] = concept
return concept
def get(self, concept_key, concept_id=None):
"""
Tries to find a concept
What is return must be used a template for another concept.
You must not modify the returned concept
:param concept_key: key of the concept
:param concept_id: when multiple concepts with the same key, use the id
:return:
"""
if concept_key is None:
return ErrorConcept("Concept key is undefined.")
if isinstance(concept_key, BuiltinConcepts):
concept_key = str(concept_key)
# first search in cache
result = self.cache_by_key[concept_key] if concept_key in self.cache_by_key else \
self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)
if result and (concept_id is None or not isinstance(result, list)):
return result
if isinstance(result, list):
if concept_id:
for c in result:
if c.id == concept_id:
return c
else:
return result
metadata = [("key", concept_key), ("id", concept_id)] if concept_id else ("key", concept_key)
return self._get_unknown(metadata)
def get_by_id(self, concept_id):
if concept_id is None:
return ErrorConcept("Concept id is undefined.")
# first search in cache
result = self.cache_by_id[concept_id] if concept_id in self.cache_by_id else \
self.sdp.get_safe(self.CONCEPTS_BY_ID_ENTRY, concept_id)
return result or self._get_unknown(('id', concept_id))
def get_concept_definition(self):
if self.concepts_definition_cache:
return self.concepts_definition_cache
self.concepts_definition_cache = self.sdp.get_safe(
self.CONCEPTS_DEFINITIONS_ENTRY,
load_origin=False) or {}
return self.concepts_definition_cache
def new(self, concept_key, **kwargs):
"""
Returns an instance of a new concept
When the concept is supposed to be unique, returns the same instance
:param concept_key:
:param kwargs:
:return:
"""
if isinstance(concept_key, tuple):
concept_key, concept_id = concept_key[0], concept_key[1]
else:
concept_id = None
template = self.get(concept_key, concept_id)
# manage concept not found
if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
return template
if isinstance(template, list):
# if template is a list, it means that there a multiple concepts under the same key
concepts = [self.new_from_template(t, concept_key, **kwargs) for t in template]
return concepts
else:
return self.new_from_template(template, concept_key, **kwargs)
def new_from_template(self, template, key, **kwargs):
# manage singleton
if template.metadata.is_unique:
return template
# otherwise, create another instance
concept = self.builtin_cache[key]() if key in self.builtin_cache else Concept()
concept.update_from(template)
if len(kwargs) == 0:
return concept
# update the properties, values, attributes
# Not quite sure that this is the correct process order
for k, v in kwargs.items():
if k in concept.props:
concept.set_prop(k, v)
elif k in PROPERTIES_FOR_NEW:
concept.values[ConceptParts(k)] = v
elif hasattr(concept, k):
setattr(concept, k, v)
else:
return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)
# TODO : add the concept to the list of known concepts (self.instances)
concept.metadata.is_evaluated = True
return concept
def ret(self, who: str, status: bool, value, message=None, parents=None):
"""
Creates and returns a ReturnValue concept
:param who:
:param status:
:param value:
:param message:
:param parents:
:return:
"""
return self.new(
BuiltinConcepts.RETURN_VALUE,
who=who,
status=status,
value=value,
message=message,
parents=parents)
def value(self, obj, reduce_simple_list=False):
if obj is None:
return None
if hasattr(obj, "get_value"):
return obj.get_value()
if not isinstance(obj, Concept):
return obj
if obj.body is None:
return obj
if reduce_simple_list and (isinstance(obj.body, list) or isinstance(obj.body, set)) and len(obj.body) == 1:
body_to_use = obj.body[0]
else:
body_to_use = obj.body
return self.value(body_to_use)
def get_values(self, objs):
if not (isinstance(objs, list) or
self.isinstance(objs, BuiltinConcepts.LIST) or
self.isinstance(objs, BuiltinConcepts.ENUMERATION)):
objs = [objs]
return (self.value(obj) for obj in objs)
def is_success(self, obj):
if isinstance(obj, bool): # quick win
return obj
if isinstance(obj, ReturnValueConcept):
return obj.status
if isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors:
return False
return obj
def is_known(self, obj):
if not isinstance(obj, Concept):
return True
return obj.key != str(BuiltinConcepts.UNKNOWN_CONCEPT)
def isinstance(self, a, b):
"""
return true if the concept a is an instance of the concept b
:param a:
:param b:
:return:
"""
if isinstance(a, BuiltinConcepts): # common KSI error ;-)
raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")
if not isinstance(a, Concept):
return False
b_key = b.key if isinstance(b, Concept) else str(b)
return a.key == b_key
def isa(self, a, b):
return self.sets_handler.isa(a, b)
def isagroup(self, concept):
return self.sets_handler.isagroup(concept)
def get_evaluator_name(self, name):
if self.evaluators_prefix is None:
base_evaluator_class = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator")
self.evaluators_prefix = base_evaluator_class.PREFIX
return self.evaluators_prefix + name
def get_parser_name(self, name):
if self.parsers_prefix is None:
base_parser_class = core.utils.get_class("parsers.BaseParser.BaseParser")
self.parsers_prefix = base_parser_class.PREFIX
return self.parsers_prefix + name
def concepts(self):
res = []
lst = self.sdp.list(self.CONCEPTS_ENTRY)
for item in lst:
if isinstance(item, list):
res.extend(item)
else:
res.append(item)
return sorted(res, key=lambda i: int(i.id))
def test(self):
return f"I have access to Sheerka !"
def test_error(self):
raise Exception("I can raise an error")
@staticmethod
def _get_unknown(metadata):
"""
Returns the concept 'UnknownConcept' for a requested id or key
Note that I don't call the new() method to prevent cyclic call
:param metadata:
:return:
"""
# metadata is a list of tuple that contains the known metadata for this concept
# ex : (key, 'not_found)
# or
# (id, invalid_id)
#
# the metadata can be a list, if several attributes where given
# (key, 'not_found), (id, invalid_id)
unknown_concept = UnknownConcept()
unknown_concept.set_metadata_value(ConceptParts.BODY, metadata)
for meta in (metadata if isinstance(metadata, list) else [metadata]):
unknown_concept.set_prop(meta[0], meta[1])
unknown_concept.metadata.is_evaluated = True
return unknown_concept
@staticmethod
def get_builtins_classes_as_dict():
res = {}
for c in core.utils.get_classes("core.builtin_concepts"):
if issubclass(c, Concept) and c != Concept:
res[c().metadata.key] = c
return res
@staticmethod
def init_logging(debug, loggers):
core.sheerka_logger.set_enabled(loggers)
if debug:
# log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
log_format = "%(asctime)s [%(levelname)s] %(message)s"
log_level = logging.DEBUG
else:
log_format = "%(message)s"
log_level = logging.INFO
logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
@@ -0,0 +1,99 @@
from core.builtin_concepts import BuiltinConcepts, ErrorConcept
from core.concept import Concept
from sdp.sheerkaDataProvider import SheerkaDataProviderDuplicateKeyError
CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser"
class SheerkaCreateNewConcept:
"""
Manage the creation of a new concept
"""
def __init__(self, sheerka):
self.sheerka = sheerka
self.logger_name = self.create_new_concept.__name__
def create_new_concept(self, context, concept: Concept, logger=None):
"""
Adds a new concept to the system
:param context:
:param concept: DefConceptNode
:param logger
:return: digest of the new concept
"""
logger = logger or self.sheerka.log
concept.init_key()
concepts_definitions = None
init_ret_value = None
# checks for duplicate concepts
# TODO checks if it exists in cache first
if self.sheerka.sdp.exists(self.sheerka.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
error = SheerkaDataProviderDuplicateKeyError(self.sheerka.CONCEPTS_ENTRY + "." + concept.key, concept)
return self.sheerka.ret(
self.logger_name,
False,
self.sheerka.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept),
error.args[0])
# set id before saving in db
self.sheerka.set_id_if_needed(concept, False)
# add the BNF if known
if concept.bnf:
concepts_definitions = self.sheerka.get_concept_definition()
concepts_definitions[concept] = concept.bnf
# check if it's a valid BNF or whether it breaks the known rules
concept_lexer_parser = self.sheerka.parsers[CONCEPT_LEXER_PARSER_CLASS]()
with context.push(self.sheerka.name, desc=f"Initializing concept definition for {concept}") as sub_context:
sub_context.concepts[concept.key] = concept # the concept is not in the real cache yet
sub_context.log_new(logger)
init_ret_value = concept_lexer_parser.initialize(sub_context, concepts_definitions)
sub_context.add_values(return_values=init_ret_value)
if not init_ret_value.status:
return self.sheerka.ret(self.logger_name, False, ErrorConcept(init_ret_value.value))
# save the new concept in sdp
try:
# TODO : needs to make these calls atomic (or at least one single call)
self.sheerka.sdp.add(
context.event.get_digest(),
self.sheerka.CONCEPTS_ENTRY,
concept,
use_ref=True)
self.sheerka.sdp.add(
context.event.get_digest(),
self.sheerka.CONCEPTS_BY_ID_ENTRY,
{concept.id: concept.get_digest()},
is_ref=True)
if concepts_definitions is not None:
self.sheerka.sdp.set(
context.event.get_digest(),
self.sheerka.CONCEPTS_DEFINITIONS_ENTRY,
concepts_definitions,
use_ref=True)
except SheerkaDataProviderDuplicateKeyError as error:
context.log_error(logger, "Failed to create a new concept.", who=self.logger_name)
return self.sheerka.ret(
self.logger_name,
False,
self.sheerka.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept),
error.args[0])
# Updates the caches
self.sheerka.cache_by_key[concept.key] = self.sheerka.sdp.get_safe(self.sheerka.CONCEPTS_ENTRY, concept.key)
self.sheerka.cache_by_id[concept.id] = concept
if init_ret_value is not None and init_ret_value.status:
self.sheerka.concepts_grammars = init_ret_value.body
# process the return in needed
ret = self.sheerka.ret(self.logger_name, True, self.sheerka.new(BuiltinConcepts.NEW_CONCEPT, body=concept))
return ret
+44
View File
@@ -0,0 +1,44 @@
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
class SheerkaDump:
def __init__(self, sheerka):
self.sheerka = sheerka
def dump_concepts(self):
lst = self.sheerkasdp.list(self.sheerkaCONCEPTS_ENTRY)
for item in lst:
if hasattr(item, "__iter__"):
for i in item:
self.sheerkalog.info(i)
else:
self.sheerkalog.info(item)
def dump_definitions(self):
defs = self.sheerkasdp.get(self.sheerkaCONCEPTS_DEFINITIONS_ENTRY)
self.sheerkalog.info(defs)
def dump_desc(self, *concept_names):
first = True
for concept_name in concept_names:
if isinstance(concept_name, Concept):
concepts = concept_name
else:
concepts = self.sheerkaget(concept_name)
if self.sheerkaisinstance(concepts, BuiltinConcepts.UNKNOWN_CONCEPT):
self.sheerkalog.error(f"Concept '{concept_name}' is unknown")
return False
if not hasattr(concepts, "__iter__"):
concepts = [concepts]
for c in concepts:
if not first:
self.sheerkalog.info("")
self.sheerkalog.info(f"name : {c.name}")
self.sheerkalog.info(f"bnf : {c.metadata.definition}")
self.sheerkalog.info(f"key : {c.key}")
self.sheerkalog.info(f"body : {c.body}")
self.sheerkalog.info(f"digest : {c.get_digest()}")
first = False
+195
View File
@@ -0,0 +1,195 @@
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, DoNotResolve, ConceptParts
import core.builtin_helpers
CONCEPT_EVALUATION_STEPS = [
BuiltinConcepts.BEFORE_EVALUATION,
BuiltinConcepts.EVALUATION,
BuiltinConcepts.AFTER_EVALUATION]
class SheerkaEvaluateConcept:
def __init__(self, sheerka):
self.sheerka = sheerka
self.logger_name = self.evaluate_concept.__name__
def initialize_concept_asts(self, context, concept: Concept, logger=None):
"""
Updates the codes of the newly created concept
Basically, it runs the parsers on all parts
:param concept:
:param context:
:param logger:
:return:
"""
steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
for part_key in ConceptParts:
if part_key in concept.compiled:
continue
source = getattr(concept.metadata, part_key.value)
if source is None or not isinstance(source, str):
continue
if source.strip() == "":
concept.compiled[part_key] = DoNotResolve(source)
else:
with context.push(desc=f"Initializing compiled for {part_key}") as sub_context:
sub_context.log_new(logger)
sub_context.add_inputs(source=source)
to_parse = self.sheerka.ret(context.who, True,
self.sheerka.new(BuiltinConcepts.USER_INPUT, body=source))
res = self.sheerka.execute(sub_context, to_parse, steps, logger)
concept.compiled[part_key] = res
sub_context.add_values(return_values=res)
for prop, default_value in concept.metadata.props:
if prop in concept.compiled:
continue
if default_value is None or not isinstance(default_value, str):
continue
if default_value.strip() == "":
concept.compiled[prop] = DoNotResolve(default_value)
else:
with context.push(desc=f"Initializing AST for property {prop}") as sub_context:
sub_context.log_new(logger)
sub_context.add_inputs(source=default_value)
to_parse = self.sheerka.ret(context.who, True,
self.sheerka.new(BuiltinConcepts.USER_INPUT, body=default_value))
res = self.sheerka.execute(context, to_parse, steps)
concept.compiled[prop] = res
sub_context.add_values(return_values=res)
# Updates the cache of concepts when possible
if concept.key in self.sheerka.cache_by_key:
entry = self.sheerka.cache_by_key[concept.key]
if isinstance(entry, list):
# TODO : manage when there are multiple entries
pass
else:
self.sheerka.cache_by_key[concept.key].compiled = concept.compiled
def resolve(self, context, to_resolve, current_prop, current_concept, logger):
if isinstance(to_resolve, DoNotResolve):
return to_resolve.value
desc = f"Evaluating {current_prop} (concept={current_concept})"
context.log(logger, desc, self.logger_name)
with context.push(desc=desc, obj=current_concept) as sub_context:
sub_context.log_new(logger)
# when it's a concept, evaluate it
if isinstance(to_resolve, Concept) and \
not context.sheerka.isinstance(to_resolve, BuiltinConcepts.RETURN_VALUE):
evaluated = self.evaluate_concept(sub_context, to_resolve, logger)
sub_context.add_values(return_values=evaluated)
if evaluated.key == to_resolve.key:
return evaluated
else:
error = evaluated
# otherwise, execute all return values to find out what is the value
else:
r = self.sheerka.execute(sub_context, to_resolve, CONCEPT_EVALUATION_STEPS, logger)
one_r = core.builtin_helpers.expect_one(context, r)
sub_context.add_values(return_values=one_r)
if one_r.status:
return one_r.value
else:
error = one_r.value
return self.sheerka.new(BuiltinConcepts.CONCEPT_EVAL_ERROR,
body=error,
concept=current_concept,
property_name=current_prop)
def resolve_list(self, context, list_to_resolve, current_prop, current_concept, logger):
"""When dealing with a list, there are two possibilities"""
# It may be a list of ReturnValueConcept to execute (always the case for metadata)
# or a list of single values (may be the case for properties)
# in this latter case, all values are to be processed one by one and a list should be returned
if len(list_to_resolve) == 0:
return []
if self.sheerka.isinstance(list_to_resolve[0], BuiltinConcepts.RETURN_VALUE):
return self.resolve(context, list_to_resolve, current_prop, current_concept, logger)
res = []
for to_resolve in list_to_resolve:
# sanity check
if self.sheerka.isinstance(to_resolve, BuiltinConcepts.RETURN_VALUE):
return self.sheerka.new(BuiltinConcepts.CONCEPT_EVAL_ERROR,
body="Mix between real values and return values",
concept=current_concept,
property_name=current_prop)
r = self.resolve(context, to_resolve, current_prop, current_concept, logger)
if self.sheerka.isinstance(r, BuiltinConcepts.CONCEPT_EVAL_ERROR):
return r
res.append(r)
return res
def evaluate_concept(self, context, concept: Concept, logger=None):
"""
Evaluation a concept
It means that if the where clause is True, will evaluate the body
:param context:
:param concept:
:param logger:
:return: value of the evaluation or error
"""
logger = logger or self.sheerka.log
if concept.metadata.is_evaluated:
return concept
# WHERE condition should already be validated by the parser.
# It's a mandatory condition for the concept before it can be recognized
#
# TODO : Validate the PRE condition
#
self.initialize_concept_asts(context, concept, logger)
# to make sure of the order, it don't use ConceptParts.get_parts()
# props must be evaluated first
all_metadata_to_eval = ["props", "where", "pre", "post", "body"]
for metadata_to_eval in all_metadata_to_eval:
if metadata_to_eval == "props":
for prop_name in (p for p in concept.props if p in concept.compiled):
prop_ast = concept.compiled[prop_name]
if isinstance(prop_ast, list):
# Do not send the current concept for the properties
resolved = self.resolve_list(context, prop_ast, prop_name, None, logger)
else:
# Do not send the current concept for the properties
resolved = self.resolve(context, prop_ast, prop_name, None, logger)
if context.sheerka.isinstance(resolved, BuiltinConcepts.CONCEPT_EVAL_ERROR):
resolved.set_prop("concept", concept) # since current concept was not sent
return resolved
else:
concept.set_prop(prop_name, resolved)
else:
part_key = ConceptParts(metadata_to_eval)
if part_key in concept.compiled and concept.compiled[part_key] is not None:
metadata_ast = concept.compiled[part_key]
resolved = self.resolve(context, metadata_ast, part_key, concept, logger)
if context.sheerka.isinstance(resolved, BuiltinConcepts.CONCEPT_EVAL_ERROR):
return resolved
else:
concept.values[part_key] = resolved
#
# TODO : Validate the POST condition
#
concept.init_key() # only does it if needed
concept.metadata.is_evaluated = True
return concept
+254
View File
@@ -0,0 +1,254 @@
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept
import core.utils
class SheerkaExecute:
"""
Manage the execution of a process flow
"""
def __init__(self, sheerka):
self.sheerka = sheerka
def call_parsers(self, execution_context, return_values, logger=None):
# return_values must be a list
if not isinstance(return_values, list):
return_values = [return_values]
# first make the distinguish between what is for the parsers and what is not
result = []
to_process = []
for r in return_values:
if not r.status or not self.sheerka.isinstance(r.body, BuiltinConcepts.USER_INPUT):
result.append(r)
else:
to_process.append(r)
if not to_process:
return result
# keep track of the originals user inputs, as they need to be removed at the end
user_inputs = to_process[:]
# group the parsers by priorities
instantiated_parsers = [parser(sheerka=self.sheerka) for parser in self.sheerka.parsers.values()]
grouped_parsers = {}
for parser in [p for p in instantiated_parsers if p.enabled]:
if logger:
parser.log = logger
grouped_parsers.setdefault(parser.priority, []).append(parser)
sorted_priorities = sorted(grouped_parsers.keys(), reverse=True)
stop_processing = False
for priority in sorted_priorities:
inputs_for_this_group = to_process[:]
for parser in grouped_parsers[priority]:
return_value_success_found = False
for return_value in inputs_for_this_group:
to_parse = return_value.body.body \
if self.sheerka.isinstance(return_value.body, BuiltinConcepts.USER_INPUT) \
else return_value.body
# if self.sheerka.log.isEnabledFor(logging.DEBUG):
# debug_text = "'" + to_parse + "'" if isinstance(to_parse, str) \
# else "'" + BaseParser.get_text_from_tokens(to_parse) + "' as tokens"
# execution_context.log(logger or self.sheerka.log, f"Parsing {debug_text}")
with execution_context.push(desc=f"Parsing using {parser.name}") as sub_context:
sub_context.add_inputs(to_parse=to_parse)
res = parser.parse(sub_context, to_parse)
if res is not None:
if hasattr(res, "__iter__"):
for r in res:
if r is None:
continue
r.parents = [return_value]
result.append(r)
if self.sheerka.isinstance(r.body, BuiltinConcepts.PARSER_RESULT):
to_process.append(r)
if r.status:
return_value_success_found = True
else:
res.parents = [return_value]
result.append(res)
if self.sheerka.isinstance(res.body, BuiltinConcepts.PARSER_RESULT):
to_process.append(res)
if res.status:
return_value_success_found = True
sub_context.add_values(return_values=res)
if return_value_success_found:
stop_processing = True
break # Stop the other return_values (but not the other parsers with the same priority)
if stop_processing:
break # Do not try the other priorities if a match is found
result = core.utils.remove_list_from_list(result, user_inputs)
return result
def call_evaluators(self, execution_context, return_values, process_step, evaluation_context=None, logger=None):
# return_values must be a list
if not isinstance(return_values, list):
return_values = [return_values]
# Evaluation context are contexts that may modify the behaviour of the execution
# For example, a concept to indicate that the value is not wanted
# Or a concept to indicate that we want the letter form of the response
# But first, they need to be transformed into return values
if evaluation_context is None:
evaluation_return_values = []
else:
evaluation_return_values = [self.sheerka.ret(execution_context.who, True, c) for c in evaluation_context]
# add the current step as part as the evaluation context
evaluation_return_values.append(self.sheerka.ret(execution_context.who, True, self.sheerka.new(process_step)))
# the pool of return values are the mix
return_values.extend(evaluation_return_values)
# group the evaluators by priority and sort them
# The first one to be applied will be the one with the highest priority
grouped_evaluators = {}
instantiated_evaluators = [e_class() for e_class in self.sheerka.evaluators]
# pre-process evaluators if needed
instantiated_evaluators = self._preprocess_evaluators(execution_context, instantiated_evaluators)
for evaluator in [e for e in instantiated_evaluators if e.enabled and process_step in e.steps]:
if logger:
evaluator.log = logger
grouped_evaluators.setdefault(evaluator.priority, []).append(evaluator)
# order the groups by priority, the higher first
sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True)
# process
iteration = 0
while True:
with execution_context.push(desc=f"iteration #{iteration}", iteration=iteration) as iteration_context:
simple_digest = return_values[:]
iteration_context.add_inputs(return_values=simple_digest)
for priority in sorted_priorities:
original_items = return_values[:]
evaluated_items = []
to_delete = []
for evaluator in grouped_evaluators[priority]:
evaluator = self._preprocess_evaluators(execution_context, evaluator.__class__()) # fresh copy
sub_context_desc = f"Evaluating using {evaluator.name} ({priority=})"
with iteration_context.push(desc=sub_context_desc) as sub_context:
sub_context.add_inputs(return_values=original_items)
# process evaluators that work on one simple return value at the time
from evaluators.BaseEvaluator import OneReturnValueEvaluator
if isinstance(evaluator, OneReturnValueEvaluator):
debug_result = []
for item in original_items:
if evaluator.matches(sub_context, item):
result = evaluator.eval(sub_context, item)
if result is None:
debug_result.append({"input": item, "return_value": None})
continue
to_delete.append(item)
if isinstance(result, list):
evaluated_items.extend(result)
elif isinstance(result, ReturnValueConcept):
evaluated_items.append(result)
else:
error = self.sheerka.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result,
evaluator=evaluator)
result = self.sheerka.ret("sheerka.process", False, error, parents=[item])
evaluated_items.append(result)
debug_result.append({"input": item, "return_value": result})
else:
debug_result.append({"input": item, "return_value": "** No Match **"})
sub_context.add_values(return_values=debug_result)
# process evaluators that work on all return values
else:
if evaluator.matches(sub_context, original_items):
results = evaluator.eval(sub_context, original_items)
if results is None:
continue
if not isinstance(results, list):
results = [results]
for result in results:
evaluated_items.append(result)
to_delete.extend(result.parents)
sub_context.add_values(return_values=results)
else:
sub_context.add_values(return_values="** No Match **")
return_values = evaluated_items
return_values.extend([item for item in original_items if item not in to_delete])
iteration_context.add_values(return_values=return_values[:])
# have we done something ?
to_compare = return_values[:]
if simple_digest == to_compare:
break
# inc the iteration and continue
iteration += 1
# remove all evaluation context that are not reduced
return_values = core.utils.remove_list_from_list(return_values, evaluation_return_values)
return return_values
def execute(self, execution_context, return_values, execution_steps, logger=None):
"""
Executes process for all initial contexts
:param execution_context:
:param return_values:
:param execution_steps:
:param logger: logger to use (if not directly called by sheerka)
:return:
"""
for step in execution_steps:
copy = return_values[:] if hasattr(return_values, "__iter__") else [return_values]
with execution_context.push(step=step, iteration=0, desc=f"{step=}", return_values=copy) as sub_context:
sub_context.log(logger or self.sheerka.log, f"{step=}, context='{sub_context}'")
if step == BuiltinConcepts.PARSING:
return_values = self.call_parsers(sub_context, return_values, logger)
else:
return_values = self.call_evaluators(sub_context, return_values, step, None, logger)
if copy != return_values:
sub_context.log_result(logger or self.sheerka.log, return_values)
sub_context.add_values(return_values=return_values)
return return_values
def _preprocess_evaluators(self, context, evaluators):
if not context.preprocess:
return evaluators
if not hasattr(evaluators, "__iter__"):
single_one = True
evaluators = [evaluators]
else:
single_one = False
for preprocess in context.preprocess:
for e in evaluators:
if preprocess.props["name"].value == e.name:
for prop, value in preprocess.props.items():
if prop == "name":
continue
if hasattr(e, prop):
setattr(e, prop, value.value)
return evaluators[0] if single_one else evaluators
+83
View File
@@ -0,0 +1,83 @@
from core.builtin_concepts import BuiltinConcepts, ErrorConcept
from core.concept import Concept
GROUP_PREFIX = 'All_'
class SheerkaSetsManager:
def __init__(self, sheerka):
self.sheerka = sheerka
self.logger_name = self.add_concept_to_set.__name__
def add_concept_to_set(self, context, concept, concept_set, logger=None):
"""
Add an entry in sdp to tell that concept isa concept_set
:param context:
:param concept:
:param concept_set:
:param logger:
:return:
"""
logger = logger or self.sheerka.log
context.log(logger, f"Adding concept {concept} to set {concept_set}", who=self.logger_name)
assert concept.id
assert concept_set.id
try:
ret = self.sheerka.sdp.add_unique(context.event.get_digest(), GROUP_PREFIX + concept_set.id, concept.id)
if ret == (None, None): # concept already in set
return self.sheerka.ret(
self.logger_name,
False,
self.sheerka.new(BuiltinConcepts.CONCEPT_ALREADY_IN_SET, body=concept, concept_set=concept_set))
else:
return self.sheerka.ret(self.logger_name, True, self.sheerka.new(BuiltinConcepts.SUCCESS))
except Exception as error:
context.log_error(logger, "Failed to add to set.", who=self.logger_name)
return self.sheerka.ret(self.logger_name, False, ErrorConcept(error), error.args[0])
def get_set_elements(self, concept):
"""
Concept is supposed to be a set
Returns all elements if the set
:param concept:
:return:
"""
assert concept.id
ids = self.sheerka.sdp.get_safe(GROUP_PREFIX + concept.id)
if ids is None:
return self.sheerka.new(BuiltinConcepts.NOT_A_SET, body=concept)
elements = [self.sheerka.get_by_id(element_id) for element_id in ids]
return elements
def isa(self, a, b):
"""
return true if the concept a is a b
Will handle when the keyword isa will be implemented
:param a:
:param b:
:return:
"""
if isinstance(a, BuiltinConcepts): # common KSI error ;-)
raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")
assert isinstance(a, Concept)
assert isinstance(b, Concept)
# TODO, first check the 'isa' property of a
return self.sheerka.sdp.exists(GROUP_PREFIX + b.id, a.id)
def isagroup(self, concept):
"""True if exists All_<concept_id> in sdp"""
if not concept.id:
return None
res = self.sheerka.sdp.get_safe(GROUP_PREFIX + concept.id)
return res is not None
View File
+50
View File
@@ -0,0 +1,50 @@
import logging
import sys
enabled = []
disabled = ["init", "sdp", "parsers", "evaluators", "verbose"]
console_handler = logging.StreamHandler(sys.stdout)
all_loggers = {}
def set_enabled(to_enable):
if to_enable is None:
return
if not hasattr(to_enable, "__iter__"):
to_enable = [to_enable]
enabled.extend(to_enable)
def to_discard(logger_class):
if logger_class is None:
return False
if logger_class in enabled or logger_class.strip(".") in enabled:
return False
if logger_class not in disabled:
return False
return True
def get_logger(logger_name):
if logger_name in all_loggers:
return all_loggers[logger_name]
logger = logging.getLogger(logger_name)
all_loggers[logger_name] = logger
for d in disabled:
if logger_name.startswith(d + ".") and to_discard(d):
logger.disabled = True
for e in enabled:
if logger_name.startswith("verbose." + e):
logger.disabled = False
return logger
+161
View File
@@ -0,0 +1,161 @@
import dataclasses
from enum import Enum
from core.concept import Concept, PROPERTIES_TO_SERIALIZE
from core.sheerka.Sheerka import ExecutionContext
from core.tokenizer import Token
from evaluators.BaseEvaluator import BaseEvaluator
from parsers.BaseParser import BaseParser, Node
from parsers.BnfParser import BnfParser
from parsers.ConceptLexerParser import UnrecognizedTokensNode, ParsingExpression
from parsers.PythonParser import PythonNode
from sdp.sheerkaDataProvider import Event
OBJ_TYPE_KEY = "__type__"
OBJ_ID_KEY = "__id__"
OBJ_NAME_KEY = "__name__"
default_concept = Concept()
class SheerkaTransformType(Enum):
Concept = 1
Reference = 2
ExecutionContext = 3
Event = 4
Node = 5
Exception = 6
def __repr__(self):
return self.__class__.__name__ + "." + self.name
class SheerkaTransform:
def __init__(self, sheerka):
self.ids = {}
self.sheerka = sheerka
self.id_count = -1
def to_dict(self, obj):
if isinstance(obj, (Concept, ExecutionContext, Event)):
exists, _id = self.exist(obj)
if exists:
return {
OBJ_TYPE_KEY: SheerkaTransformType.Reference,
OBJ_ID_KEY: _id
}
else:
self.id_count = self.id_count + 1
self.ids[obj] = self.id_count
if isinstance(obj, Concept):
return self.concept_to_dict(obj)
elif isinstance(obj, ExecutionContext):
return self.execution_context_to_dict(obj)
elif isinstance(obj, Event):
return {
OBJ_TYPE_KEY: SheerkaTransformType.Event,
OBJ_ID_KEY: self.id_count,
'digest': obj.get_digest()}
elif isinstance(obj, (BaseParser, BaseEvaluator, BnfParser)):
return obj.name
elif isinstance(obj, Token):
return obj.__dict__
elif isinstance(obj, PythonNode):
return {
OBJ_TYPE_KEY: SheerkaTransformType.Node,
OBJ_NAME_KEY: "PythonNode",
'source': obj.source,
'ast_': obj.get_dump(obj.ast_)
}
elif isinstance(obj, Node):
to_dict = {
OBJ_TYPE_KEY: SheerkaTransformType.Node,
OBJ_NAME_KEY: obj.__class__.__name__,
}
for k, v in obj.__dict__.items():
to_dict[k] = self.to_dict(v)
return to_dict
elif isinstance(obj, Exception):
to_dict = {
OBJ_TYPE_KEY: SheerkaTransformType.Exception,
OBJ_NAME_KEY: obj.__class__.__name__,
}
for k, v in obj.__dict__.items():
to_dict[k] = self.to_dict(v)
return to_dict
elif isinstance(obj, ParsingExpression):
return obj.__repr__()
elif isinstance(obj, dict):
return dict((str(k) if isinstance(k, Concept) else k, self.to_dict(v)) for k, v in obj.items())
elif hasattr(obj, "__iter__") and not isinstance(obj, str):
return list(self.to_dict(o) for o in obj)
else:
return obj
def concept_to_dict(self, obj: Concept):
to_dict = {
OBJ_TYPE_KEY: SheerkaTransformType.Concept,
OBJ_ID_KEY: self.id_count,
}
if obj.id:
ref = self.sheerka.get(obj.key, obj.id)
to_dict["id"] = obj.id
else:
ref = default_concept
# transform metadata
for prop in PROPERTIES_TO_SERIALIZE:
value = getattr(obj.metadata, prop)
ref_value = getattr(ref.metadata, prop)
if value != ref_value:
to_dict["meta." + prop] = self.to_dict(value)
# transform value
for metadata, value in obj.values.items():
ref_value = ref.values[metadata] if metadata in ref.values else None
if value != ref_value:
to_dict[metadata.value] = self.to_dict(value)
# transform properties
for prop in obj.props:
value = obj.props[prop].value
if prop not in ref.props or value != ref.props[prop].value:
if "props" not in to_dict:
to_dict["props"] = []
to_dict["props"].append((prop, self.to_dict(value)))
return to_dict
def execution_context_to_dict(self, obj: ExecutionContext):
to_dict = {
OBJ_TYPE_KEY: SheerkaTransformType.ExecutionContext,
OBJ_ID_KEY: self.id_count
}
for property_name in obj.__dict__:
if property_name == "sheerka":
continue
to_dict[property_name] = self.to_dict(getattr(obj, property_name))
return to_dict
def exist(self, obj):
for k, v in self.ids.items():
if id(k) == id(obj) or k == obj:
return True, v
return False, None
+411
View File
@@ -0,0 +1,411 @@
from dataclasses import dataclass
from enum import Enum
class TokenKind(Enum):
EOF = "eof"
WHITESPACE = "whitespace"
NEWLINE = "newline"
KEYWORD = "keyword"
IDENTIFIER = "identifier"
CONCEPT = "concept"
STRING = "string"
NUMBER = "number"
TRUE = "true"
FALSE = "false"
LPAR = "lpar"
RPAR = "rpar"
LBRACKET = "lbrace"
RBRACKET = "rbracket"
LBRACE = "lbrace"
RBRACE = "rbrace"
PLUS = "plus"
MINUS = "minus"
STAR = "star"
SLASH = "slash"
PERCENT = "percent"
COMMA = "comma"
SEMICOLON = "semicolon"
COLON = "colon"
DOT = "dot"
QMARK = "qmark"
VBAR = "vbar"
AMPER = "amper"
EQUALS = "="
AT = "at"
BACK_QUOTE = "bquote" # `
BACK_SLASH = "bslash" # \
CARAT = "carat" # ^
DOLLAR = "dollar" # $
EURO = "dollar" # €
STERLING = "steling" # £
EMARK = "emark" # !
GREATER = "greater" # >
LESS = "less" # <
HASH = "HASH" # #
TILDE = "tilde" # ~
UNDERSCORE = "underscore" # _
DEGREE = "degree" # °
@dataclass()
class Token:
type: TokenKind
value: object
index: int
line: int
column: int
def __repr__(self):
if self.type == TokenKind.IDENTIFIER:
value = str(self.value)
elif self.type == TokenKind.WHITESPACE:
value = "<ws>"
elif self.type == TokenKind.NEWLINE:
value = r"\n"
elif self.type == TokenKind.EOF:
value = "<EOF>"
else:
value = self.value
return f"Token({value})"
@dataclass()
class LexerError(Exception):
message: str
text: str
index: int
line: int
column: int
class Keywords(Enum):
DEF = "def"
CONCEPT = "concept"
FROM = "from"
BNF = "bnf"
AS = "as"
WHERE = "where"
PRE = "pre"
POST = "post"
ISA = "isa"
class Tokenizer:
"""
Class that can iterate on the tokens
"""
KEYWORDS = set(x.value for x in Keywords)
def __init__(self, text):
self.text = text
self.text_len = len(text)
self.column = 1
self.line = 1
self.i = 0
def __iter__(self):
while self.i < self.text_len:
c = self.text[self.i]
if c == "+":
if self.i + 1 < self.text_len and self.text[self.i + 1].isdigit():
number = self.eat_number(self.i)
yield Token(TokenKind.NUMBER, number, self.i, self.line, self.column)
self.i += len(number)
self.column += len(number)
else:
yield Token(TokenKind.PLUS, "+", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "-":
if self.i + 1 < self.text_len and self.text[self.i + 1].isdigit():
number = self.eat_number(self.i)
yield Token(TokenKind.NUMBER, number, self.i, self.line, self.column)
self.i += len(number)
self.column += len(number)
else:
yield Token(TokenKind.MINUS, "-", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "_":
if self.i + 1 < self.text_len and self.text[self.i + 1].isalpha():
identifier = self.eat_identifier(self.i)
token_type = TokenKind.KEYWORD if identifier in self.KEYWORDS else TokenKind.IDENTIFIER
value = Keywords(identifier) if identifier in self.KEYWORDS else identifier
yield Token(token_type, value, self.i, self.line, self.column)
self.i += len(identifier)
self.column += len(identifier)
else:
yield Token(TokenKind.UNDERSCORE, "_", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "/":
yield Token(TokenKind.SLASH, "/", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "*":
yield Token(TokenKind.STAR, "*", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "{":
yield Token(TokenKind.LBRACE, "{", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "}":
yield Token(TokenKind.RBRACE, "}", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "(":
yield Token(TokenKind.LPAR, "(", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == ")":
yield Token(TokenKind.RPAR, ")", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "[":
yield Token(TokenKind.LBRACKET, "[", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "]":
yield Token(TokenKind.RBRACKET, "]", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "=":
yield Token(TokenKind.EQUALS, "=", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == " " or c == "\t":
whitespace = self.eat_whitespace(self.i)
yield Token(TokenKind.WHITESPACE, whitespace, self.i, self.line, self.column)
self.i += len(whitespace)
self.column += len(whitespace)
elif c == ",":
yield Token(TokenKind.COMMA, ",", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == ".":
yield Token(TokenKind.DOT, ".", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == ";":
yield Token(TokenKind.SEMICOLON, ";", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == ":":
yield Token(TokenKind.COLON, ":", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "?":
yield Token(TokenKind.QMARK, "?", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "|":
yield Token(TokenKind.VBAR, "|", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "&":
yield Token(TokenKind.AMPER, "&", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "<":
yield Token(TokenKind.LESS, "<", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == ">":
yield Token(TokenKind.GREATER, ">", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "!":
yield Token(TokenKind.EMARK, "!", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "`":
yield Token(TokenKind.BACK_QUOTE, "`", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "\\":
yield Token(TokenKind.BACK_SLASH, "\\", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "^":
yield Token(TokenKind.CARAT, "^", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "$":
yield Token(TokenKind.DOLLAR, "$", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "":
yield Token(TokenKind.EURO, "", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "£":
yield Token(TokenKind.STERLING, "£", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "#":
yield Token(TokenKind.HASH, "#", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "°":
yield Token(TokenKind.DEGREE, "°", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "~":
yield Token(TokenKind.TILDE, "~", self.i, self.line, self.column)
self.i += 1
self.column += 1
elif c == "\n" or c == "\r":
newline = self.eat_newline(self.i)
yield Token(TokenKind.NEWLINE, newline, self.i, self.line, self.column)
self.i += len(newline)
self.column = 1
self.line += 1
elif c == "c" and self.i + 1 < self.text_len and self.text[self.i + 1] == ":":
concept_name = self.eat_concept_name(self.i + 2, self.line, self.column)
yield Token(TokenKind.CONCEPT, concept_name, self.i, self.line, self.column)
self.i += len(concept_name) + 3
self.column += len(concept_name) + 3
elif c.isalpha() or c == "_":
identifier = self.eat_identifier(self.i)
token_type = TokenKind.KEYWORD if identifier in self.KEYWORDS else TokenKind.IDENTIFIER
value = Keywords(identifier) if identifier in self.KEYWORDS else identifier
yield Token(token_type, value, self.i, self.line, self.column)
self.i += len(identifier)
self.column += len(identifier)
elif c.isdigit():
number = self.eat_number(self.i)
yield Token(TokenKind.NUMBER, number, self.i, self.line, self.column)
self.i += len(number)
self.column += len(number)
elif c == "'" or c == '"':
string, newlines = self.eat_string(self.i, self.line, self.column)
yield Token(TokenKind.STRING, string, self.i, self.line, self.column) # quotes are kept
self.i += len(string)
self.column = 1 if newlines > 0 else self.column + len(string)
self.line += newlines
elif c == "_":
yield Token(TokenKind.UNDERSCORE, "_", self.i, self.line, self.column)
self.i += 1
self.column += 1
else:
raise LexerError(f"Unknown token '{c}'", self.text, self.i, self.line, self.column)
yield Token(TokenKind.EOF, "", self.i, self.line, self.column)
def eat_concept_name(self, start, line, column):
result = ""
i = start
end_colon_found = False
while i < self.text_len:
c = self.text[i]
if c == "\n":
raise LexerError(f"New line is forbidden in concept name", result, i, line, column + 2 + len(result))
if c == ":":
end_colon_found = True
break
result += c
i += 1
if not end_colon_found:
raise LexerError(f"Missing ending colon", result, i, line, column + 2 + len(result))
if result == "":
raise LexerError(f"Concept name not found", result, start, line, column + 2 + len(result))
return result
def eat_whitespace(self, start):
result = self.text[start]
i = start + 1
while i < self.text_len:
c = self.text[i]
if c == " " or c == "\t":
result += c
i += 1
else:
break
return result
def eat_newline(self, start):
if start + 1 == self.text_len:
return self.text[start]
current = self.text[start]
next = self.text[start + 1]
if current == "\n" and next == "\r" or current == "\r" and next == "\n":
return current + next
return current
def eat_identifier(self, start):
result = self.text[start]
i = start + 1
while i < self.text_len:
c = self.text[i]
if c.isalpha() or c == "_" or c == "-" or c.isdigit():
result += c
i += 1
else:
break
return result
def eat_number(self, start):
result = self.text[start]
i = start + 1
while i < self.text_len:
c = self.text[i]
if c.isdigit() or c == ".":
result += c
i += 1
else:
break
return result
def eat_string(self, start_index, start_line, start_column):
quote = self.text[start_index]
result = self.text[start_index]
lines_count = 0
i = start_index + 1
escape = False
newline = None
while i < self.text_len:
c = self.text[i]
result += c
i += 1
if newline:
lines_count += 1
newline = c if c == newline else None
else:
if c == "\r" or c == "\n":
newline = c
if c == "\\":
escape = True
elif c == quote and not escape:
break
else:
escape = False
# add trailing new line if needed
if newline:
lines_count += 1
if result[-1] != quote:
raise LexerError("Missing Trailing quote", result, i, start_line + lines_count,
1 if lines_count > 0 else start_column + len(result))
return result, lines_count
+238
View File
@@ -0,0 +1,238 @@
import importlib
import inspect
import pkgutil
from core.tokenizer import TokenKind
def sysarg_to_string(argv):
"""
Transform a list of strings into a single string
Add quotes if needed
:return:
"""
if argv is None or not argv:
return ""
result = ""
first = True
for s in argv:
if not first:
result += " "
result += '"' + s + '"' if " " in s else s
first = False
if result[0] in ('"', "'"):
result = result[1:-1] # strip quotes
return result
def get_class(qname):
"""
Loads a class from its full qualified name
:param qname:
:return:
"""
parts = qname.split('.')
module = ".".join(parts[:-1])
m = __import__(module)
for comp in parts[1:]:
m = getattr(m, comp)
return m
def get_module(qname):
"""
Loads a module from its full qualified name
:param qname:
:return:
"""
parts = qname.split('.')
m = __import__(qname)
for comp in parts[1:]:
m = getattr(m, comp)
return m
def new_object(kls, *args, **kwargs):
"""
New instance of an object
:param kls:
:param args:
:param kwargs:
:return:
"""
obj_type = get_class(kls)
return obj_type(*args, **kwargs)
def get_full_qualified_name(obj):
"""
Returns the full qualified name of a class (including its module name )
:param obj:
:return:
"""
if obj.__class__ == type:
module = obj.__module__
if module is None or module == str.__class__.__module__:
return obj.__name__ # Avoid reporting __builtin__
else:
return module + '.' + obj.__name__
else:
module = obj.__class__.__module__
if module is None or module == str.__class__.__module__:
return obj.__class__.__name__ # Avoid reporting __builtin__
else:
return module + '.' + obj.__class__.__name__
def get_classes(module_name):
"""
Gets all classes, for a given module_name
:param module_name: name of the module
:return:
"""
mod = get_module(module_name)
for name in dir(mod):
obj = getattr(mod, name)
if inspect.isclass(obj):
yield obj
def get_classes_from_package(package_name):
"""
Gets all classes, for a given package
:param package_name: name of the package
:return:
"""
pkg = __import__(package_name)
prefix = pkg.__name__ + "."
for importer, modname, ispkg in pkgutil.iter_modules(pkg.__path__, prefix):
for c in get_classes(modname):
yield c
def init_package_import(package_name):
pkg = __import__(package_name)
prefix = pkg.__name__ + "."
for (module_loader, name, ispkg) in pkgutil.iter_modules(pkg.__path__, prefix):
importlib.import_module(name)
def get_sub_classes(package_name, base_class):
base_class = get_class(base_class) if isinstance(base_class, str) else base_class
all_class = set(base_class.__subclasses__()).union(
[s for c in base_class.__subclasses__() for s in get_sub_classes(package_name, c)])
# limit to the classes of the package
return [c for c in all_class if c.__module__.startswith(package_name)]
def remove_from_list(lst, to_remove_predicate):
"""
Removes elements from a list if they exist
:param lst:
:param to_remove_predicate:
:return:
"""
flagged = []
for item in lst:
if to_remove_predicate(item):
flagged.append(item)
for item in flagged:
lst.remove(item)
return lst
def remove_list_from_list(lst, to_remove):
# https://stackoverflow.com/questions/2514961/remove-all-values-within-one-list-from-another-list/30353802
# explains that list comprehension is not the best approach
for item in to_remove:
try:
lst.remove(item)
except ValueError:
pass
return lst
def product(a, b):
"""
Kind of cartesian product between lists a and b
knowing that a is also a list
So it's a cartesian product between a list of list and a list
"""
if a is None or len(a) == 0:
return b
if b is None or len(b) == 0:
return a
res = []
for item_b in b:
for item_a in a:
items = item_a + [item_b]
res.append(items)
return res
def strip_quotes(text):
if not isinstance(text, str):
return text
if text == "":
return ""
if text[0] == "'" or text[0] == '"':
return text[1:-1]
return text
def strip_tokens(tokens, strip_eof=False):
"""
Remove the starting and trailing spaces and newline
"""
if tokens is None:
return None
start = 0
length = len(tokens)
while start < length and tokens[start].type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
start += 1
if start == length:
return []
end_tokens = (TokenKind.WHITESPACE, TokenKind.NEWLINE, TokenKind.EOF) \
if strip_eof \
else (TokenKind.WHITESPACE, TokenKind.NEWLINE)
end = length - 1
while end > 0 and tokens[end].type in end_tokens:
end -= 1
return tokens[start: end + 1]
def escape_char(text, to_escape):
res = ""
for c in text:
res += ("\\" + c) if c in to_escape else c
return res
def pp(items):
if not hasattr(items, "__iter__"):
return str(items)
if len(items) == 0:
return str(items)
return " \n" + " \n".join(str(item) for item in items)