"""Helper utilities for the sheerka parsing/evaluation pipeline.

Contains return-value reducers (expect_one, only_successful, ...),
lexer-node builders for the various parsers, concept evaluation helpers
and small validation utilities shared by parsers and evaluators.
"""

import ast
import logging

from cache.Cache import Cache
from core.ast_helpers import ast_to_props
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, ConceptParts, DEFINITION_TYPE_BNF, concept_part_value
from core.global_symbols import NotInit, NotFound, CURRENT_OBJ
from core.rule import Rule
from core.utils import as_bag
from parsers.BaseNodeParser import SourceCodeNode, ConceptNode, UnrecognizedTokensNode, SourceCodeWithConceptNode, \
    RuleNode, VariableNode
from parsers.BaseParser import ParsingError

# Pipeline steps executed by sheerka.execute() for parsing only / full evaluation.
PARSE_STEPS = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
EVAL_STEPS = PARSE_STEPS + [BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION,
                            BuiltinConcepts.AFTER_EVALUATION]
# Default parser names used when a caller does not provide its own list.
PARSERS = ["EmptyString", "ShortTermMemory", "Sequence", "Bnf", "Sya", "Python"]


def remove_python_nodes(context, return_values):
    """
    Try to reduce the number of return_values by removing return values
    whose value carries a python node (i.e. has a ``get_python_node`` attribute).

    :param context:
    :param return_values:
    :return: the filtered list (possibly empty)
    """
    res = []
    for ret_val in return_values:
        value = context.sheerka.objvalue(ret_val)
        if not hasattr(value, "get_python_node"):
            res.append(ret_val)
    return res


def is_same_success(context, return_values):
    """
    Returns True if all return values are successful and have the same value.

    :param context:
    :param return_values:
    :return: True, False or None (None if a concept is not evaluated or a
             return value is not successful)
    """
    assert isinstance(return_values, list)

    def _get_value(ret_val):
        # Raise to bail out of the comparison: any failed or
        # not-yet-evaluated entry makes the answer undecidable.
        if not ret_val.status:
            raise Exception("Status is false")
        if isinstance(ret_val.body, Concept) and not ret_val.body.get_metadata().is_evaluated:
            raise Exception("Concept is not evaluated")
        return context.sheerka.objvalue(ret_val)

    try:
        reference = _get_value(return_values[0])
        for return_value in return_values[1:]:
            actual = _get_value(return_value)
            if actual != reference:
                return False
    except Exception as ex:
        context.log_error(ex)
        return None
    return True


def expect_one(context, return_values):
    """
    Checks if there is at least one successful return value.
    If there is more than one, check if it's the same value.

    :param context:
    :param return_values:
    :return: a single return value (success when exactly one winner can be
             elected, error concept otherwise)
    """
    if not isinstance(return_values, list):
        return return_values
    sheerka = context.sheerka
    if len(return_values) == 0:
        return sheerka.ret(
            context.who, False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)
    if len(return_values) == 1:
        return return_values[0]
    successful_results = [item for item in return_values if item.status]
    number_of_successful = len(successful_results)
    # remove errors when a winner is found
    if number_of_successful == 1:
        return sheerka.ret(
            context.who, True,
            successful_results[0].body,
            parents=return_values)
    # too many winners, which one to choose ?
    if number_of_successful > 1:
        # first, try to remove python node results.
        # In case of conflict, the concept takes precedence over the natural Python result
        # as it is considered as an override (overload ?)
        successful_results = remove_python_nodes(context, successful_results)
        if len(successful_results) == 1:
            return sheerka.ret(
                context.who, True,
                successful_results[0].body,
                parents=return_values)
        if is_same_success(context, successful_results):
            # NOTE(review): this branch returns .value while the others
            # return .body — looks intentional (equal values collapse to
            # one), but worth confirming.
            return sheerka.ret(
                context.who, True,
                successful_results[0].value,
                parents=return_values)
        else:
            if context.logger and context.logger.isEnabledFor(logging.DEBUG):
                context.log("Too many successful results found by expect_one()", context.who)
                for s in successful_results:
                    context.log(f"-> {s}", context.who)
            return sheerka.ret(
                context.who, False,
                sheerka.new(BuiltinConcepts.TOO_MANY_SUCCESS, body=successful_results),
                parents=return_values)
    # number_of_successful == 0, only errors, i cannot help you
    if context.logger and context.logger.isEnabledFor(logging.DEBUG):
        context.log("Too many errors found by expect_one()", context.who)
        # FIX: originally iterated successful_results, which is always
        # empty here (number_of_successful == 0), so nothing was logged.
        for s in return_values:
            context.log(f"-> {s}", context.who)
    # len(return_values) > 1 is guaranteed here: the single-item case
    # returned at the top of the function (former len == 1 branch was dead code).
    # test if only one evaluator in error
    from evaluators.OneErrorEvaluator import OneErrorEvaluator
    one_error_evaluator = OneErrorEvaluator()
    reduce_requested = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.REDUCE_REQUESTED))
    if one_error_evaluator.matches(context, return_values + [reduce_requested]):
        return sheerka.ret(
            context.who, False,
            one_error_evaluator.eval(context, return_values).body,
            parents=return_values)
    else:
        return sheerka.ret(
            context.who, False,
            sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
            parents=return_values)


