import functools from dataclasses import dataclass import core.builtin_helpers from core.ast_helpers import UnreferencedVariablesVisitor from core.builtin_concepts import ReturnValueConcept from core.builtin_concepts_ids import BuiltinConcepts from core.concept import AllConceptParts, Concept from core.global_symbols import ErrorItem, ErrorObj, NotFound, NotInit, SyaAssociativity from core.rule import Rule from core.sheerka.ExecutionContext import ExecutionContext from core.sheerka.services.SheerkaAdmin import SheerkaAdmin from core.tokenizer import Token, TokenKind from core.utils import get_inner_set, sheerka_getattr, sheerka_hasattr from core.var_ref import VariableRef TO_DISABLED = ["breakpoint", "callable", "compile", "delattr", "eval", "exec", "exit", "input", "locals", "open", "print", "quit", "setattr"] @dataclass class MethodAccessError(Exception, ErrorObj): method_name: str class ObjectContainer: """ Container for list of object (or whatever), to easily use SheerkaQueryLanguage on collection """ def __init__(self, items): self.items = items class Expando: def __init__(self, name, bag): self.__name = name for k, v in bag.items(): setattr(self, k, v) def __repr__(self): return f"{vars(self)}" def get_name(self): return self.__name def __eq__(self, other): if id(other) == id(self): return True if not isinstance(other, Expando): return False if other.get_name() != self.get_name(): return False for k, v in vars(self).items(): if getattr(other, k) != v: return False return True def __hash__(self): hash_content = [self.__name] + list(vars(self).keys()) return hash(tuple(hash_content)) class Pipe: """ https://github.com/JulienPalard/Pipe/pull/23 Represent a Pipeable Element : Described as : first = Pipe(lambda iterable: next(iter(iterable))) and used as : print [1, 2, 3] | first printing 1 Or represent a Pipeable Function : It's a function returning a Pipe Described as : select = Pipe(lambda iterable, predicate: (predicate(x) for x in iterable)) and used as : print [1, 2, 3] | select(lambda x: x * 2) # 2, 4, 6 """ def __init__(self, function): self.function = function functools.update_wrapper(self, function) def __ror__(self, other): return self.function(other) def __call__(self, *args, **kwargs): return Pipe(lambda x: self.function(x, *args, **kwargs)) def get_type(obj): if isinstance(obj, Concept): return obj.name else: return type(obj).__name__ sheerka_globals = { "Concept": Concept, "BuiltinConcepts": BuiltinConcepts, "Expando": Expando, "ExecutionContext": ExecutionContext, "SyaAssociativity": SyaAssociativity, "get_type": get_type, "hasattr": sheerka_hasattr, "getattr": sheerka_getattr, "ErrorItem": ErrorItem } # Adds all concepts that have their own class definition for c in core.utils.get_classes("core.builtin_concepts"): if issubclass(c, Concept) and c != Concept: sheerka_globals[c.__name__] = c def inject_context(context): """ function Decorator used to inject the context in methods that needed :param context: :return: """ def wrapped(func): @functools.wraps(func) def inner(*args, **kwargs): return func(context, *args, **kwargs) return inner return wrapped def resolve_object(context, who, obj): """ Try to find a concept by its obj, id or the pattern c:key|id: :param context: :param who: :param obj: :return: """ if isinstance(obj, VariableRef): return getattr(obj.obj, obj.prop) if isinstance(obj, Rule): return context.sheerka.resolve_rule(context, obj) if isinstance(obj, Concept): obj = core.builtin_helpers.ensure_evaluated(context, obj) return obj if isinstance(obj, Token) and obj.type == TokenKind.RULE: return context.sheerka.resolve_rule(context, obj) if isinstance(obj, tuple): # To make sure that there is no tuple that resembles to a concept raise Exception() if (isinstance(obj, str) and obj.startswith("c:")) or isinstance(obj, Token): concept = context.sheerka.fast_resolve(obj, force_instance=True) if concept is None: return None if hasattr(concept, "__iter__"): raise NotImplementedError("Too many concepts") concept = core.builtin_helpers.ensure_evaluated(context, concept) return concept return obj def get_sheerka_method(context, who, name, expression_only): try: method = context.sheerka.sheerka_methods[name] context.log(f"Resolving '{name}'. It's a sheerka method.", who) if expression_only and method.has_side_effect: context.log(f"...but with side effect when {expression_only=}. Discarding.", who) raise MethodAccessError(name) else: method_to_use = inject_context(context)(method.method) if name in context.sheerka.methods_with_context \ else method.method if name in context.sheerka.pipe_functions: return Pipe(method_to_use) else: return method_to_use except KeyError: return None def create_namespace(context, who, names, sheerka_names, objects, expression_only, allow_builtins=False): """ Create a namespace for the requested names :param context: :param who: who is asking :param names: requested names :param sheerka_names: requested sheerka names (ex sheerka.isinstance) :param objects: local objects that can be added :param expression_only: if true, discard method that can alter the global state :param allow_builtins: automatically add python builtins symbols :return: """ result = dict(__builtins__) if allow_builtins else {} for name in names: if name in sheerka_globals: result[name] = sheerka_globals[name] continue if expression_only and name in TO_DISABLED: result[name] = None continue if name == "in_context": result[name] = context.in_context continue # need to add it manually to avoid conflict with sheerka.isinstance if name == "isinstance": result["isinstance"] = context.sheerka.services[SheerkaAdmin.NAME].extended_isinstance continue # support reference to sheerka if name.lower() == "sheerka": bag = {} for sheerka_name in sheerka_names: if (method := get_sheerka_method(context, who, sheerka_name, expression_only)) is not None: bag[sheerka_name] = method result[name] = Expando("sheerka", bag) continue # search in short term memory if (obj := context.get_from_short_term_memory(name)) is not NotFound: context.log(f"Resolving '{name}'. Using value found in STM.", who) result[name] = obj continue # search in memory if (obj := context.sheerka.get_last_from_memory(context, name)) is not NotFound: context.log(f"Resolving '{name}'. Using value found in Long Term Memory.", who) result[name] = obj.obj continue # search in sheerka methods if (method := get_sheerka_method(context, who, name, expression_only)) is not None: result[name] = method continue # search in context.obj (to replace by short time memory ?) if context.obj: if name == "self": result["self"] = context.obj continue try: attribute = context.obj.variables()[name] if attribute != NotInit: result[name] = attribute continue context.log(f"Resolving '{name}'. It's obj attribute (obj={context.obj}).", who) except KeyError: pass # search in current node (if the name was found during the parsing) if name in objects: context.log(f"Resolving '{name}'. Using value from node.", who) obj = resolve_object(context, who, objects[name]) # at last, try to instantiate a new concept else: context.log(f"Resolving '{name}'. Instantiating new concept.", who) obj = resolve_object(context, who, f"c:{name}:") if obj is None: context.log(f"...'{name}' is not found or cannot be instantiated. Skipping.", who) continue result[name] = obj return result def get_variables_from_concept_asts(context, concept, known_variables, parameters_only=True): """ From a given concept that is already compiled, browse the compiled to see if there is any symbol that is unknown, eg variable It is used to detect all mandatory variables before concept evaluation :param context: :param concept: :param known_variables: :param parameters_only: only return concept variables that are also parameters :return: """ if not concept.get_hints().is_instance and not known_variables: return {} core.builtin_helpers.ensure_asts(context, concept) variables = {} for prop_name, prop_value in concept.get_compiled().items(): if isinstance(prop_value, Concept): prop_value_vars = get_variables_from_concept_asts(context, prop_value, known_variables, parameters_only) inner_variables = get_inner_set(prop_value_vars) if inner_variables: variables[prop_name] = inner_variables else: return_values = [prop_value] if not isinstance(prop_value, list) else prop_value unreferenced_names_visitor = UnreferencedVariablesVisitor(context) for ret_val in [r for r in return_values if isinstance(r, ReturnValueConcept)]: if isinstance(ret_val.body.body, list) and len(ret_val.body.body) != 1: continue elif isinstance(ret_val.body.body, list) and len(ret_val.body.body) == 1: body = ret_val.body.body[0] else: body = ret_val.body.body if hasattr(body, "get_python_node"): node = body.get_python_node() possibles_vars = unreferenced_names_visitor.get_names(node.ast_) variables_found = {v for v in possibles_vars if is_concept_variable(context, concept, v, prop_name, known_variables)} if variables_found: variables.setdefault(prop_name, set()).update(variables_found) elif hasattr(body, "get_concept"): sub_concept = body.get_concept() if sub_concept.name in known_variables: variables.setdefault(prop_name, set()).add(sub_concept.name) elif hasattr(body, "get_expr_node"): expr_node = body.get_expr_node() for compiled in expr_node.compiled: variables.setdefault(prop_name, set()).update(compiled.variables) if prop_name not in variables: # add an empty entry to handle cases like '__o_00__ + 1' # No variable is required, but the property must be computed variables[prop_name] = set() if parameters_only: variables = {k: v for k, v in variables.items() if k in concept.get_metadata().parameters} return variables def get_possible_variables_from_concept(context, concept): """ Given a concept, get its symbols that may be considered as variables for other concepts :param context: :param concept: :return: """ possible_variables = set(concept.get_metadata().parameters) core.builtin_helpers.ensure_bnf(context, concept) if concept.get_bnf(): from parsers.BnfNodeParser import BnfNodeConceptExpressionVisitor visitor = BnfNodeConceptExpressionVisitor() visitor.visit(concept.get_bnf()) possible_variables.update([c.name if isinstance(c, Concept) else c for c in visitor.references]) return possible_variables def is_variable(context, name): """ tells whether or not the name can be a variable :param context: :param name: :return: """ if not name.isidentifier(): return False if context.sheerka.is_a_concept_name(name): return False try: eval(name, sheerka_globals) except: return True return False def is_concept_variable(context, concept, variable, current_property, known_variables): """ Tells whether or not a symbol is unknown for the concept ie, Can the concept be evaluated without resolving this symbol ? First the variable must be a valid variable (variable name + not a concept) Plus it must not be the name of a concept parameter :param context: :param concept: :param known_variables: :param variable: :param current_property: :return: """ if variable in known_variables: # forced variable return True if not is_variable(context, variable): # not a valid identifier or may be a known concept name return False if current_property not in AllConceptParts: # variable referencing other variable return True return variable not in concept.variables()