736 lines
25 KiB
Python
736 lines
25 KiB
Python
import ast
|
|
import logging
|
|
|
|
import core.ast.nodes
|
|
from core.ast.nodes import CallNodeConcept
|
|
from core.ast.visitors import UnreferencedNamesVisitor
|
|
from core.builtin_concepts import BuiltinConcepts
|
|
from core.concept import Concept, NotInit, ConceptParts
|
|
from core.sheerka.services.SheerkaExecute import SheerkaExecute
|
|
from core.tokenizer import Keywords
|
|
# from evaluators.BaseEvaluator import BaseEvaluator
|
|
from parsers.BaseNodeParser import SourceCodeNode, ConceptNode, UnrecognizedTokensNode, SourceCodeWithConceptNode
|
|
from parsers.BaseParser import BaseParser, ErrorNode
|
|
|
|
# Steps handed to sheerka.execute() when the input only has to be parsed (see parse_unrecognized)
PARSE_STEPS = [
    BuiltinConcepts.BEFORE_PARSING,
    BuiltinConcepts.PARSING,
    BuiltinConcepts.AFTER_PARSING,
]

# Parsing steps followed by the evaluation steps (see evaluate)
EVAL_STEPS = PARSE_STEPS + [
    BuiltinConcepts.BEFORE_EVALUATION,
    BuiltinConcepts.EVALUATION,
    BuiltinConcepts.AFTER_EVALUATION,
]

# Parsers used by update_compiled() when the caller does not provide its own list
PARSERS = ["EmptyString", "ShortTermMemory", "AtomNode", "BnfNode", "SyaNode", "Python"]
|
|
|
|
|
|
def is_same_success(context, return_values):
    """
    Tell whether every return value is successful and resolves to one and the same value.

    A body which is a Concept that is not evaluated yet is evaluated (body included)
    before being compared. Any exception raised on the way (failed status, failed
    evaluation, empty list...) is sent to context.log_error() and the answer is False.
    :param context: execution context (gives access to sheerka and to the error log)
    :param return_values: list of return values to compare
    :return: True if all the values are successful and equal, False otherwise
    """
    assert isinstance(return_values, list)

    def _resolve(item):
        # a failed return value can never take part in a "same success"
        if not item.status:
            raise Exception("Status is false")

        body = item.body
        if not isinstance(body, Concept):
            return context.sheerka.objvalue(item)

        if body.metadata.is_evaluated:
            return context.sheerka.objvalue(body)

        evaluated = context.sheerka.evaluate_concept(context, body, eval_body=True)
        if not context.sheerka.is_success(evaluated):
            raise Exception("Failed to evaluate evaluate")

        return context.sheerka.objvalue(evaluated)

    try:
        head, tail = return_values[0], return_values[1:]
        reference = _resolve(head)

        # every other value must match the first one
        for candidate in tail:
            if _resolve(candidate) != reference:
                return False

    except Exception as ex:
        context.log_error(ex)
        return False

    return True
