import ast
import logging

import core.ast.nodes
from core.ast.nodes import CallNodeConcept, GenericNodeConcept
from core.ast.visitors import UnreferencedNamesVisitor
from core.builtin_concepts import BuiltinConcepts


def is_same_success(sheerka, return_values):
    """
    Returns True if all return values are successful and have the same value.

    The values are compared after being resolved through ``sheerka.value()``;
    the first item is used as the reference.

    :param sheerka: object providing ``value()`` to resolve a return value
    :param return_values: non-empty list of return values (items exposing
        ``status`` and ``value``)
    :return: True when every item has a truthy status and resolves to the same
        value as the first one, False otherwise
    """
    assert isinstance(return_values, list)

    first = return_values[0]
    if not first.status:
        return False

    reference = sheerka.value(first.value)
    for return_value in return_values[1:]:
        if not return_value.status:
            return False
        if sheerka.value(return_value.value) != reference:
            return False

    return True


def expect_one(context, return_values):
    """
    Checks if there is at least one successful return value.
    If there is more than one, checks that they all carry the same value.

    Outcomes, in order of evaluation:

    * ``return_values`` is not a list: it is returned untouched
    * empty list: failure wrapping an ``IS_EMPTY`` concept
    * exactly one success: success carrying the body of that result
    * several successes with the same value: success carrying that value
    * several successes with different values: failure wrapping a
      ``TOO_MANY_SUCCESS`` concept
    * a single error: failure wrapping that error
    * several errors: the result of ``OneErrorEvaluator`` if it matches,
      otherwise a failure wrapping a ``TOO_MANY_ERRORS`` concept

    :param context: execution context (provides ``sheerka``, ``who``,
        ``logger`` and ``log()``)
    :param return_values: list of return values to reduce
    :return: a single return value built with ``sheerka.ret()`` (or
        ``return_values`` itself when it is not a list)
    """
    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    if not return_values:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)

    successful_results = [item for item in return_values if item.status]
    number_of_successful = len(successful_results)
    debug_enabled = bool(context.logger and context.logger.isEnabledFor(logging.DEBUG))

    # remove errors when a winner is found
    if number_of_successful == 1:
        # NOTE(review): this branch forwards `.body` while the branch below
        # forwards `.value` - confirm that the difference is intended
        return sheerka.ret(
            context.who,
            True,
            successful_results[0].body,
            parents=return_values)

    # too many winners, which one to choose ?
    if number_of_successful > 1:
        if is_same_success(sheerka, successful_results):
            return sheerka.ret(
                context.who,
                True,
                successful_results[0].value,
                parents=return_values)

        if debug_enabled:
            context.log("Too many successful results found by expect_one()", context.who)
            for s in successful_results:
                context.log(f"-> {s}", context.who)

        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.TOO_MANY_SUCCESS, body=successful_results),
            parents=return_values)

    # only errors, i cannot help you
    if debug_enabled:
        context.log("Too many errors found by expect_one()", context.who)
        # No result is successful at this point, so the items worth logging are
        # the return values themselves (successful_results is empty here).
        for s in return_values:
            context.log(f"-> {s}", context.who)

    if len(return_values) == 1:
        return sheerka.ret(
            context.who,
            False,
            return_values[0],
            parents=return_values)

    # test if only one evaluator in error
    # Imported locally, as in the original code (presumably to avoid a circular
    # import between this module and the evaluators package - TODO confirm).
    from evaluators.OneErrorEvaluator import OneErrorEvaluator

    one_error_evaluator = OneErrorEvaluator()
    reduce_requested = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.REDUCE_REQUESTED))
    if one_error_evaluator.matches(context, return_values + [reduce_requested]):
        return sheerka.ret(
            context.who,
            False,
            one_error_evaluator.eval(context, return_values).body,
            parents=return_values)

    return sheerka.ret(
        context.who,
        False,
        sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
        parents=return_values)


def get_names(sheerka, concept_node):
    """
    Finds all the names referenced by the concept_node.

    :param sheerka: passed to the ``UnreferencedNamesVisitor``
    :param concept_node: concept node to visit
    :return: list of the names collected by the visitor
    """
    visitor = UnreferencedNamesVisitor(sheerka)
    visitor.visit(concept_node)
    return list(visitor.names)