def only_successful(context, return_values):
    """
    Removes all return values that are not successful.
    Return an error when there is no successful return value.

    :param context:
    :param return_values:
    :return: a single return value wrapping the successful entries
    """
    if not isinstance(return_values, list):
        return return_values
    sheerka = context.sheerka
    if len(return_values) == 0:
        return sheerka.ret(
            context.who, False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)
    successful_results = [item for item in return_values if item.status]
    if len(successful_results) == 0:
        return sheerka.ret(
            context.who, False,
            sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
            parents=return_values)
    return sheerka.ret(
        context.who, True,
        sheerka.new(BuiltinConcepts.ONLY_SUCCESSFUL, body=successful_results),
        parents=return_values)


def resolve_ambiguity(context, concepts):
    """
    From the list of concepts, elect the one(s) that best suit(s) the context.
    Use the PRE metadata to choose the correct concepts.

    :param context:
    :param concepts:
    :return: list of elected concepts (possibly empty)
    """
    # we first sort by condition complexity. The more complex is the PRE condition,
    # the more likely the concept matches the context
    by_complexity = {}
    for c in concepts:
        by_complexity.setdefault(
            get_condition_complexity(c, concept_part_value(ConceptParts.PRE)), []).append(c)
    remaining_concepts = []
    for complexity in sorted(by_complexity.keys(), reverse=True):
        if complexity == 0:
            remaining_concepts.extend(by_complexity[complexity])
        else:
            for c in by_complexity[complexity]:
                evaluated = context.sheerka.evaluate_concept(context, c, metadata=[ConceptParts.PRE])
                if context.sheerka.is_success(evaluated) or evaluated.key == c.key:
                    remaining_concepts.append(c)
        if len(remaining_concepts) > 0:
            break  # no need to check concepts with lower complexity
    if len(remaining_concepts) in (0, 1):
        return remaining_concepts  # they all failed the pre conditions or one champ is found
    # for concepts with the same condition complexity, we choose the one that has the
    # lowest number of variables.
    # We consider that Concept("hello world") is more specific than Concept("hello a").def_var("a")
    # when the input is "hello world"
    by_number_of_vars = {}
    for c in remaining_concepts:
        by_number_of_vars.setdefault(len(c.get_metadata().variables), []).append(c)
    return by_number_of_vars[min(by_number_of_vars.keys())]


def get_condition_complexity(concept, concept_part_str):
    """
    Need to find a proper algorithm to compute the complexity of a concept metadata.
    So far, the concept is considered as complex if it has a non-empty
    concept_part_str (so far with concept_part_str='pre').

    :param concept:
    :param concept_part_str:
    :return: 0 when the part is missing/blank, 1 otherwise
    """
    value = getattr(concept.get_metadata(), concept_part_str)
    # FIX: original compared value.strip() == 0 (str vs int, always False),
    # so whitespace-only conditions were wrongly counted as complex.
    if value is None or value.strip() == "":
        return 0
    return 1  # no real computing as of now