|
|
|
|
|
|
def expect_one(context, return_values):
    """
    Reduce a list of return values to a single return value.

    * not a list: the input is returned untouched
    * empty list: failure wrapping an IS_EMPTY concept
    * one item: that item, as is
    * exactly one success: success carrying the body of the winner
    * several successes: success when they all resolve to the same value
      (see is_same_success), failure wrapping TOO_MANY_SUCCESS otherwise
    * no success: failure carrying the body produced by OneErrorEvaluator when it matches,
      failure wrapping TOO_MANY_ERRORS otherwise
    :param context: execution context (sheerka, who, logger)
    :param return_values: list of return values, anything else is returned as is
    :return: a single return value
    """

    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    def _log_debug(title, items):
        # the listing is only produced when DEBUG is enabled
        if context.logger and context.logger.isEnabledFor(logging.DEBUG):
            context.log(title, context.who)
            for item in items:
                context.log(f"-> {item}", context.who)

    if len(return_values) == 0:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)

    if len(return_values) == 1:
        return return_values[0]

    successful_results = [item for item in return_values if item.status]
    number_of_successful = len(successful_results)

    # remove errors when a winner is found
    if number_of_successful == 1:
        return sheerka.ret(
            context.who,
            True,
            successful_results[0].body,
            parents=return_values)

    # too many winners, which one to choose ?
    if number_of_successful > 1:
        if is_same_success(context, successful_results):
            return sheerka.ret(
                context.who,
                True,
                successful_results[0].value,
                parents=return_values)

        _log_debug("Too many successful results found by expect_one()", successful_results)
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.TOO_MANY_SUCCESS, body=successful_results),
            parents=return_values)

    # number_of_successful == 0, only errors, i cannot help you
    # (successful_results is empty here, the errors to show are in return_values)
    _log_debug("Too many errors found by expect_one()", return_values)

    # test if only one evaluator in error
    # (there are at least two return values at this point, the single item case returned earlier)
    from evaluators.OneErrorEvaluator import OneErrorEvaluator
    one_error_evaluator = OneErrorEvaluator()
    reduce_requested = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.REDUCE_REQUESTED))
    if one_error_evaluator.matches(context, return_values + [reduce_requested]):
        return sheerka.ret(
            context.who,
            False,
            one_error_evaluator.eval(context, return_values).body,
            parents=return_values)

    return sheerka.ret(
        context.who,
        False,
        sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
        parents=return_values)
|
|
|
|
|
|
def only_successful(context, return_values):
    """
    Keep the successful return values only.

    The answer is a success wrapping an ONLY_SUCCESSFUL concept whose body is the list
    of successful items. It is a failure when the list is empty (IS_EMPTY) or when
    no item is successful (TOO_MANY_ERRORS).
    :param context: execution context (sheerka, who)
    :param return_values: list of return values, anything else is returned as is
    :return: a single return value
    """
    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    def _failure(concept_key):
        # every failure carries the whole input, as body and as parents
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(concept_key, body=return_values),
            parents=return_values)

    if not return_values:
        return _failure(BuiltinConcepts.IS_EMPTY)

    winners = list(filter(lambda item: item.status, return_values))
    if not winners:
        return _failure(BuiltinConcepts.TOO_MANY_ERRORS)

    return sheerka.ret(
        context.who,
        True,
        sheerka.new(BuiltinConcepts.ONLY_SUCCESSFUL, body=winners),
        parents=return_values)
|
|
|
|
|
|
def resolve_ambiguity(context, concepts):
    """
    Elect, among the given concepts, the one(s) that best suit(s) the context.

    The PRE metadata drives the election: the concepts with the most complex PRE
    condition are tried first, and a concept is kept when evaluating its PRE
    gives back a concept with the same key. Ties are broken by keeping the concepts
    which declare the fewest variables.
    :param context: execution context (sheerka)
    :param concepts: candidates
    :return: list of elected concepts (may be empty)
    """

    # group the candidates by the complexity of their PRE condition. The more complex
    # the PRE condition is, the more likely the concept matches the context
    groups = {}
    for candidate in concepts:
        level = get_condition_complexity(candidate, "pre")
        if level not in groups:
            groups[level] = []
        groups[level].append(candidate)

    elected = []
    for level in sorted(groups, reverse=True):
        if level == 0:
            # no PRE condition to check
            elected += groups[level]
        else:
            for candidate in groups[level]:
                evaluated = context.sheerka.evaluate_concept(context, candidate, metadata=["pre"])
                if evaluated.key == candidate.key:
                    elected.append(candidate)

        if elected:
            break  # no need to check concept with lower complexity

    if len(elected) < 2:
        return elected  # they all failed the pre conditions or one champ is found

    # for concepts with the same condition complexity, keep the ones with the fewest variables.
    # Concept("hello world") is considered more specific than Concept("hello a").def_var("a")
    # when the input is "hello world"
    fewest = min(len(candidate.metadata.variables) for candidate in elected)
    return [candidate for candidate in elected if len(candidate.metadata.variables) == fewest]
