Files
Sheerka-Old/src/core/builtin_helpers.py
T

489 lines
16 KiB
Python

import ast
import logging
import core.ast.nodes
from core.ast.nodes import CallNodeConcept
from core.ast.visitors import UnreferencedNamesVisitor
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from parsers.BaseNodeParser import SourceCodeNode, ConceptNode, UnrecognizedTokensNode
from parsers.BaseParser import BaseParser, ErrorNode
def is_same_success(context, return_values):
"""
Returns True if all returns values are successful and have the same value
:param context:
:param return_values:
:return:
"""
assert isinstance(return_values, list)
def _get_value(ret_val):
if not ret_val.status:
raise Exception("Status is false")
if isinstance(ret_val.body, Concept):
if not ret_val.body.metadata.is_evaluated:
with context.push(desc=f"Evaluating concept '{ret_val.body}'") as sub_context:
sub_context.local_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED)
evaluated = context.sheerka.evaluate_concept(sub_context, ret_val.body)
if evaluated.key != ret_val.body.key:
raise Exception("Failed to evaluate evaluate")
return context.sheerka.objvalue(evaluated)
else:
return context.sheerka.objvalue(ret_val.body)
else:
return context.sheerka.objvalue(ret_val)
try:
reference = _get_value(return_values[0])
for return_value in return_values[1:]:
actual = _get_value(return_value)
if actual != reference:
return False
except Exception as ex:
context.log_error(ex)
return False
return True
def expect_one(context, return_values):
"""
Checks if there is at least one success return value
If there is more than one, check if it's the same value
:param context:
:param return_values:
:return:
"""
if not isinstance(return_values, list):
return return_values
sheerka = context.sheerka
if len(return_values) == 0:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
parents=return_values)
if len(return_values) == 1:
return return_values[0]
successful_results = [item for item in return_values if item.status]
number_of_successful = len(successful_results)
# total_items = len(return_values)
# remove errors when a winner is found
if number_of_successful == 1:
return sheerka.ret(
context.who,
True,
successful_results[0].body,
parents=return_values)
# too many winners, which one to choose ?
if number_of_successful > 1:
if is_same_success(context, successful_results):
return sheerka.ret(
context.who,
True,
successful_results[0].value,
parents=return_values)
else:
if context.logger and context.logger.isEnabledFor(logging.DEBUG):
context.log(f"Too many successful results found by expect_one()", context.who)
for s in successful_results:
context.log(f"-> {s}", context.who)
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.TOO_MANY_SUCCESS, body=successful_results),
parents=return_values)
# only errors, i cannot help you
if context.logger and context.logger.isEnabledFor(logging.DEBUG):
context.log(f"Too many errors found by expect_one()", context.who)
for s in successful_results:
context.log(f"-> {s}", context.who)
if len(return_values) == 1:
return sheerka.ret(
context.who,
False,
return_values[0].body,
parents=return_values)
else:
# test if only one evaluator in error
from evaluators.OneErrorEvaluator import OneErrorEvaluator
one_error_evaluator = OneErrorEvaluator()
reduce_requested = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.REDUCE_REQUESTED))
if one_error_evaluator.matches(context, return_values + [reduce_requested]):
return sheerka.ret(
context.who,
False,
one_error_evaluator.eval(context, return_values).body,
parents=return_values)
else:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
parents=return_values)
def only_successful(context, return_values):
"""
Removes all return values that are not successful
Return error when no successful return value
:param context:
:param return_values:
:return:
"""
if not isinstance(return_values, list):
return return_values
sheerka = context.sheerka
if len(return_values) == 0:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
parents=return_values)
successful_results = [item for item in return_values if item.status]
if len(successful_results) == 0:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
parents=return_values)
return sheerka.ret(
context.who,
True,
sheerka.new(BuiltinConcepts.ONLY_SUCCESSFUL, body=successful_results),
parents=return_values)
def only_parsers_results(context, return_values):
"""
Filters the return_values and returns when the result is a ParserResult
regardless of the status
So it filters errors
:param context:
:param return_values:
:return:
"""
if not isinstance(return_values, list):
return return_values
sheerka = context.sheerka
if len(return_values) == 0:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
parents=return_values)
return_values_ok = [item for item in return_values if sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)]
# hack because some parsers don't follow the NOT_FOR_ME rule
temp_ret_val = []
for ret_val in return_values_ok:
if isinstance(ret_val.body.body, ErrorNode):
continue
if isinstance(ret_val.body.body, list) and \
len(ret_val.body.body) == 1 and \
isinstance(ret_val.body.body[0], UnrecognizedTokensNode):
continue
temp_ret_val.append(ret_val)
return_values_ok = temp_ret_val
if len(return_values_ok) == 0:
return sheerka.ret(
context.who,
False,
sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
parents=return_values)
return sheerka.ret(
context.who,
True,
sheerka.new(BuiltinConcepts.FILTERED,
body=return_values_ok,
iterable=return_values,
predicate="sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)"),
parents=return_values)
def parse_unrecognized(context, tokens, parsers):
"""
Try to recognize concepts or code from tokens using the given parsers
:param context:
:param tokens:
:param parsers:
:return:
"""
steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
sheerka = context.sheerka
with context.push(desc=f"Parsing unrecognized '{tokens}'") as sub_context:
# disable all parsers but the following ones
sub_context.add_preprocess(BaseParser.PREFIX + "*", enabled=False)
for parser in parsers:
sub_context.add_preprocess(BaseParser.PREFIX + parser, enabled=True)
sub_context.add_inputs(source=tokens)
to_parse = sheerka.ret(
context.who,
True,
sheerka.new(BuiltinConcepts.USER_INPUT, body=tokens))
res = sheerka.execute(sub_context, to_parse, steps)
sub_context.add_values(return_values=res)
# discard Python response if accepted by AtomNode
is_concept = False
for r in res:
if r.status and r.who == "parsers.AtomNode":
is_concept = True
if not is_concept:
return res
filtered = []
for r in res:
if r.who == "parsers.Python":
continue
filtered.append(r)
return filtered
def get_lexer_nodes(return_values, start, tokens):
"""
From a parser result, return the corresponding LexerNode
either ConceptNode, UnrecognizedTokensNode or SourceCodeNode
:param return_values:
:param start:
:param tokens:
:return: list of list (list of concept node sequence)
"""
lexer_nodes = []
for ret_val in return_values:
if ret_val.who == "parsers.Python":
if ret_val.body.source.strip().isidentifier():
# Discard SourceCodeNode which seems to be a concept name
# It may be a wrong idea, so let's see
continue
end = start + len(tokens) - 1
lexer_nodes.append([SourceCodeNode(ret_val.body.body, start, end, tokens, ret_val.body.source, ret_val)])
elif ret_val.who == "parsers.ExactConcept":
concepts = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
end = start + len(tokens) - 1
for concept in concepts:
lexer_nodes.append([ConceptNode(concept, start, end, tokens, ret_val.body.source)])
elif ret_val.who in ("parsers.BnfNode", "parsers.SyaNode", "parsers.AtomNode"):
nodes = [node for node in ret_val.body.body]
for node in nodes:
node.start += start
node.end += start
# but append the whole sequence if when it's a sequence
lexer_nodes.append(nodes)
else:
raise NotImplementedError()
return lexer_nodes
def ensure_evaluated(context, concept):
"""
Evaluate a concept is not already evaluated
:param context:
:param concept:
:return:
"""
if concept.metadata.is_evaluated:
return concept
with context.push(desc=f"Evaluating concept {concept}") as sub_context:
sub_context.local_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED)
evaluated = context.sheerka.evaluate_concept(sub_context, concept)
sub_context.add_values(return_values=evaluated)
return evaluated
def get_lexer_nodes_from_unrecognized(context, unrecognized_tokens_node, parsers):
"""
Using parsers, try to recognize concepts from source
:param context:
:param unrecognized_tokens_node:
:param parsers:
:return:
"""
res = parse_unrecognized(context, unrecognized_tokens_node.source, parsers)
res = only_parsers_results(context, res)
if not res.status:
return None
return get_lexer_nodes(res.body.body, unrecognized_tokens_node.start, unrecognized_tokens_node.tokens)
def get_names(sheerka, concept_node):
"""
Finds all the names referenced by the concept_node
:param sheerka:
:param concept_node:
:return:
"""
unreferenced_names_visitor = UnreferencedNamesVisitor(sheerka)
unreferenced_names_visitor.visit(concept_node)
return list(unreferenced_names_visitor.names)
def extract_predicates(sheerka, expression, variables_to_include, variables_to_exclude):
"""
from a given expression and a variable (or list of variables)
tries to find out all the predicates referencing the(se) variable(s), and the(se) variable(s) solely
for example
exp : isinstance(a, int) and isinstance(b, str)
will return 'isinstance(a, int)' if variable_name == 'a'
:param sheerka:
:param expression:
:param variables_to_include:
:param variables_to_exclude:
:return: list of predicates
"""
if len(variables_to_include) == 0:
return []
def _get_predicates(_nodes):
_predicates = []
for _node in _nodes:
python_node = ast.Expression(body=core.ast.nodes.concept_to_python(_node))
python_node = ast.fix_missing_locations(python_node)
_predicates.append(python_node)
return _predicates
if isinstance(expression, str):
node = ast.parse(expression, mode="eval")
else:
return NotImplementedError()
concept_node = core.ast.nodes.python_to_concept(node)
main_op = concept_node.get_value("body")
return _get_predicates(_extract_predicates(sheerka, main_op, variables_to_include, variables_to_exclude))
def _extract_predicates(sheerka, node, variables_to_include, variables_to_exclude):
predicates = []
def _matches(_names, to_include, to_exclude):
_res = None
for n in _names:
if n in to_include and _res is None:
_res = True
if n in to_exclude:
_res = False
return _res
if node.node_type == "Compare":
if node.get_value("left").node_type == "Name":
"""Simple case of one comparison"""
comparison_name = sheerka.objvalue(node.get_value("left"))
if comparison_name in variables_to_include and comparison_name not in variables_to_exclude:
predicates.append(node)
else:
"""The left part is an expression"""
res = _extract_predicates(sheerka, node.get_value("left"), variables_to_include, variables_to_exclude)
if len(res) > 0:
predicates.append(node)
elif node.node_type == "Call":
"""Simple case predicate"""
call_node = node if isinstance(node, CallNodeConcept) else CallNodeConcept().update_from(node)
args = list(call_node.get_args_names(sheerka))
if _matches(args, variables_to_include, variables_to_exclude):
predicates.append(node)
elif node.node_type == "UnaryOp" and node.get_value("op").node_type == "Not":
"""Simple case of negation"""
res = _extract_predicates(sheerka, node.get_value("operand"), variables_to_include, variables_to_exclude)
if len(res) > 0:
predicates.append(node)
elif node.node_type == "BinOp":
names = get_names(sheerka, node)
if _matches(names, variables_to_include, variables_to_exclude):
predicates.append(node)
elif node.node_type == "BoolOp":
all_op = True
temp_res = []
for op in node.get_value("values").body:
res = _extract_predicates(sheerka, op, variables_to_include, variables_to_exclude)
if len(res) == 0:
all_op = False
else:
temp_res.extend(res)
if all_op:
predicates.append(node)
else:
for res in temp_res:
predicates.append(res)
return predicates
def add_to_ret_val(sheerka, context, return_values, concept_key):
concept = sheerka.new(concept_key)
ret_val = sheerka.ret(context.who, True, concept)
return_values.append(ret_val)
return return_values
def remove_from_ret_val(sheerka, return_values, concept_key):
to_remove = []
for ret_val in return_values:
if ret_val.status and sheerka.isinstance(ret_val.body, concept_key):
to_remove.append(ret_val)
for item in to_remove:
return_values.remove(item)
return return_values
def set_is_evaluated(concepts):
"""
set is_evaluated to True
:param concepts:
:return:
"""
if concepts is None:
return
if hasattr(concepts, "__iter__"):
for c in concepts:
c.metadata.is_evaluated = True
else:
concepts.metadata.is_evaluated = True