def only_parsers_results(context, return_values):
    """
    Filters the return_values and returns those whose result is a ParserResult,
    regardless of the status. So it filters out parsers in error
    (ERROR, NOT_FOR_ME, EMPTY...).

    :param context:
    :param return_values:
    :return: a single return value wrapping the filtered entries
    """
    if not isinstance(return_values, list):
        return return_values
    sheerka = context.sheerka
    if len(return_values) == 0:
        return sheerka.ret(
            context.who, False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)
    return_values_ok = [item for item in return_values
                        if sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)]
    # hack because some parsers don't follow the NOT_FOR_ME rule
    temp_ret_val = []
    for ret_val in return_values_ok:
        if isinstance(ret_val.body.body, ParsingError):
            continue
        if isinstance(ret_val.body.body, list) and \
                len(ret_val.body.body) == 1 and \
                isinstance(ret_val.body.body[0], UnrecognizedTokensNode):
            continue
        temp_ret_val.append(ret_val)
    return_values_ok = temp_ret_val
    if len(return_values_ok) == 0:
        return sheerka.ret(
            context.who, False,
            sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS if len(return_values) > 1 else BuiltinConcepts.ERROR,
                        body=return_values),
            parents=return_values)
    return sheerka.ret(
        context.who, True,
        sheerka.new(BuiltinConcepts.FILTERED, body=return_values_ok, iterable=return_values,
                    predicate="sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)"),
        parents=return_values)


def evaluate(context, source, evaluators="all", desc=None, eval_body=True, eval_where=True,
             is_question=False, expect_success=False, stm=None):
    """
    Evaluate a source string in a sub-context with the requested hints.

    :param context:
    :param source:
    :param evaluators: "all", or a list of evaluator names to enable exclusively
    :param desc:
    :param eval_body:
    :param eval_where:
    :param is_question:
    :param expect_success:
    :param stm: short term memory entries
    :return: the return value(s) produced by sheerka.execute()
    """
    sheerka = context.sheerka
    desc = desc or f"Eval '{source}'"
    with context.push(BuiltinConcepts.EVALUATE_SOURCE, source, desc=desc) as sub_context:
        # translate the boolean flags into protected context hints
        if eval_body:
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED)
        if eval_where:
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_WHERE_REQUESTED)
        if expect_success:
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_UNTIL_SUCCESS_REQUESTED)
        if is_question:
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED)
        if stm:
            for k, v in stm.items():
                sub_context.add_to_short_term_memory(k, v)
        # disable all evaluators but the requested ones
        if evaluators != "all":
            from evaluators.BaseEvaluator import BaseEvaluator
            sub_context.add_preprocess(BaseEvaluator.PREFIX + "*", enabled=False)
            for evaluator in evaluators:
                sub_context.add_preprocess(BaseEvaluator.PREFIX + evaluator, enabled=True)
        user_input = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.USER_INPUT, body=source))
        ret = sheerka.execute(sub_context, [user_input], EVAL_STEPS)
        sub_context.add_values(return_values=ret)
        return ret