|
|
|
|
|
|
def get_condition_complexity(concept, concept_part_str):
    """
    Score the complexity of one condition of a concept.

    :param concept: concept whose metadata holds the condition
    :param concept_part_str: name of the metadata attribute to look at ("pre" for instance)
    :return: 0 when the condition is missing (None) or blank, 1 otherwise
    """
    concept_part_value = getattr(concept.metadata, concept_part_str)
    # a stripped string must be compared to the empty string: comparing it to 0 is
    # always False and made blank conditions count as real ones
    if concept_part_value is None or concept_part_value.strip() == "":
        return 0

    return 1  # no real computing as of now
|
|
|
|
|
|
def only_parsers_results(context, return_values):
    """
    Keep the return values whose body is a PARSER_RESULT, whatever their status.

    Parser results holding an ErrorNode, or a list made of a single
    UnrecognizedTokensNode, are dropped as well.
    The answer is a success wrapping a FILTERED concept, or a failure when the input
    is empty (IS_EMPTY) or when nothing is left (TOO_MANY_ERRORS).
    :param context: execution context (sheerka, who)
    :param return_values: list of return values, anything else is returned as is
    :return: a single return value
    """

    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    if not return_values:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)

    def _not_for_me(parsed):
        # hack because some parsers don't follow the NOT_FOR_ME rule
        if isinstance(parsed, ErrorNode):
            return True
        return (isinstance(parsed, list)
                and len(parsed) == 1
                and isinstance(parsed[0], UnrecognizedTokensNode))

    parser_results = [item for item in return_values
                      if sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)]
    kept = [item for item in parser_results if not _not_for_me(item.body.body)]

    if not kept:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
            parents=return_values)

    filtered = sheerka.new(
        BuiltinConcepts.FILTERED,
        body=kept,
        iterable=return_values,
        predicate="sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)")
    return sheerka.ret(context.who, True, filtered, parents=return_values)
|
|
|
|
|
|
def parse_unrecognized(context, source, parsers, who=None, prop=None, filter_func=None):
    """
    Try to recognize concepts or code from source using the given parsers.

    The parsing runs in a sub context pushed for the occasion. When a successful
    answer comes from "parsers.AtomNode", the answers of "parsers.Python" are discarded.
    :param context: execution context
    :param source: text to parse
    :param parsers: names of the parsers to enable, or "all" to keep the current setup
    :param who: who is asking the parsing ?
    :param prop: Extra info, when parsing a property
    :param filter_func: optional function, called with (sub_context, results), whose answer replaces the results
    :return: the results of the parsing (the filtered ones when filter_func is given)
    """
    sheerka = context.sheerka

    action_context = {"prop": prop, "source": source} if prop else source
    desc = f"Parsing attribute '{prop}'" if prop else f"Parsing '{source}'"

    with context.push(BuiltinConcepts.PARSING, action_context, who=who, desc=desc) as sub_context:
        # disable all parsers but the requested ones
        if parsers != "all":
            sub_context.add_preprocess(BaseParser.PREFIX + "*", enabled=False)
            for parser_name in parsers:
                sub_context.add_preprocess(BaseParser.PREFIX + parser_name, enabled=True)

        # WHERE and PRE properties get the EVAL_QUESTION_REQUESTED hint
        if prop in (Keywords.WHERE, Keywords.PRE, ConceptParts.WHERE, ConceptParts.PRE):
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED)

        sub_context.add_inputs(source=source)
        to_parse = sheerka.ret(context.who,
                               True,
                               sheerka.new(BuiltinConcepts.USER_INPUT, body=source))
        res = sheerka.execute(sub_context, to_parse, PARSE_STEPS)

        if filter_func:
            res = filter_func(sub_context, res)

        sub_context.add_values(return_values=res)
        if not hasattr(res, "__iter__"):
            return res

        # discard Python response if accepted by AtomNode
        accepted_by_atom = [r for r in res if r.status and r.who == "parsers.AtomNode"]
        if not accepted_by_atom:
            return res

        return [r for r in res if r.who != "parsers.Python"]