def extract_predicates(sheerka, expression, variables_to_include, variables_to_exclude):
    """
    From a given expression and a collection of variables, tries to find all
    the predicates referencing the(se) variable(s), and the(se) variable(s)
    solely.

    For example, with
        exp : isinstance(a, int) and isinstance(b, str)
    it will return 'isinstance(a, int)' when 'a' is the variable to include.

    :param sheerka: used to resolve node values and names
    :param expression: source of the expression; only ``str`` is supported
    :param variables_to_include: names a predicate must reference to be kept
    :param variables_to_exclude: names that disqualify a predicate
    :return: list of predicates, as ``ast.Expression`` nodes
    :raises NotImplementedError: when ``expression`` is not a string
    """
    if not variables_to_include:
        return []

    def _get_predicates(_nodes):
        # Convert each concept node back to a Python AST expression with
        # location information filled in.
        return [
            ast.fix_missing_locations(
                ast.Expression(body=core.ast.nodes.concept_to_python(_node)))
            for _node in _nodes
        ]

    if not isinstance(expression, str):
        # Raise the exception: returning the instance would hand the caller an
        # exception object where a list of predicates is expected.
        raise NotImplementedError()

    node = ast.parse(expression, mode="eval")
    concept_node = core.ast.nodes.python_to_concept(node)
    main_op = concept_node.get_prop("body")

    return _get_predicates(
        _extract_predicates(sheerka, main_op, variables_to_include, variables_to_exclude))


def _extract_predicates(sheerka, node, variables_to_include, variables_to_exclude):
    """
    Recursively collects the sub nodes of ``node`` that are predicates on the
    requested variables.

    Handled node types: ``Compare``, ``Call``, ``UnaryOp`` (``Not`` only),
    ``BinOp`` and ``BoolOp``. Any other node type yields no predicate.

    :param sheerka: used to resolve node values and names
    :param node: concept node to inspect
    :param variables_to_include: names a predicate must reference to be kept
    :param variables_to_exclude: names that disqualify a predicate
    :return: list of concept nodes
    """
    predicates = []

    def _matches(_names, to_include, to_exclude):
        # False as soon as one name is excluded, True when at least one name
        # is included and none is excluded, None when no name is relevant.
        _res = None
        for n in _names:
            if n in to_include and _res is None:
                _res = True
            if n in to_exclude:
                _res = False
        return _res

    node_type = node.node_type

    if node_type == "Compare":
        left = node.get_prop("left")
        if left.node_type == "Name":
            # Simple case of one comparison
            comparison_name = sheerka.value(left)
            if comparison_name in variables_to_include and comparison_name not in variables_to_exclude:
                predicates.append(node)
        else:
            # The left part is an expression
            res = _extract_predicates(sheerka, left, variables_to_include, variables_to_exclude)
            if res:
                predicates.append(node)

    elif node_type == "Call":
        # Simple case predicate
        call_node = node if isinstance(node, CallNodeConcept) else CallNodeConcept().update_from(node)
        args = list(call_node.get_args_names(sheerka))
        if _matches(args, variables_to_include, variables_to_exclude):
            predicates.append(node)

    elif node_type == "UnaryOp" and node.get_prop("op").node_type == "Not":
        # Simple case of negation: keep the whole negation when its operand
        # yields at least one predicate
        res = _extract_predicates(sheerka, node.get_prop("operand"), variables_to_include, variables_to_exclude)
        if res:
            predicates.append(node)

    elif node_type == "BinOp":
        names = get_names(sheerka, node)
        if _matches(names, variables_to_include, variables_to_exclude):
            predicates.append(node)

    elif node_type == "BoolOp":
        # Keep the whole boolean operation when every operand yields something,
        # otherwise keep only what the operands yielded.
        all_op = True
        temp_res = []
        for op in node.get_prop("values").body:
            res = _extract_predicates(sheerka, op, variables_to_include, variables_to_exclude)
            if res:
                temp_res.extend(res)
            else:
                all_op = False

        if all_op:
            predicates.append(node)
        else:
            predicates.extend(temp_res)

    return predicates