def get_lexer_nodes(return_values, start, tokens):
    """
    Transform all elements from return_values into lexer nodes
    (ConceptNode, UnrecognizedTokensNode, SourceCodeNode...).
    On the contrary of the other method (get_lexer_nodes_using_positions),
    all created lexer nodes will use the same offset (start).

    :param return_values:
    :param start:
    :param tokens:
    :return: list of list (list of concept node sequences)
    """
    lexer_nodes = []
    for ret_val in return_values:
        if ret_val.who == "parsers.Python":
            if ret_val.body.source.strip().isidentifier():
                # Discard SourceCodeNode which seems to be a concept name
                # It may be a wrong idea, so let's see
                continue
            end = start + len(tokens) - 1
            lexer_nodes.append([SourceCodeNode(start, end, tokens, ret_val.body.source,
                                               python_node=ret_val.body.body,
                                               return_value=ret_val)])
        elif ret_val.who == "parsers.ExactConcept":
            concepts = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            end = start + len(tokens) - 1
            for concept in concepts:
                lexer_nodes.append([ConceptNode(concept, start, end, tokens, ret_val.body.source)])
        elif ret_val.who in ("parsers.Bnf", "parsers.Sya", "parsers.Sequence"):
            nodes = [node for node in ret_val.body.body]
            for node in nodes:
                # shift the parser-relative positions by the global offset
                node.start += start
                node.end += start
                if isinstance(node, ConceptNode):
                    for k, v in node.concept.get_compiled().items():
                        if hasattr(v, "start"):
                            v.start += start
                            v.end += start
            # but append the whole sequence when it's a sequence
            lexer_nodes.append(nodes)
        elif ret_val.who == "parsers.Rule":
            rules = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            end = start + len(tokens) - 1
            for rule in rules:
                lexer_nodes.append([RuleNode(rule, start, end, tokens, ret_val.body.source)])
        else:
            raise NotImplementedError()
    return lexer_nodes


def get_lexer_nodes_using_positions(return_values, positions):
    """
    Transform all elements from return_values into lexer nodes
    (ConceptNode, UnrecognizedTokensNode, SourceCodeNode...).
    Use positions to compute the exact new positions.
    On the contrary of the other method (get_lexer_nodes), one return value
    is mapped with one position. It's not an offset, but an absolute position.

    :param return_values:
    :param positions: a list of triplets (start, end, tokens)
    :return: flat list of lexer nodes
    """
    lexer_nodes = []
    for ret_val, position in zip(return_values, positions):
        if ret_val.who in ("parsers.Python", 'parsers.PythonWithConcepts'):
            lexer_nodes.append(SourceCodeNode(position.start, position.end, position.tokens,
                                              ret_val.body.source,
                                              python_node=ret_val.body.body,
                                              return_value=ret_val))
        elif ret_val.who == "parsers.ExactConcept":
            concepts = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            for concept in concepts:
                lexer_nodes.append(ConceptNode(concept, position.start, position.end,
                                               position.tokens, ret_val.body.source))
        elif ret_val.who in ("parsers.Bnf", "parsers.Sya", "parsers.Sequence"):
            nodes = [node for node in ret_val.body.body]
            for node in nodes:
                node.start = position.start
                node.end = position.end
                if isinstance(node, ConceptNode):
                    for k, v in node.concept.get_compiled().items():
                        if hasattr(v, "start"):
                            # compiled children keep parser-relative offsets,
                            # shifted by the absolute start of the node
                            v.start += position.start
                            v.end += position.start
            # but append the whole sequence when it's a sequence
            lexer_nodes.extend(nodes)
        elif ret_val.who == "parsers.Rule":
            rules = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            for rule in rules:
                lexer_nodes.append(RuleNode(rule, position.start, position.end,
                                            position.tokens, ret_val.body.source))
        elif ret_val.who == "parsers.Function":
            node = ret_val.body.body
            node.start = position.start
            node.end = position.end
            lexer_nodes.append(node)
        else:
            raise NotImplementedError()
    return lexer_nodes


def ensure_evaluated(context, concept, eval_body=True, metadata=None):
    """
    Evaluate a concept if it is not already evaluated.

    :param context:
    :param concept:
    :param eval_body:
    :param metadata:
    :return: the concept (unchanged when already evaluated or not fully
             initialized) or the evaluation result
    """
    if concept.get_metadata().is_evaluated:
        return concept
    # do not try to evaluate concepts that are not fully initialized
    if concept.get_metadata().definition_type != DEFINITION_TYPE_BNF:
        for var_name, var_default_value in concept.get_metadata().variables:
            if var_default_value is None and \
                    var_name not in concept.get_compiled() and \
                    (var_name not in concept.values() or concept.get_value(var_name) == NotInit):
                return concept
    evaluated = context.sheerka.evaluate_concept(context, concept, eval_body=eval_body, metadata=metadata)
    return evaluated