|
|
|
|
|
|
def parse_function(context, source, tokens=None, start=0):
    """
    Parse what is supposed to be a function, using FunctionParser.

    The positions of the nodes found in the PARSER_RESULT answers are shifted by start.
    :param context: execution context
    :param source: text to parse
    :param tokens: tokens of the source, forwarded to get_parser_input()
    :param start: start index for the source code node
    :return: list of return values (a single answer is wrapped into a list)
    """
    sheerka = context.sheerka
    from parsers.FunctionParser import FunctionParser

    def _shift(node):
        node.start += start
        node.end += start

    function_parser = FunctionParser()
    with context.push(BuiltinConcepts.PARSE_CODE, source, desc=f"Parsing function '{source}'") as sub_context:
        executor = sheerka.services[SheerkaExecute.NAME]
        parser_input = executor.get_parser_input(source, tokens)
        res = function_parser.parse(sub_context, parser_input)

    if not isinstance(res, list):
        res = [res]

    parser_results = [r for r in res if sheerka.isinstance(r.body, BuiltinConcepts.PARSER_RESULT)]
    for ret_val in parser_results:
        root = ret_val.body.body
        _shift(root)
        if isinstance(root, SourceCodeWithConceptNode):
            # the inner nodes carry their own positions
            for inner in [root.first, root.last] + root.nodes:
                _shift(inner)

    return res
|
|
|
|
|
|
def evaluate(context,
             source,
             evaluators="all",
             desc=None,
             eval_body=True,
             eval_where=True,
             expect_success=False,
             stm=None):
    """
    Run the parsing and evaluation steps (EVAL_STEPS) on source, in a dedicated sub context.

    :param context: execution context
    :param source: text to evaluate
    :param evaluators: names of the evaluators to enable, or "all" to keep the current setup
    :param desc: description of the sub context, "Eval '<source>'" when not given
    :param eval_body: adds the EVAL_BODY_REQUESTED hint
    :param eval_where: adds the EVAL_WHERE_REQUESTED hint
    :param expect_success: adds the EVAL_UNTIL_SUCCESS_REQUESTED and EVAL_QUESTION_REQUESTED hints
    :param stm: short term memories entries
    :return: what sheerka.execute() returns
    """

    sheerka = context.sheerka
    if not desc:
        desc = f"Eval '{source}'"

    with context.push(BuiltinConcepts.EVALUATE_SOURCE, source, desc=desc) as sub_context:
        hints = sub_context.protected_hints

        if eval_body:
            hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED)

        if eval_where:
            hints.add(BuiltinConcepts.EVAL_WHERE_REQUESTED)

        if expect_success:
            hints.add(BuiltinConcepts.EVAL_UNTIL_SUCCESS_REQUESTED)
            hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED)

        if stm:
            for key, value in stm.items():
                sub_context.add_to_short_term_memory(key, value)

        # disable all evaluators but the requested ones
        if evaluators != "all":
            from evaluators.BaseEvaluator import BaseEvaluator
            sub_context.add_preprocess(BaseEvaluator.PREFIX + "*", enabled=False)
            for evaluator_name in evaluators:
                sub_context.add_preprocess(BaseEvaluator.PREFIX + evaluator_name, enabled=True)

        user_input = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.USER_INPUT, body=source))
        ret = sheerka.execute(sub_context, [user_input], EVAL_STEPS)
        sub_context.add_values(return_values=ret)

    return ret
