Files
Sheerka-Old/src/core/sheerka/services/SheerkaExecute.py
T
kodjo 6cda2686fb Fixed #48 : RelationalExpressionParser: Implement relational operator parser
Fixed #49 : ExpressionParser: Implement ExpressionParser
Fixed #50 : Implement ReteConditionExprVisitor
Fixed #51 : Implement PythonConditionExprVisitor
Fixed #52 : SheerkaConceptManager: I can get and set concept property
2021-03-23 11:35:10 +01:00

792 lines
34 KiB
Python

import core.utils
from cache.FastCache import FastCache
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept
from core.concept import ConceptParts
from core.global_symbols import NotFound, NO_MATCH
from core.sheerka.services.sheerka_service import BaseService
from core.tokenizer import Tokenizer, TokenKind, Token, Keywords
PARSE_STEPS = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
PARSE_AND_EVAL_STEPS = PARSE_STEPS + [BuiltinConcepts.BEFORE_EVALUATION,
BuiltinConcepts.EVALUATION,
BuiltinConcepts.AFTER_EVALUATION]
ALL_STEPS = PARSE_AND_EVAL_STEPS + [BuiltinConcepts.BEFORE_RENDERING,
BuiltinConcepts.RENDERING,
BuiltinConcepts.AFTER_RENDERING,
BuiltinConcepts.BEFORE_RULES_EVALUATION,
BuiltinConcepts.AFTER_RULES_EVALUATION]
class ParserInput:
"""
Helper class that tokenizes the input once for all
"""
def __init__(self, text, tokens=None, length=None, start=None, end=None, yield_oef=True):
self.text = text
self.tokens = tokens or None
if self.tokens:
# make sure tokens ends with EOF token
# and do not modify the original token list
if len(self.tokens) == 0:
self.tokens = [Token(TokenKind.EOF, "", 0, 1, 1)]
elif (last_token := self.tokens[-1]).type != TokenKind.EOF:
self.tokens = self.tokens + [Token(TokenKind.EOF,
"",
last_token.index + 1,
last_token.line,
last_token.column + 1)]
self.length = length # to be computed (again) in reset()
self.yield_oef = yield_oef
self.start = start or 0
if end is not None:
self.original_end = end # forced index of the last token
self.end = self.original_end # index of the last token => len(tokens) - 1 if full tokens
else:
self.original_end = self.end = None
self.sub_text = None
self.sub_tokens = None
self.pos = None
self.token = None
self.from_tokens = tokens is not None
def __repr__(self):
from_tokens = "from_tokens" if self.from_tokens else ""
return f"ParserInput({from_tokens}'{self.text}')"
def reset(self, yield_oef=None):
def _get_end_from_yield_eof(_length, _yield_oef):
return _length - 1 if _yield_oef else _length - 2
if yield_oef is None:
yield_oef = self.yield_oef
# make sure tokens is correctly initialized
if self.tokens is None:
# the eof if forced, but will not be yield if not set to.
self.tokens = list(Tokenizer(self.text, yield_eof=True))
self.length = len(self.tokens)
if self.original_end is None:
self.end = _get_end_from_yield_eof(self.length, yield_oef)
else:
self.end = self.original_end if self.original_end < self.length else \
_get_end_from_yield_eof(self.length, yield_oef)
self.pos = self.start - 1
self.token = None
return self
def as_text(self, custom_switcher=None, tracker=None):
if not self.tokens or self.end is None:
# as_text is requested before reset().
# It means that we want the original text
return self.text
if custom_switcher is None:
if self.sub_text:
return self.sub_text
if self.start == 0 and self.end == self.length - 1:
self.sub_text = self.text
return self.sub_text
self.sub_text = core.utils.get_text_from_tokens(self.tokens[self.start:self.end + 1])
return self.sub_text
else:
return core.utils.get_text_from_tokens(self.as_tokens(), custom_switcher, tracker)
def as_tokens(self):
if self.sub_tokens:
return self.sub_tokens
if self.start == 0 and self.end == self.length - 1:
self.sub_tokens = self.tokens
return self.sub_tokens
self.sub_tokens = self.tokens[self.start:self.end + 1]
return self.sub_tokens
def next_token(self, skip_whitespace=True):
self.pos += 1
if self.pos > self.end:
self.token = self.tokens[-1]
return False
self.token = self.tokens[self.pos]
if self.token.type == TokenKind.EOF:
return False
if skip_whitespace:
while self.token.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
self.pos += 1
if self.pos > self.end:
return False
self.token = self.tokens[self.pos]
if self.pos <= self.end:
return True
else:
self.token = self.tokens[-1]
return False
def the_token_after(self, skip_whitespace=True):
"""
Returns the token after the current one
Never returns None (returns TokenKind.EOF instead)
"""
my_pos = self.pos + 1
if my_pos > self.end:
return self.tokens[-1]
if skip_whitespace:
while self.tokens[my_pos].type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
my_pos += 1
if my_pos > self.end:
return self.tokens[-1]
return self.tokens[my_pos]
def seek(self, pos):
"""
Move the token offset to position pos
:param pos:
:return: True is pos is a valid position False otherwise
"""
if pos < 0 or pos > self.end:
self.token = None
return False
self.pos = pos
self.token = self.tokens[self.pos]
return True
def is_empty(self):
if self.text.strip() == "":
return True
if self.end == self.start:
return True
if self.end and self.end == self.start + 1 and self.tokens[self.start].type == TokenKind.WHITESPACE:
return True
return False
class SheerkaExecute(BaseService):
"""
Manage the execution of a process flow
"""
NAME = "Execute"
PARSERS_INPUTS_ENTRY = "Execute:ParserInput" # entry for admin or internal variables
def __init__(self, sheerka):
# order must be after SheerkaEvaluateRules because of self.rules_evaluation_service
# order must be after ConceptManager because it needs concept bnf definitions
super().__init__(sheerka, order=15)
self.pi_cache = FastCache(default=lambda key: ParserInput(key), max_size=20)
self.instantiated_evaluators = None
self.evaluators_by_name = None
self.instantiated_parsers = None
self.parsers_by_name = None
self.preprocessed_items_old_values = []
# cache for all preregistered evaluator combination
# the key is the concatenation of the step and the name of evaluators in the group
# ex : BEFORE_EVALUATION|Python|Sya|Bnf
# The value is a tuple,
# The first entry is the grouped evaluators
# ex : {60 : [PythonEvaluator(), SyaEvaluator()], 50: [BnfEvaluator()]}
# The second entry are the sorted priorities ex [60, 50]
self.grouped_evaluators_cache = {}
# cache for preregistered parsers
# Same construction than the evaluators
# Except 1 : the key does not have a step component. It is simple the list of parsers' names
# Except 2 : we store the type of the parser, not its instance
self.grouped_parsers_cache = {}
self.rules_eval_service = None
def initialize(self):
self.sheerka.bind_service_method(self.execute, True, visible=False)
self.sheerka.bind_service_method(self.execute_rules, True, visible=False)
self.sheerka.bind_service_method(self.parse_unrecognized, False, visible=False)
self.sheerka.bind_service_method(self.parse_function, False, visible=False)
self.sheerka.bind_service_method(self.parse_python, False, visible=False)
self.sheerka.bind_service_method(self.parse_expression, False, visible=False)
self.reset_registered_evaluators()
self.reset_registered_parsers()
from core.sheerka.services.SheerkaEvaluateRules import SheerkaEvaluateRules
self.rules_eval_service = self.sheerka.services[SheerkaEvaluateRules.NAME]
def reset_state(self):
self.pi_cache.clear()
def reset_registered_evaluators(self):
# instantiate evaluators, once for all, only keep when it's enabled
self.instantiated_evaluators = [e_class() for e_class in self.sheerka.evaluators]
self.instantiated_evaluators = [e for e in self.instantiated_evaluators if e.enabled]
self.evaluators_by_name = {e.short_name: e for e in self.instantiated_evaluators}
# get default evaluators by process step
for process_step in ALL_STEPS:
self.grouped_evaluators_cache[f"{process_step}|__default"] = self.get_grouped(
[e for e in self.instantiated_evaluators if process_step in e.steps])
def reset_registered_parsers(self):
"""
Browse all parsers and only keep those which are enabled
:return:
"""
self.instantiated_parsers = [parser(sheerka=self.sheerka) for parser in self.sheerka.parsers.values()]
self.instantiated_parsers = [p for p in self.instantiated_parsers if p.enabled]
self.parsers_by_name = {p.short_name: p for p in self.instantiated_parsers}
self.grouped_parsers_cache["__default"] = self.get_grouped(self.instantiated_parsers, use_classes=True)
@staticmethod
def get_grouped(evaluators, use_classes=False):
"""
For a given list of evaluators,
group them by priorities
sort the priorities
:param evaluators:
:param use_classes: if True, store the class (the type) of the evaluator, not its instance
:return: tuple({priority: List of evaluators with this priority}, list of sorted priorities)
"""
grouped = {}
for evaluator in evaluators:
grouped.setdefault(evaluator.priority, []).append(type(evaluator) if use_classes else evaluator)
sorted_groups = sorted(grouped.keys(), reverse=True)
return grouped, sorted_groups
def preprocess(self, items, preprocess_definitions):
"""
Modifies the attributes of item
:param items: either a parser or an evaluator
:param preprocess_definitions: how to modify List of BuiltinConcepts.EVALUATOR_PRE_PROCESS
preprocess.get_value("preprocess_name") gives the name of the property to alter
preprocess.values() gives the alterations
ex:
Sheerka.new(BuiltinConcepts.EVALUATOR_PRE_PROCESS, preprocess_name="parser_name", enabled=False)
Will disable parser 'parser_name'
:return:
"""
for preprocess in preprocess_definitions:
for item in items:
if self.matches(item.name, preprocess.get_value("preprocess_name")):
for var_name, value in preprocess.values().items():
if var_name == "preprocess_name":
continue
if hasattr(item, var_name):
self.preprocessed_items_old_values.append((item, var_name, getattr(item, var_name)))
setattr(item, var_name, value)
def get_evaluators(self, context, process_step):
"""
Returns the list of evaluators to use for a specific context need
:param context:
:param process_step:
:return:
"""
# Normal case, the evaluators are the default one
if not context.preprocess_evaluators and not context.preprocess:
return self.grouped_evaluators_cache[f"{process_step}|__default"]
# Other case, only use a subset of evaluators
selected = context.preprocess_evaluators
if selected and not context.preprocess:
key = str(process_step) + "|" + "|".join(selected)
try:
return self.grouped_evaluators_cache[key]
except KeyError:
evaluators = [self.evaluators_by_name[e] for e in selected if e in self.evaluators_by_name]
evaluators = [e for e in evaluators if process_step in e.steps] # check the process step
grouped = self.get_grouped(evaluators)
self.grouped_evaluators_cache[key] = grouped
return grouped
# Final case, evaluators attributes are modified by the context
evaluators = [self.evaluators_by_name[e] for e in selected if
e in self.evaluators_by_name] if selected else self.instantiated_evaluators
evaluators = [e for e in evaluators if process_step in e.steps] # check the process step
self.preprocess(evaluators, context.preprocess)
evaluators = [e for e in evaluators if e.enabled] # make sure they are still enabled
return self.get_grouped(evaluators)
def get_parsers(self, context):
"""
We cannot use a single instance of a parser shared among executions as it's common to have a parser
calling another parser or even calling itself
So the cache holds the parser classes or sorted priorities
:param context:
:return:
"""
def get_instances(from_cache):
grouped_instances = {priority: [p(sheerka=self.sheerka) for p in parsers_classes]
for priority, parsers_classes in from_cache[0].items()}
return grouped_instances, from_cache[1]
key = self.get_parsers_key(context)
if key:
try:
return key, *get_instances(self.grouped_parsers_cache[key])
except KeyError:
parsers = [self.parsers_by_name[p] for p in context.preprocess_parsers if p in self.parsers_by_name]
self.grouped_parsers_cache[key] = self.get_grouped(parsers, use_classes=True)
return key, *get_instances(self.grouped_parsers_cache[key])
# else, case where parsers attributes are modified by the context
# This a the case when we want to disable a specific parser, or change the order of priority
parsers = [self.parsers_by_name[p] for p in context.preprocess_parsers if p in self.parsers_by_name] \
if context.preprocess_parsers else self.instantiated_parsers
self.preprocess(parsers, context.preprocess)
parsers = [p for p in parsers if p.enabled] # only keep those that are still enabled
groups, sorted_priorities = self.get_grouped(parsers, use_classes=True)
return key, *get_instances((groups, sorted_priorities))
def get_parser_input(self, text, tokens=None):
"""
Returns new or existing parser input
:param text:
:param tokens:
:return:
"""
if isinstance(text, ParserInput):
return text
if tokens is None or self.pi_cache.has(text):
pi = self.pi_cache.get(text)
if pi is NotFound: # when CacheManager.cache_only is True
pi = ParserInput(text)
self.pi_cache.put(text, pi)
return ParserInput(text, tokens=pi.tokens,
length=pi.length) # new instance, but no need to tokenize the text again
key = text or core.utils.get_text_from_tokens(tokens)
pi = ParserInput(key, tokens=tokens, length=len(tokens))
self.pi_cache.put(key, pi)
return pi
@staticmethod
def get_parsers_key(context):
"""
From the context.preprocess_parsers and context.preprocess,
try to find a key to store the further results of the parsings
:param context:
:return:
"""
if not context.preprocess_parsers and not context.preprocess:
return "__default"
if context.preprocess_parsers and not context.preprocess:
return "|".join(context.preprocess_parsers)
return None
def call_parsers(self, context, return_values):
"""
Call all the parsers, ordered by priority
Possible return value for a parser:
None : indicate that you do no need to care about the result
ParserResult with status False : Success
ParserResult with status False : failed to parse, but the result will be reused by other parsers
NotForMe (status is False) : Failed to parse. Do no reuse the result
:param context:
:param return_values:
:return:
"""
# return_values must be a list
if not isinstance(return_values, list):
return_values = [return_values]
# first make the distinguish between what is for the parsers and what is not
result = []
to_process = []
for r in return_values:
if not r.status or not self.sheerka.isinstance(r.body, BuiltinConcepts.USER_INPUT):
result.append(r)
else:
to_process.append(r)
if not to_process:
return result
# keep track of the originals user inputs, as they need to be removed at the end
user_inputs = to_process[:]
parsers_key, grouped_parsers, sorted_priorities = self.get_parsers(context)
stop_processing = False
for priority in sorted_priorities:
inputs_for_this_group = to_process[:]
for parser in grouped_parsers[priority]:
for return_value in inputs_for_this_group:
to_parse = self.get_parser_input(return_value.body.body) \
if self.sheerka.isinstance(return_value.body, BuiltinConcepts.USER_INPUT) \
else return_value.body
# if self.sheerka.log.isEnabledFor(logging.DEBUG):
# debug_text = "'" + to_parse + "'" if isinstance(to_parse, str) \
# else "'" + core.utils.get_text_from_tokens(to_parse) + "' as tokens"
# context.log(f"Parsing {debug_text}")
with context.push(BuiltinConcepts.PARSING,
{"parser": parser.name},
desc=f"Parsing using {parser.name}") as sub_context:
sub_context.add_inputs(to_parse=to_parse)
res = parser.parse(sub_context, to_parse)
if res is not None:
if hasattr(res, "__iter__"):
for r in res:
if r is None:
continue
r.parents = [return_value]
result.append(r)
if self.sheerka.isinstance(r.body, BuiltinConcepts.PARSER_RESULT):
# if a ParserResultConcept is returned, it will be used by the parsers
# of the following groups
to_process.append(r)
if r.status:
stop_processing = True
else:
res.parents = [return_value]
result.append(res)
if self.sheerka.isinstance(res.body, BuiltinConcepts.PARSER_RESULT):
# if a ParserResultConcept is returned, it will be used by the parsers
# of the following groups
to_process.append(res)
if res.status:
stop_processing = True
sub_context.add_values(return_values=res)
if stop_processing:
break # Do not try the other priorities if a match is found
result = core.utils.remove_list_from_list(result, user_inputs)
return result
def call_evaluators(self, context, return_values, process_step):
# return_values must be a list
if not isinstance(return_values, list):
return_values = [return_values]
grouped_evaluators, sorted_priorities = self.get_evaluators(context, process_step)
iteration = 0
while True:
with context.push(process_step,
{"step": process_step, "iteration": iteration},
desc=f"iteration #{iteration}") as iteration_context:
simple_digest = return_values[:]
iteration_context.add_inputs(return_values=simple_digest)
for priority in sorted_priorities:
original_items = return_values[:]
evaluated_items = []
to_delete = set()
for evaluator in grouped_evaluators[priority]:
evaluator.reset()
sub_context_desc = f"Evaluating using {evaluator.name} ({priority=})"
with iteration_context.push(process_step,
{"step": process_step,
"iteration": iteration,
"evaluator": evaluator.name},
desc=sub_context_desc) as sub_context:
sub_context.add_inputs(return_values=original_items)
# process evaluators that work on one simple return value at the time
from evaluators.BaseEvaluator import OneReturnValueEvaluator
if isinstance(evaluator, OneReturnValueEvaluator):
debug_result = []
for item in original_items:
if evaluator.matches(sub_context, item):
# init the evaluator is possible
# KSI. 20201102 : Evaluators are now instantiated at startup,
# Can we move this section into reset_registered_evaluators()
if hasattr(evaluator, "init_evaluator") and not evaluator.is_initialized:
evaluator.init_evaluator(sub_context, original_items)
result = evaluator.eval(sub_context, item)
if result is None:
# match() was successful but nothing was done in eval
# most of the time, it's because extra checks are unsuccessful
debug_result.append({"input": item, "return_value": None})
continue
if id(result) == id(item):
# eval was successful, but we don't want to alter the processing flow
debug_result.append({"input": item, "return_value": result})
continue
# otherwise, item will be removed and replaced by result
to_delete.add(item)
if isinstance(result, list):
evaluated_items.extend(result)
elif isinstance(result, ReturnValueConcept):
evaluated_items.append(result)
else:
error = self.sheerka.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result,
evaluator=evaluator)
result = self.sheerka.ret("sheerka.process", False, error, parents=[item])
evaluated_items.append(result)
# TODO: Add a validation to make sure that item is somewhere in return_value.parents
debug_result.append({"input": item, "return_value": result})
else:
debug_result.append({"input": item, "return_value": NO_MATCH})
sub_context.add_values(return_values=debug_result)
# process evaluators that work on all return values
else:
if evaluator.matches(sub_context, original_items):
results = evaluator.eval(sub_context, original_items)
if results is None:
continue
if not isinstance(results, list):
results = [results]
for result in results:
if result.body != BuiltinConcepts.NO_RESULT:
evaluated_items.append(result)
to_delete.update(result.parents)
sub_context.add_values(return_values=results)
else:
sub_context.add_values(return_values=NO_MATCH)
return_values = evaluated_items
return_values.extend([item for item in original_items if item not in to_delete])
iteration_context.add_values(return_values=return_values[:])
# have we done something ?
to_compare = return_values[:]
if simple_digest == to_compare:
break
# inc the iteration and continue
iteration += 1
self.undo_preprocess()
return return_values
def execute(self, context, return_values, execution_steps):
"""
Executes process for all initial contexts
:param context:
:param return_values:
:param execution_steps:
:return:
"""
for step in execution_steps:
copy = return_values[:] if hasattr(return_values, "__iter__") else [return_values]
with context.push(BuiltinConcepts.PROCESSING,
{"step": step, "iteration": 0},
desc=f"{step=}") as sub_context:
sub_context.add_inputs(return_values=copy)
if step == BuiltinConcepts.PARSING:
return_values = self.call_parsers(sub_context, return_values)
else:
return_values = self.call_evaluators(sub_context, return_values, step)
has_changed = copy != return_values
if has_changed:
sub_context.log_result(return_values)
sub_context.add_values(return_values=return_values)
sub_context.add_values(has_changed=has_changed)
return return_values
def execute_rules(self, context, return_values, rules_steps, evaluation_steps):
""""
Executes the execution rules until no match is found
:param context:
:param return_values: input return values
:param rules_steps: steps are configurable
:param evaluation_steps: steps are configurable
:return: out return_values
"""
continue_execution = True
counter = 0
in_rete_memory = None
while continue_execution:
with context.push(BuiltinConcepts.PROCESSING, {"counter": counter}, desc=f"{counter=}") as sub_context:
# apply rule evaluation steps
for step in rules_steps:
if step == BuiltinConcepts.RULES_EVALUATION:
eval_res = self.rules_eval_service.evaluate_exec_rules(sub_context, return_values)
if not eval_res:
self.rules_eval_service.remove_from_rete_memory(return_values)
continue_execution = False
break
else:
in_rete_memory = return_values.copy()
return_values = eval_res
else:
return_values = self.call_evaluators(sub_context, return_values, step)
if not continue_execution:
break
# evaluate the result
return_values = [r.body.body.compiled_action for r in return_values]
while True:
copy = return_values[:]
for step in evaluation_steps:
return_values = self.call_evaluators(sub_context, return_values, step)
if copy == return_values[:]:
break
# evaluation is done. Remove object in Rete memory
self.rules_eval_service.remove_from_rete_memory(in_rete_memory)
counter += 1
return return_values
def undo_preprocess(self):
for item, var_name, value in self.preprocessed_items_old_values:
setattr(item, var_name, value)
self.preprocessed_items_old_values.clear()
@staticmethod
def matches(parser_or_evaluator_name, preprocessor_name):
if preprocessor_name.endswith("*"):
return parser_or_evaluator_name.startswith(preprocessor_name[:-1])
else:
return parser_or_evaluator_name == preprocessor_name
def parse_unrecognized(self, context, source, parsers, who=None, prop=None, filter_func=None):
"""
Try to recognize concepts or code from source using the given parsers
:param context:
:param source: ParserInput if possible
:param parsers:
:param who: who is asking the parsing ?
:param prop: Extra info, when parsing a property
:param filter_func: Once the result are found, call this function to filter them
:return:
"""
sheerka = context.sheerka
if prop:
action_context = {"prop": prop, "source": source}
desc = f"Parsing attribute '{prop}'"
else:
action_context = source
desc = f"Parsing '{source}'"
with context.push(BuiltinConcepts.PARSING, action_context, who=who, desc=desc) as sub_context:
# disable all parsers but the requested ones
if parsers != "all":
sub_context.preprocess_parsers = parsers
else:
sub_context.preprocess_parsers = None
if prop in (Keywords.WHERE, Keywords.PRE, ConceptParts.WHERE, ConceptParts.PRE, Keywords.WHEN):
sub_context.protected_hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED)
sub_context.add_inputs(source=source)
to_parse = sheerka.ret(context.who,
True,
sheerka.new(BuiltinConcepts.USER_INPUT, body=source))
res = sheerka.execute(sub_context, to_parse, PARSE_STEPS)
if filter_func:
res = filter_func(sub_context, res)
sub_context.add_values(return_values=res)
return res
def parse_function(self, context, source, tokens=None, start=0):
"""
Helper function that parses what is supposed to be a function
:param context:
:param source:
:param tokens:
:param start: start index for the source code node
:return:
"""
from parsers.BaseNodeParser import SourceCodeWithConceptNode
sheerka = context.sheerka
from parsers.FunctionParser import FunctionParser
parser = FunctionParser()
desc = f"Parsing function '{source}'"
with context.push(BuiltinConcepts.PARSE_CODE, source, desc=desc) as sub_context:
sheerka_execution = sheerka.services[SheerkaExecute.NAME]
res = parser.parse(sub_context, sheerka_execution.get_parser_input(source, tokens))
if not isinstance(res, list):
res = [res]
for r in [r for r in res if sheerka.isinstance(r.body, BuiltinConcepts.PARSER_RESULT)]:
r.body.body.start += start
r.body.body.end += start
if isinstance(r.body.body, SourceCodeWithConceptNode):
for n in [r.body.body.first, r.body.body.last] + r.body.body.nodes:
n.start += start
n.end += start
return res
def parse_python(self, context, source, desc=None):
"""
Helper function that parses what is known to be Python source code
:param context:
:param source:
:param desc: option description when creating the sub context
"""
from parsers.PythonParser import PythonParser
desc = desc or f"Compiling python '{source}'"
with context.push(BuiltinConcepts.PARSE_CODE,
{"language": "Python", "source": source},
desc) as sub_context:
parser_input = context.sheerka.services[SheerkaExecute.NAME].get_parser_input(source)
python_parser = PythonParser()
return python_parser.parse(sub_context, parser_input)
def parse_expression(self, context, source, desc=None):
"""
Helper function to parser expressions with AND, OR and NOT
"""
desc = desc or f"Parsing expression '{source}'"
with context.push(BuiltinConcepts.PARSE_CODE, source, desc) as sub_context:
parser_input = context.sheerka.services[SheerkaExecute.NAME].get_parser_input(source)
from parsers.ExpressionParser import ExpressionParser
expr_parser = ExpressionParser()
return expr_parser.parse(sub_context, parser_input)