def get_lexer_nodes_from_unrecognized(context, unrecognized_tokens_node, parsers):
    """
    Using parsers, try to recognize concepts from source.

    :param context:
    :param unrecognized_tokens_node:
    :param parsers:
    :return: list of lexer node sequences, or None when nothing was recognized
    """
    # first look into short term memory to see if the unrecognized token
    # is not a variable of the current object
    if (current_obj := context.sheerka.get_from_short_term_memory(context, CURRENT_OBJ)) is not NotFound:
        if isinstance(current_obj, Concept):
            source = unrecognized_tokens_node.source
            if source in current_obj.get_compiled() or source in current_obj.variables():
                return [[VariableNode(current_obj, source,
                                      unrecognized_tokens_node.start,
                                      unrecognized_tokens_node.end,
                                      unrecognized_tokens_node.tokens,
                                      unrecognized_tokens_node.source)]]
    res = context.sheerka.parse_unrecognized(context, unrecognized_tokens_node.source, parsers)
    res = only_parsers_results(context, res)
    if not res.status:
        return None
    return get_lexer_nodes(res.body.body, unrecognized_tokens_node.start, unrecognized_tokens_node.tokens)


def update_compiled(context, concept, errors, parsers=None):
    """
    TL;DR; Recursively iterate over concept.get_compiled() to replace LexerNode
    into concepts or list of ReturnValueConcept.

    Long version: When parsing using a LexerNodeParser (SyaNodeParser,
    BnfNodeParser...) the result will be a LexerNode. In the specific case of a
    ConceptNode, the compiled variables will also be LexerNode
    (UnrecognizedTokensNode...). This function iterates over the compiled to
    transform these nodes into concepts of compiled AST.

    :param context:
    :param concept:
    :param errors: a list that must be initialized by the caller
    :param parsers: to customize the parsers to use
    :return: None (mutates concept and errors in place)
    """
    sheerka = context.sheerka
    parsers = parsers or PARSERS

    def _validate_concept(c):
        """
        Recursively browse the compiled properties in order to find unrecognized.

        :param c:
        :return:
        """
        for k, v in c.get_compiled().items():
            if isinstance(v, Concept):
                _validate_concept(v)
            elif isinstance(v, SourceCodeWithConceptNode):
                if v.return_value:
                    res = v.return_value
                else:
                    from parsers.PythonWithConceptsParser import PythonWithConceptsParser
                    parser_helper = PythonWithConceptsParser()
                    res = parser_helper.parse_nodes(context, v.get_all_nodes())
                if res.status:
                    c.get_compiled()[k] = [res]
                else:
                    errors.append(sheerka.new(BuiltinConcepts.ERROR, body=f"Cannot parse '{v.source}'"))
            elif isinstance(v, UnrecognizedTokensNode):
                res = context.sheerka.parse_unrecognized(context, v.source, parsers)
                res = only_successful(context, res)  # only keep successful parsers
                if res.status:
                    c.get_compiled()[k] = res.body.body
                else:
                    errors.append(sheerka.new(BuiltinConcepts.ERROR, body=f"Cannot parse '{v.source}'"))

    def _get_source(compiled, var_name):
        # Return the parsed source of a compiled variable, or None when the
        # entry does not match the expected single-ReturnValue/ParserResult shape.
        if var_name not in compiled:
            return None
        if not isinstance(compiled[var_name], list):
            return None
        if not len(compiled[var_name]) == 1:
            return None
        if not sheerka.isinstance(compiled[var_name][0], BuiltinConcepts.RETURN_VALUE):
            return None
        if not sheerka.isinstance(compiled[var_name][0].body, BuiltinConcepts.PARSER_RESULT):
            return None
        if compiled[var_name][0].body.name == "parsers.ShortTermMemory":
            return None
        return compiled[var_name][0].body.source

    _validate_concept(concept)
    # Special case where the values of the variables are the names of the variables.
    # example : Concept("a plus b").def_var("a").def_var("b")
    # and the user has entered 'a plus b'
    # Chances are that we are talking about the concept itself, and not an
    # instantiation (like '10 plus 2').
    # This means that 'a' and 'b' don't have any real values.
    if len(concept.get_metadata().variables) > 0:
        for name, value in concept.get_metadata().variables:
            if _get_source(concept.get_compiled(), name) != name:
                break
        else:
            concept.get_metadata().is_evaluated = True