|
|
|
|
|
|
def get_lexer_nodes(return_values, start, tokens):
    """
    Turn parser results into sequences of lexer nodes
    (ConceptNode, UnrecognizedTokensNode or SourceCodeNode).

    * "parsers.Python": one SourceCodeNode, unless the source is a mere identifier
    * "parsers.ExactConcept": one ConceptNode per concept found
    * "parsers.BnfNode", "parsers.SyaNode", "parsers.AtomNode": the nodes of the parser,
      shifted (in place) by start
    :param return_values: return values produced by the parsers
    :param start: position of the first token
    :param tokens: tokens that were parsed
    :return: list of list (list of concept node sequence)
    :raise NotImplementedError: when a return value comes from another parser
    """

    sequences = []
    for ret_val in return_values:
        origin = ret_val.who

        if origin == "parsers.Python":
            code = ret_val.body.source
            if code.strip().isidentifier():
                # Discard SourceCodeNode which seems to be a concept name
                # It may be a wrong idea, so let's see
                continue

            source_code_node = SourceCodeNode(start,
                                              start + len(tokens) - 1,
                                              tokens,
                                              code,
                                              python_node=ret_val.body.body,
                                              return_value=ret_val)
            sequences.append([source_code_node])

        elif origin == "parsers.ExactConcept":
            found = ret_val.body.body
            if not hasattr(found, "__iter__"):
                found = [found]
            end = start + len(tokens) - 1
            sequences.extend([ConceptNode(concept, start, end, tokens, ret_val.body.source)]
                             for concept in found)

        elif origin in ("parsers.BnfNode", "parsers.SyaNode", "parsers.AtomNode"):
            nodes = list(ret_val.body.body)
            for node in nodes:
                node.start += start
                node.end += start

            # the nodes stay together, as one sequence
            sequences.append(nodes)

        else:
            raise NotImplementedError()

    return sequences
|
|
|
|
|
|
def ensure_evaluated(context, concept, eval_body=True):
    """
    Evaluate a concept, unless it is already evaluated or not fully initialized.

    A concept is not fully initialized when one of its variables has no default value,
    is not compiled, and has no value (or NotInit as a value).
    :param context: execution context (sheerka)
    :param concept: concept to evaluate
    :param eval_body: forwarded to sheerka.evaluate_concept()
    :return: the concept itself when nothing is done, the result of the evaluation otherwise
    """
    if concept.metadata.is_evaluated:
        return concept

    # do not try to evaluate concept that are not fully initialized
    for var in concept.metadata.variables:
        name, default = var[0], var[1]
        if default is not None or name in concept.compiled:
            continue  # this variable can be resolved
        if name not in concept.values or concept.get_value(name) == NotInit:
            return concept

    return context.sheerka.evaluate_concept(context, concept, eval_body=eval_body)
|
|
|
|
|
|
def get_lexer_nodes_from_unrecognized(context, unrecognized_tokens_node, parsers):
    """
    Parse the source of an unrecognized tokens node and give back the lexer nodes found.

    Chains parse_unrecognized(), only_parsers_results() and get_lexer_nodes().
    :param context: execution context
    :param unrecognized_tokens_node: node providing the source, the start and the tokens
    :param parsers: parsers to use
    :return: list of lexer nodes sequences, None when no parser result is usable
    """

    parsed = parse_unrecognized(context, unrecognized_tokens_node.source, parsers)
    filtered = only_parsers_results(context, parsed)

    if filtered.status:
        return get_lexer_nodes(filtered.body.body,
                               unrecognized_tokens_node.start,
                               unrecognized_tokens_node.tokens)

    return None
|
|
|
|
|
|
def update_compiled(context, concept, errors, parsers=None):
    """
    Recursively browse concept.compiled to replace the LexerNodes by what they stand for.

    When parsing using a LexerNodeParser (SyaNodeParser, BnfNodeParser...)
    the result will be a LexerNode.
    In the specific case of a ConceptNode, the compiled variables will also be LexerNode
    (UnrecognizedTokensNode...). They are replaced here:
    * a SourceCodeWithConceptNode becomes a list holding the return value of PythonWithConceptsParser
    * an UnrecognizedTokensNode becomes the successful results of parse_unrecognized()
    * a Concept is browsed in turn
    Every node which cannot be parsed adds an ERROR concept to errors.
    :param context: execution context (sheerka)
    :param concept: concept to update, in place
    :param errors: a list that must be initialized by the caller
    :param parsers: to customize the parsers to use (PARSERS when not given)
    :return: None
    """

    sheerka = context.sheerka
    if not parsers:
        parsers = PARSERS

    def _cannot_parse(node):
        return sheerka.new(BuiltinConcepts.ERROR, body=f"Cannot parse '{node.source}'")

    def _compile(owner):
        # recursively browse the compiled properties in order to find unrecognized
        for key, value in owner.compiled.items():
            if isinstance(value, Concept):
                _compile(value)
                continue

            if isinstance(value, SourceCodeWithConceptNode):
                from parsers.PythonWithConceptsParser import PythonWithConceptsParser
                outcome = PythonWithConceptsParser().parse_nodes(context, value.get_all_nodes())
                if outcome.status:
                    owner.compiled[key] = [outcome]
                else:
                    errors.append(_cannot_parse(value))
                continue

            if isinstance(value, UnrecognizedTokensNode):
                outcome = parse_unrecognized(context, value.source, parsers)
                outcome = only_successful(context, outcome)  # only keep successful parsers
                if outcome.status:
                    owner.compiled[key] = outcome.body.body
                else:
                    errors.append(_cannot_parse(value))

    def _source_of(compiled, var_name):
        # source of the single parser result compiled for var_name, None in any other situation
        if var_name not in compiled:
            return None

        entry = compiled[var_name]
        if not isinstance(entry, list) or len(entry) != 1:
            return None

        ret_val = entry[0]
        if not sheerka.isinstance(ret_val, BuiltinConcepts.RETURN_VALUE):
            return None

        parser_result = ret_val.body
        if not sheerka.isinstance(parser_result, BuiltinConcepts.PARSER_RESULT):
            return None
        if parser_result.name == "parsers.ShortTermMemory":
            return None

        return parser_result.source

    _compile(concept)

    # Special case where the values of the variables are the names of the variable
    # example : Concept("a plus b").def_var("a").def_var("b")
    # and the user has entered 'a plus b'
    # Chances are that we are talking about the concept itself, and not an instantiation (like '10 plus 2')
    # This means that 'a' and 'b' don't have any real value
    variables = concept.metadata.variables
    if len(variables) > 0:
        named_after_itself = not any(_source_of(concept.compiled, name) != name for name, _ in variables)
        if named_after_itself:
            concept.metadata.is_evaluated = True
|
|
|
|
|
|
def get_names(sheerka, concept_node):
    """
    List the names collected by an UnreferencedNamesVisitor visiting concept_node.

    :param sheerka: forwarded to the visitor
    :param concept_node: node to visit
    :return: list of names
    """
    visitor = UnreferencedNamesVisitor(sheerka)
    visitor.visit(concept_node)
    return [*visitor.names]
|
|
|
|
|
|
def extract_predicates(sheerka, expression, variables_to_include, variables_to_exclude):
    """
    From a given expression and a list of variables, find the predicates that reference
    the variables to include without referencing the variables to exclude.

    for example
    exp : isinstance(a, int) and isinstance(b, str)
    will give 'isinstance(a, int)' when variables_to_include == ['a']
    :param sheerka:
    :param expression: python expression, as a string
    :param variables_to_include: names the predicates must reference
    :param variables_to_exclude: names the predicates must not reference
    :return: list of predicates, as ast.Expression nodes (empty when there is no variable to include)
    :raise NotImplementedError: when expression is not a string
    """

    if len(variables_to_include) == 0:
        return []

    if not isinstance(expression, str):
        # the exception must be raised: returning it would hand an exception
        # instance to a caller which expects a list of predicates
        raise NotImplementedError(
            f"Cannot extract predicates from '{type(expression).__name__}', only 'str' is supported")

    node = ast.parse(expression, mode="eval")
    concept_node = core.ast.nodes.python_to_concept(node)
    main_op = concept_node.get_value("body")

    predicates = []
    for found in _extract_predicates(sheerka, main_op, variables_to_include, variables_to_exclude):
        # back to python: wrap each predicate into a standalone ast.Expression
        python_node = ast.Expression(body=core.ast.nodes.concept_to_python(found))
        predicates.append(ast.fix_missing_locations(python_node))

    return predicates