def add_to_ret_val(sheerka, context, return_values, concept_key):
    """Append a new successful return value wrapping concept_key and return the list."""
    concept = sheerka.new(concept_key)
    ret_val = sheerka.ret(context.who, True, concept)
    return_values.append(ret_val)
    return return_values


def remove_from_ret_val(sheerka, return_values, concept_key):
    """Remove, in place, every successful return value whose body is a concept_key."""
    to_remove = []
    for ret_val in return_values:
        if ret_val.status and sheerka.isinstance(ret_val.body, concept_key):
            to_remove.append(ret_val)
    for item in to_remove:
        return_values.remove(item)
    return return_values


def set_is_evaluated(concepts, check_nb_variables=False):
    """
    Set is_evaluated to True.

    :param concepts: a single concept, an iterable of concepts, or None
    :param check_nb_variables: only set is_evaluated if the concept has variables
    :return:
    """
    if concepts is None:
        return
    if hasattr(concepts, "__iter__"):
        for c in concepts:
            # `not check or (check and x)` simplified to the equivalent `not check or x`
            if not check_nb_variables or len(c.get_metadata().variables) > 0:
                c.get_metadata().is_evaluated = True
    else:
        if not check_nb_variables or len(concepts.get_metadata().variables) > 0:
            concepts.get_metadata().is_evaluated = True


def ensure_concept(*concepts):
    """Raise TypeError unless every argument is a Concept."""
    # *concepts is always a tuple, so the original hasattr(__iter__) check
    # and its else branch were dead code.
    for concept in concepts:
        if not isinstance(concept, Concept):
            raise TypeError(f"'{concept}' must be a concept")


def ensure_rule(*rules):
    """Raise TypeError unless every argument is a Rule."""
    for rule in rules:
        if not isinstance(rule, Rule):
            raise TypeError(f"'{rule}' must be a rule")


def ensure_concept_or_rule(*items):
    """Raise TypeError unless every argument is a Concept or a Rule."""
    for item in items:
        if not isinstance(item, (Concept, Rule)):
            raise TypeError(f"'{item}' must be a concept or rule")


def ensure_bnf(context, concept, parser_name="BaseNodeParser", update_bnf_for_cached_concept=True):
    """
    Parse and attach the BNF of a BNF-defined concept when not done yet.

    :param context:
    :param concept:
    :param parser_name: name reported in the sub-context
    :param update_bnf_for_cached_concept: also update the cached instance by id
    :raises Exception: when the BNF definition cannot be parsed
    """
    if concept.get_metadata().definition_type == DEFINITION_TYPE_BNF and not concept.get_bnf():
        from parsers.BnfDefinitionParser import BnfDefinitionParser
        regex_parser = BnfDefinitionParser()
        desc = f"Resolving BNF '{concept.get_metadata().definition}'"
        with context.push(BuiltinConcepts.INIT_BNF, concept, who=parser_name,
                          obj=concept, desc=desc) as sub_context:
            sub_context.add_inputs(parser_input=concept.get_metadata().definition)
            bnf_parsing_ret_val = regex_parser.parse(sub_context, concept.get_metadata().definition)
            sub_context.add_values(return_values=bnf_parsing_ret_val)
            if not bnf_parsing_ret_val.status:
                raise Exception(bnf_parsing_ret_val.value)
            concept.set_bnf(bnf_parsing_ret_val.body.body)
            if concept.id and update_bnf_for_cached_concept:
                context.sheerka.get_by_id(concept.id).set_bnf(concept.get_bnf())  # update bnf in cache