|
|
|
|
|
|
def _extract_predicates(sheerka, node, variables_to_include, variables_to_exclude):
    """
    Recursive part of extract_predicates(), working on concept nodes.

    :param sheerka:
    :param node: concept node to inspect (Compare, Call, UnaryOp/Not, BinOp or BoolOp)
    :param variables_to_include: names the predicates must reference
    :param variables_to_exclude: names the predicates must not reference
    :return: list of the nodes accepted as predicates (empty for any other node type)
    """

    def _accepts(names):
        # an excluded name always wins, whatever the position of the names
        if any(name in variables_to_exclude for name in names):
            return False
        if any(name in variables_to_include for name in names):
            return True
        return None

    def _recurse(child):
        return _extract_predicates(sheerka, child, variables_to_include, variables_to_exclude)

    kind = node.node_type

    if kind == "Compare":
        left = node.get_value("left")
        if left.node_type == "Name":
            # Simple case of one comparison
            name = sheerka.objvalue(left)
            keep = name in variables_to_include and name not in variables_to_exclude
        else:
            # The left part is an expression
            keep = len(_recurse(left)) > 0
        return [node] if keep else []

    if kind == "Call":
        # Simple case predicate
        call_node = node if isinstance(node, CallNodeConcept) else CallNodeConcept().update_from(node)
        args = list(call_node.get_args_names(sheerka))
        return [node] if _accepts(args) else []

    if kind == "UnaryOp" and node.get_value("op").node_type == "Not":
        # Simple case of negation
        return [node] if len(_recurse(node.get_value("operand"))) > 0 else []

    if kind == "BinOp":
        return [node] if _accepts(get_names(sheerka, node)) else []

    if kind == "BoolOp":
        per_operand = [_recurse(operand) for operand in node.get_value("values").body]
        if all(len(found) > 0 for found in per_operand):
            # every operand qualifies, the whole operation is the predicate
            return [node]
        # otherwise only the qualifying parts are kept
        return [predicate for found in per_operand for predicate in found]

    return []
|
|
|
|
|
|
def add_to_ret_val(sheerka, context, return_values, concept_key):
    """
    Append a successful return value holding a new concept_key concept.

    :param sheerka:
    :param context: execution context (who)
    :param return_values: list to update, in place
    :param concept_key: key of the concept to create
    :return: return_values
    """
    wrapped = sheerka.ret(context.who, True, sheerka.new(concept_key))
    return_values.append(wrapped)
    return return_values
|
|
|
|
|
|
def remove_from_ret_val(sheerka, return_values, concept_key):
    """
    Remove the successful return values whose body is a concept_key concept.

    :param sheerka:
    :param return_values: list to update, in place
    :param concept_key: key of the concept to look for
    :return: return_values
    """
    # collect first, the list must not shrink while it is being browsed
    doomed = [ret_val for ret_val in return_values
              if ret_val.status and sheerka.isinstance(ret_val.body, concept_key)]

    for ret_val in doomed:
        return_values.remove(ret_val)

    return return_values
|
|
|
|
|
|
def set_is_evaluated(concepts, check_nb_variables=False):
    """
    Set metadata.is_evaluated to True on one concept, or on every concept of an iterable.

    :param concepts: a concept, an iterable of concepts, or None (nothing is done)
    :param check_nb_variables: only set is_evaluated if the concept has variables
    :return: None
    """
    if concepts is None:
        return

    targets = concepts if hasattr(concepts, "__iter__") else [concepts]
    for concept in targets:
        if check_nb_variables and len(concept.metadata.variables) == 0:
            continue  # no variable, left untouched
        concept.metadata.is_evaluated = True
|