# Module-level cache of parsed property-access expressions (expr -> PropDef list).
expressions_cache = Cache()


def evaluate_expression(expr, bag):
    """
    Try to evaluate expr in context of bag.

    :param expr:
    :param bag:
    :return: the evaluated value, or None for a blank expression
    """
    if expr is None or expr.strip() == "":
        return None
    if expr in bag:
        return bag[expr]
    props_definitions = expressions_cache.get(expr)
    if props_definitions is NotFound:
        _ast = ast.parse(expr, mode="eval")
        props_definitions = []
        ast_to_props(props_definitions, _ast.body, None)
        # ast_to_props builds inner-first; reverse to get outer-first order
        props_definitions.reverse()
        expressions_cache.put(expr, props_definitions)
    return evaluate_object(bag, props_definitions)


def evaluate_object(bag, properties):
    """
    Evaluate the properties of an object.
    Works with evaluate_expression.

    :param bag:
    :param properties: List of ast_helpers.PropDef
    :return: the value reached by following the property chain
    :raises NameError: when a property is found neither in bag nor in bag["self"]
    """
    obj = None
    for prop in properties:
        try:
            obj = bag[prop.prop]
        except KeyError:
            # fall back to the implicit "self" object of the bag
            try:
                obj = bag["self"][prop.prop]
            except Exception:
                raise NameError(prop.prop)
        if obj is None:
            return None
        if prop.index is not None:
            obj = obj[prop.index]
        bag = as_bag(obj)
    return obj


def is_a_question(context, concept):
    """
    Returns True if the concept must be executed in the context of
    BuiltinConcepts.EVAL_QUESTION_REQUESTED.
    The only two ways that are currently supported are:
    * is_question() appears in the pre condition
    * context.in_context(BuiltinConcepts.EVAL_QUESTION_REQUESTED) appears in the pre condition

    :param context:
    :param concept: concept to analyse
    """
    pre = concept.get_metadata().pre
    if pre in (None, NotInit, ""):
        return False
    res = context.sheerka.parse_expression(context, pre)
    if not res.status:
        return False
    node = res.body.body
    from parsers.expressions import IsAQuestionVisitor
    return IsAQuestionVisitor().is_a_question(node)


def get_inner_body(context, concept):
    """For container concepts, return the inner body."""
    if context.sheerka.isinstance(concept.body, BuiltinConcepts.ONLY_SUCCESSFUL):
        return concept.body.body
    else:
        return concept.body


class CreateObjectIdentifiers:
    """Class that creates unique identifiers for Concept or Rule objects."""

    def __init__(self):
        # id(obj) -> generated identifier
        self.identifiers = {}
        # base identifier -> number of collisions seen so far
        self.identifiers_key = {}

    @staticmethod
    def sanitize(identifier):
        """Replace every non-alphanumeric character with '0' (None -> '')."""
        if identifier is None:
            return ""
        res = ""
        for c in identifier:
            res += c if c.isalnum() else "0"
        return res

    def get_identifier(self, obj, wrapper):
        """
        Get an identifier for a concept.
        Make sure to return the same identifier for the same concept.
        Make sure to return a different identifier for the same name but a
        different concept.
        Internal function because I don't want identifiers, identifiers_key and
        python_ids_mappings to be instance variables. I would like to keep this
        parser as stateless as possible.

        :param obj:
        :param wrapper: string or char that will wrap the result (ex '__C__' or '__R__')
        :return:
        """
        if id(obj) in self.identifiers:
            return self.identifiers[id(obj)]
        identifier = wrapper + self.sanitize(obj.key or obj.name)
        if obj.id:
            identifier += "__" + obj.id
        if identifier in self.identifiers_key:
            # same base name already used by a different object: append a counter
            self.identifiers_key[identifier] += 1
            identifier += f"_{self.identifiers_key[identifier]}"
        else:
            self.identifiers_key[identifier] = 0
        identifier += wrapper
        self.identifiers[id(obj)] = identifier
        return identifier