Files
Sheerka-Old/src/parsers/BnfNodeParser.py
T

1171 lines
42 KiB
Python

#####################################################################################################
# This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from collections import defaultdict
from dataclasses import dataclass
import core.utils
from cache.Cache import Cache
from core import builtin_helpers
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, DEFINITION_TYPE_BNF, DoNotResolve, ConceptParts
from core.sheerka.services.SheerkaExecute import ParserInput
from core.tokenizer import Tokenizer, Token, TokenKind
from parsers.BaseNodeParser import BaseNodeParser, LexerNode, UnrecognizedTokensNode, ConceptNode, GrammarErrorNode
from parsers.BaseParser import ErrorNode
PARSERS = ["AtomNode", "SyaNode", "Python"]
@dataclass
class ConceptParsingError(ErrorNode):
    """Error node reporting that a specific concept failed to parse."""
    # The concept whose parsing failed.
    concept: Concept
class NonTerminalNode(LexerNode):
    """
    Inner node of the parse tree returned by the BnfNodeParser.

    ``end == -1`` is the convention for "matched nothing" used throughout
    the parsing expressions.
    """
    def __init__(self, parsing_expression, start, end, tokens, children=None):
        super().__init__(start, end, tokens)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        # FIX: guard against children=None (the constructor default) which
        # previously raised TypeError on len()
        if self.children:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names

    def __eq__(self, other):
        if not isinstance(other, NonTerminalNode):
            return False
        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.children == other.children

    def __hash__(self):
        # FIX: children is a list (unhashable); hash a tuple snapshot instead
        children = tuple(self.children) if self.children else ()
        return hash((self.parsing_expression, self.start, self.end, children))
class TerminalNode(LexerNode):
    """
    Leaf node produced by the BnfNodeParser for a single matched token.
    """
    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end, source=value)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        prefix = self.parsing_expression.rule_name or ""
        return f"{prefix}'{self.value}'"

    def __eq__(self, other):
        if not isinstance(other, TerminalNode):
            return False
        mine = (self.parsing_expression, self.start, self.end, self.value)
        theirs = (other.parsing_expression, other.start, other.end, other.value)
        return mine == theirs

    def __hash__(self):
        return hash((self.parsing_expression, self.start, self.end, self.value))
class ParsingExpression:
    """
    Base class of all PEG grammar elements.

    ``elements`` keeps the raw, unresolved operands (strings, concepts,
    sub-expressions) as given at construction time; ``nodes`` keeps the
    resolved sub-expressions actually used during parsing.
    """
    def __init__(self, *args, **kwargs):
        self.elements = args
        nodes = kwargs.get('nodes', [])
        if not hasattr(nodes, '__iter__'):
            # accept a single node for convenience
            nodes = [nodes]
        self.nodes = nodes
        self.rule_name = kwargs.get('rule_name', '')

    def __eq__(self, other):
        if not isinstance(other, ParsingExpression):
            return False
        return (self.rule_name, self.elements) == (other.rule_name, other.elements)

    def __hash__(self):
        return hash((self.rule_name, self.elements))

    def parse(self, parser):
        return self._parse(parser)

    def add_rule_name_if_needed(self, text):
        """Append '=<rule_name>' to a repr fragment when the rule is named."""
        if not self.rule_name:
            return text
        return f"{text}={self.rule_name}"
class ConceptExpression(ParsingExpression):
    """
    Matches a single concept.
    Only used while a rule is being defined; once the grammar is built,
    it is replaced by the actual concept.
    """
    def __init__(self, concept, rule_name=""):
        super().__init__(rule_name=rule_name)
        self.concept = concept

    def __repr__(self):
        return self.add_rule_name_if_needed(f"{self.concept}")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False
        if not isinstance(other, ConceptExpression):
            return False
        if isinstance(self.concept, Concept):
            return self.concept.name == other.concept.name
        # at this stage the concept may still be just its name (a string)
        return self.concept == other.concept

    def __hash__(self):
        return hash((self.concept, self.rule_name))

    def _parse(self, parser_helper):
        inner = self.nodes[0].parse(parser_helper)
        if inner is None:
            return None
        span = parser_helper.parser.parser_input.tokens[inner.start: inner.end + 1]
        return NonTerminalNode(self, inner.start, inner.end, span, [inner])
class Sequence(ParsingExpression):
    """
    Matches its sub-expressions one after another, in the exact order
    they are defined. Fails as soon as one of them fails.
    """
    def _parse(self, parser_helper):
        start = parser_helper.pos
        last_end = parser_helper.pos
        matched = []
        for sub in self.nodes:
            result = sub.parse(parser_helper)
            if result is None:
                return None
            if result.end != -1:  # end == -1 means the sub-expression matched nothing
                matched.append(result)
                last_end = result.end
        span = parser_helper.parser.parser_input.tokens[start: last_end + 1]
        return NonTerminalNode(self, start, last_end, span, matched)

    def __repr__(self):
        inner = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({inner})")
class OrderedChoice(ParsingExpression):
    """
    Matches exactly one of several alternatives.
    Stops at the first that matches, so the order of definition matters.
    """
    def _parse(self, parser_helper):
        start = parser_helper.pos
        for alternative in self.nodes:
            result = alternative.parse(parser_helper)
            if result:
                span = parser_helper.parser.parser_input.tokens[start: result.end + 1]
                return NonTerminalNode(self, start, result.end, span, [result])
            parser_helper.seek(start)  # backtrack before the next alternative
        return None

    def __repr__(self):
        inner = "| ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({inner})")
class Optional(ParsingExpression):
    """
    Matches the elements, or nothing (never fails).
    If several alternatives match, the longest one wins.
    If order matters instead of length, use Optional(OrderedChoice(...)).
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        # end == -1 encodes "nothing matched"; Optional always returns a node
        selected_node = NonTerminalNode(self, parser_helper.pos, -1, [], [])  # means that nothing is found
        for e in self.nodes:
            node = e.parse(parser_helper)
            if node:
                if node.end > selected_node.end:
                    # keep the longest match seen so far
                    selected_node = NonTerminalNode(
                        self,
                        node.start,
                        node.end,
                        parser_helper.parser.parser_input.tokens[node.start: node.end + 1],
                        [node])
            parser_helper.seek(init_pos)  # backtrack so each alternative starts from the same position
        if selected_node.end != -1:
            # re-consume the tokens of the winning match
            parser_helper.seek(selected_node.end)
            parser_helper.next_token()  # eat the tokens found
        return selected_node

    def __repr__(self):
        if len(self.elements) == 1:
            return f"{self.elements[0]}?"
        else:
            to_str = ", ".join(repr(n) for n in self.elements)
            return self.add_rule_name_if_needed(f"({to_str})?")
class Repetition(ParsingExpression):
    """
    Base class for all repetition-like parser expressions (?, *, +).

    Keyword Args:
        sep (ParsingExpression): optional separator expression matched
            between consecutive repetitions (e.g. a comma). None means
            no separator.
    """
    def __init__(self, *elements, **kwargs):
        super(Repetition, self).__init__(*elements, **kwargs)
        # separator between repeated matches; consumed by ZeroOrMore/OneOrMore
        self.sep = kwargs.get('sep', None)
class ZeroOrMore(Repetition):
    """
    Matches the wrapped expression zero or more times. It never fails:
    with no match it returns a node whose ``end`` is -1.
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser_helper.pos
            # maybe eat the separator between repetitions
            if self.sep and children:
                sep_result = self.sep.parse(parser_helper)
                if sep_result is None:
                    parser_helper.seek(current_pos)
                    break
            # eat one more repetition
            node = self.nodes[0].parse(parser_helper)
            if node is None:
                parser_helper.seek(current_pos)
                break
            if node.end != -1:  # end == -1 means the expression matched nothing
                children.append(node)
                end_pos = node.end
            elif parser_helper.pos == current_pos:
                # FIX: empty match that consumed no token (e.g. a bare Optional)
                # — stop instead of looping forever
                break
        if len(children) == 0:
            return NonTerminalNode(self, init_pos, -1, [], [])
        return NonTerminalNode(self,
                               init_pos,
                               end_pos,
                               parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1],
                               children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})*")
class OneOrMore(Repetition):
    """
    Matches the wrapped expression one or more times.
    Fails (returns None) when there is no match at all.
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser_helper.pos
            # maybe eat the separator between repetitions
            if self.sep and children:
                sep_result = self.sep.parse(parser_helper)
                if sep_result is None:
                    parser_helper.seek(current_pos)
                    break
            # eat one more repetition
            node = self.nodes[0].parse(parser_helper)
            if node is None:
                parser_helper.seek(current_pos)
                break
            if node.end != -1:  # end == -1 means the expression matched nothing
                children.append(node)
                end_pos = node.end
            elif parser_helper.pos == current_pos:
                # FIX: empty match that consumed no token (e.g. a bare Optional)
                # — stop instead of looping forever
                break
        if len(children) == 0:  # one-or-more: no match at all is a failure
            return None
        return NonTerminalNode(self,
                               init_pos,
                               end_pos,
                               parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1],
                               children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})+")
class UnorderedGroup(Repetition):
    """
    Will try to match all of the parsing expression in any order.

    Not implemented yet: parsing raises NotImplementedError.
    """
    def _parse(self, parser):
        raise NotImplementedError()
    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})#"
class Match(ParsingExpression):
    """
    Common base for expressions that consume input directly
    (as opposed to combining other expressions).
    """
    def __init__(self, rule_name, root=False):
        super(Match, self).__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        return self._parse(parser)
class StrMatch(Match):
    """
    Matches a string literal against the current token.
    """
    def __init__(self, to_match, rule_name="", ignore_case=True, skip_whitespace=True):
        # deliberately bypasses Match.__init__ (no mandatory 'root' argument)
        super(Match, self).__init__(rule_name=rule_name)
        self.to_match = to_match
        self.ignore_case = ignore_case
        self.skip_white_space = skip_whitespace

    def __repr__(self):
        return self.add_rule_name_if_needed(f"'{self.to_match}'")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False
        if not isinstance(other, StrMatch):
            return False
        return self.to_match == other.to_match and self.ignore_case == other.ignore_case

    def _parse(self, parser_helper):
        token = parser_helper.get_token()
        # NOTE(review): the case-insensitive path compares token.str_value while
        # the case-sensitive path compares token.strip_quote — looks asymmetric,
        # confirm this is intentional.
        if self.ignore_case:
            matched = token.str_value.lower() == self.to_match.lower()
        else:
            matched = token.strip_quote == self.to_match
        if not matched:
            return None
        node = TerminalNode(self, parser_helper.pos, parser_helper.pos, token.str_value)
        parser_helper.next_token(self.skip_white_space)
        return node
# class RegExMatch(Match):
# '''
# This Match class will perform input matching based on Regular Expressions.
#
# Args:
# to_match (regex string): A regular expression string to match.
# It will be used to create regular expression using re.compile.
# ignore_case(bool): If case insensitive match is needed.
# Default is None to support propagation from global parser setting.
# multiline(bool): allow regex to works on multiple lines
# (re.DOTALL flag). Default is None to support propagation from
# global parser setting.
# str_repr(str): A string that is used to represent this regex.
# re_flags: flags parameter for re.compile if neither ignore_case
# or multiple are set.
#
# '''
# def __init__(self, to_match, rule_name='', root=False, ignore_case=None,
# multiline=None, str_repr=None, re_flags=re.MULTILINE):
# super(RegExMatch, self).__init__(rule_name, root)
# self.to_match_regex = to_match
# self.ignore_case = ignore_case
# self.multiline = multiline
# self.explicit_flags = re_flags
#
# self.to_match = str_repr if str_repr is not None else to_match
#
# def compile(self):
# flags = self.explicit_flags
# if self.multiline is True:
# flags |= re.DOTALL
# if self.multiline is False and flags & re.DOTALL:
# flags -= re.DOTALL
# if self.ignore_case is True:
# flags |= re.IGNORECASE
# if self.ignore_case is False and flags & re.IGNORECASE:
# flags -= re.IGNORECASE
# self.regex = re.compile(self.to_match_regex, flags)
#
# def __str__(self):
# return self.to_match
#
# def __unicode__(self):
# return self.__str__()
#
# def _parse(self, parser):
# c_pos = parser.position
# m = self.regex.match(parser.input, c_pos)
# if m:
# matched = m.group()
# if parser.debug:
# parser.dprint(
# "++ Match '%s' at %d => '%s'" %
# (matched, c_pos, parser.context(len(matched))))
# parser.position += len(matched)
# if matched:
# return Terminal(self, c_pos, matched, extra_info=m)
# else:
# if parser.debug:
# parser.dprint("-- NoMatch at {}".format(c_pos))
# parser._nm_raise(self, c_pos, parser)
class ParsingExpressionVisitor:
    """
    Visitor over ParsingExpression trees.
    Dispatches on the concrete class name: a subclass method named
    ``visit_<ClassName>`` handles that node type; ``generic_visit`` walks
    the raw elements otherwise. Returning ``STOP`` halts the traversal
    of the current element list.
    """
    STOP = "##_Stop_##"

    def visit(self, parsing_expression):
        handler_name = 'visit_' + parsing_expression.__class__.__name__
        handler = getattr(self, handler_name, self.generic_visit)
        return handler(parsing_expression)

    def generic_visit(self, parsing_expression):
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)
        for element in parsing_expression.elements:
            # normalize raw elements into parsing expressions before visiting
            if isinstance(element, Concept):
                outcome = self.visit(ConceptExpression(element.key or element.name))
            elif isinstance(element, str):
                outcome = self.visit(StrMatch(element))
            else:
                outcome = self.visit(element)
            if outcome == self.STOP:
                return
class BnfNodeFirstTokenVisitor(ParsingExpressionVisitor):
    """
    Collects the possible "first tokens" of a parsing expression
    (the literals or concepts a match could begin with).
    """
    def __init__(self, sheerka):
        self.sheerka = sheerka
        self.first_tokens = None  # stays None until a first token is recorded

    def add_first_token(self, first_token):
        if not self.first_tokens:
            self.first_tokens = [first_token]
        else:
            self.first_tokens.append(first_token)

    def visit_ConceptExpression(self, pe):
        if isinstance(pe.concept, str):
            concept = self.sheerka.get_by_key(pe.concept)
        else:
            concept = pe.concept
        if self.sheerka.is_known(concept):
            self.add_first_token(core.utils.str_concept(concept, drop_name=True))
        return self.STOP

    def visit_StrMatch(self, pe):
        if not pe.to_match:
            return
        self.add_first_token(pe.to_match)
        return self.STOP

    def visit_OrderedChoice(self, parsing_expression):
        # every alternative may contribute a first token
        for alternative in parsing_expression.elements:
            self.visit(alternative)
        return self.STOP
class BnfConceptParserHelper:
    """
    One parsing hypothesis over the token stream.

    eat_concept()/eat_unrecognized() consume the input one token at a time;
    clone() forks the hypothesis when several concepts match the same token,
    and manage_unrecognized() may fork further clones (kept in ``forked``)
    when a run of unrecognized tokens is re-interpreted by other parsers.
    """
    def __init__(self, parser):
        self.parser = parser
        self.debug = []          # trace of eaten concepts/tokens, for debugging
        self.errors = []
        self.sequence = []       # recognized nodes, in input order
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
        self.has_unrecognized = False
        self.bnf_parsed = False  # True once at least one concept was parsed
        self.forked = []         # clones created by manage_unrecognized()
        self.token = None
        self.pos = -1

    def __repr__(self):
        return f"BnfConceptParserHelper({self.sequence})"

    def __eq__(self, other):
        if id(self) == id(other):
            return True
        if not isinstance(other, BnfConceptParserHelper):
            return False
        return self.sequence == other.sequence and self.errors == other.errors

    def __hash__(self):
        # coarse hash, consistent with __eq__ (equal helpers have equal lengths)
        return len(self.sequence) + len(self.errors)

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        """Advance to the next token; returns False once EOF is reached."""
        if self.token and self.token.type == TokenKind.EOF:
            return False
        self.pos += 1
        self.token = self.parser.parser_input.tokens[self.pos]
        if skip_whitespace:
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.parser.parser_input.tokens[self.pos]
        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """Reposition on an absolute token index (used for backtracking)."""
        self.pos = pos
        self.token = self.parser.parser_input.tokens[self.pos]

    def has_error(self):
        return len(self.errors) > 0

    def is_locked(self):
        # locked when this hypothesis is already ahead of the shared input
        # position (it ate the token in a previous step) or is in error
        return self.parser.parser_input.pos <= self.pos or self.has_error()

    def eat_concept(self, concept, token):
        """Try to parse ``concept`` starting at the current input position."""
        if self.is_locked():
            return
        self.debug.append(concept)
        self.manage_unrecognized()
        for forked in self.forked:
            # manage_unrecognized() may have forked clones; feed them too
            forked.eat_concept(concept, token)
        # init
        parsing_expression = self.parser.get_parsing_expression(self.parser.context, concept)
        if not isinstance(parsing_expression, ParsingExpression):
            self.debug.append(concept)
            error_msg = f"Failed to parse concept '{concept}'"
            if parsing_expression is not None:
                error_msg += f". Reason: '{parsing_expression}'"
            self.errors.append(GrammarErrorNode(error_msg))
            return
        self.pos = self.parser.parser_input.pos
        self.token = self.parser.parser_input.tokens[self.pos]
        # parse
        node = parsing_expression.parse(self)
        if node is not None and node.end != -1:
            self.sequence.append(self.create_concept_node(concept, node))
            self.pos = node.end
            self.bnf_parsed = True
        else:
            self.debug.append(("Rewind", token))
            self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos)
            self.pos = self.parser.parser_input.pos  # reset position

    def eat_unrecognized(self, token):
        """Buffer a token that matched no concept."""
        if self.is_locked():
            return
        self.debug.append(token)
        self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos)

    def manage_unrecognized(self):
        """
        Flush the buffered unrecognized tokens into ``sequence``, first trying
        to re-interpret them with the other parsers (which may fork clones).
        """
        if self.unrecognized_tokens.is_empty():
            return
        # do not put an empty/whitespace-only UnrecognizedToken in the output
        if self.unrecognized_tokens.is_whitespace():
            self.unrecognized_tokens.reset()
            return
        self.unrecognized_tokens.fix_source()
        # try to recognize concepts with the other parsers
        nodes_sequences = builtin_helpers.get_lexer_nodes_from_unrecognized(
            self.parser.context,
            self.unrecognized_tokens,
            PARSERS)
        if nodes_sequences:
            # one instance per interpretation: self keeps the first one,
            # clones take the others
            instances = [self]
            for i in range(len(nodes_sequences) - 1):
                clone = self.clone()
                instances.append(clone)
                self.forked.append(clone)
            for instance, node_sequence in zip(instances, nodes_sequences):
                for node in node_sequence:
                    instance.sequence.append(node)
                    if isinstance(node, UnrecognizedTokensNode) or \
                            hasattr(node, "unrecognized_tokens") and node.unrecognized_tokens:
                        instance.has_unrecognized = True
                instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
        else:
            self.sequence.append(self.unrecognized_tokens)
            self.has_unrecognized = True
        # start a fresh buffer
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])

    def clone(self):
        """Fork this hypothesis (shallow copies of the accumulated state)."""
        clone = BnfConceptParserHelper(self.parser)
        clone.debug = self.debug[:]
        # FIX: was `self.errors = self.errors[:]`, which left the clone with an
        # empty error list and silently dropped accumulated errors on forks
        clone.errors = self.errors[:]
        clone.sequence = self.sequence[:]
        clone.pos = self.pos
        clone.unrecognized_tokens = self.unrecognized_tokens.clone()
        # NOTE(review): bnf_parsed / has_unrecognized / token are deliberately
        # not copied here — confirm this is intended
        return clone

    def finalize(self):
        """Flush what remains buffered at end of input (self and all forks)."""
        if self.bnf_parsed:
            self.manage_unrecognized()
        for forked in self.forked:
            # some clones may have been forked along the way
            forked.finalize()

    def create_concept_node(self, template, underlying):
        """Instantiate a concept from ``template`` and wrap the parse tree."""
        sheerka = self.parser.context.sheerka
        key = (template.key, template.id) if template.id else template.key
        concept = sheerka.new(key)
        concept = self.finalize_concept(sheerka, concept, underlying)
        concept_node = ConceptNode(
            concept,
            underlying.start,
            underlying.end,
            self.parser.parser_input.tokens[underlying.start: underlying.end + 1],
            None,
            underlying)
        return concept_node

    def finalize_concept(self, sheerka, concept, underlying, init_empty_body=True):
        """
        Updates the properties of the concept from the parse tree.
        Recurses when a property value is itself a concept.
        """
        # ensures we return the same concept instance for the same ConceptExpression node
        _underlying_value_cache = {}

        def _add_prop(_concept, prop_name, value):
            """Add a new entry; turn it into a list if the property already exists."""
            if prop_name not in _concept.compiled or _concept.compiled[prop_name] is None:
                # new entry
                _concept.compiled[prop_name] = value
            else:
                # make a list if there already was a value
                previous_value = _concept.compiled[prop_name]
                if isinstance(previous_value, list):
                    previous_value.append(value)
                else:
                    _concept.compiled[prop_name] = [previous_value, value]

        def _look_for_concept_match(_underlying):
            """
            At some point there is either a StrMatch or a ConceptExpression that
            allowed the recognition; find the ConceptExpression, recursing through
            single-child chains.
            """
            if isinstance(_underlying.parsing_expression, ConceptExpression):
                return _underlying
            if not isinstance(_underlying, NonTerminalNode):
                return None
            if len(_underlying.children) != 1:
                return None
            return _look_for_concept_match(_underlying.children[0])

        def _get_underlying_value(_underlying):
            """Resolve the value of a node: a (recursively finalized) concept or a raw string."""
            concept_match_node = _look_for_concept_match(_underlying)
            if concept_match_node:
                # the value is a concept
                if id(concept_match_node) in _underlying_value_cache:
                    result = _underlying_value_cache[id(concept_match_node)]
                else:
                    ref_tpl = concept_match_node.parsing_expression.concept
                    new = sheerka.new_from_template(ref_tpl, ref_tpl.key)
                    result = self.finalize_concept(sheerka, new, concept_match_node.children[0], init_empty_body)
                    _underlying_value_cache[id(concept_match_node)] = result
            else:
                # the value is a string
                result = DoNotResolve(_underlying.source)
            return result

        def _process_rule_name(_concept, _underlying):
            """Attach named sub-matches (rule_name) as properties, depth first."""
            if _underlying.parsing_expression.rule_name:
                value = _get_underlying_value(_underlying)
                _add_prop(_concept, _underlying.parsing_expression.rule_name, value)
                _concept.metadata.need_validation = True
            if isinstance(_underlying, NonTerminalNode):
                for child in _underlying.children:
                    _process_rule_name(_concept, child)

        if init_empty_body and concept.metadata.body is None:
            value = _get_underlying_value(underlying)
            concept.compiled[ConceptParts.BODY] = value
            if underlying.parsing_expression.rule_name:
                _add_prop(concept, underlying.parsing_expression.rule_name, value)
                # KSI : Why don't we set concept.metadata.need_validation to True ?
        if isinstance(underlying, NonTerminalNode):
            for node in underlying.children:
                _process_rule_name(concept, node)
        return concept
@dataclass
class UnderConstruction:
    """
    Placeholder stored in the grammar while a concept's parsing expression
    is being resolved; it breaks the recursion for self-referencing rules.
    """
    # id of the concept whose parsing expression is still being built
    concept_id: str
class BnfNodeParser(BaseNodeParser):
    """
    Node parser for concepts defined with a BNF-like grammar.

    Builds PEG parsing expressions from concept definitions (cached in
    ``concepts_grammars``) and runs them over the tokenized input, keeping
    several parsing hypotheses in parallel (see BnfConceptParserHelper).
    """
    def __init__(self, **kwargs):
        super().__init__("BnfNode", 50, **kwargs)
        if 'sheerka' in kwargs:
            sheerka = kwargs.get("sheerka")
            # share the grammar cache owned by the sheerka instance
            self.concepts_grammars = sheerka.concepts_grammars
        else:
            self.concepts_grammars = Cache()
        self.ignore_case = True

    @staticmethod
    def _is_eligible(concept):
        """
        Predicate that selects the concepts handled by this parser.
        :param concept:
        :return: True when the concept is defined with a BNF grammar
        """
        return concept.metadata.definition_type == DEFINITION_TYPE_BNF

    @staticmethod
    def get_valid(parsers_helpers):
        """Keep helpers that parsed at least one concept and have no error (deduplicated)."""
        valid_parser_helpers = []
        for parser_helper in parsers_helpers:
            if not parser_helper.bnf_parsed or parser_helper.has_error():
                continue
            if parser_helper in valid_parser_helpers:
                continue
            valid_parser_helpers.append(parser_helper)
        return valid_parser_helpers

    def get_concepts_sequences(self):
        """
        Main method that parses the tokens and extracts the concepts.
        :return: the list of BnfConceptParserHelper hypotheses
        """
        def _add_forked_to_concept_parser_helpers():
            # collect the clones forked by the helpers during this step
            for parser in concept_parser_helpers:
                if len(parser.forked) > 0:
                    forked.extend(parser.forked)
                    parser.forked.clear()
            if len(forked) > 0:
                concept_parser_helpers.extend(forked)
                forked.clear()

        def _get_longest(parser_helpers):
            # when several concepts match, only keep the helpers
            # that ate the most tokens
            by_end_pos = defaultdict(list)
            for helper in parser_helpers:
                by_end_pos[helper.pos].append(helper)
            return by_end_pos[max(by_end_pos)]

        forked = []
        concept_parser_helpers = [BnfConceptParserHelper(self)]
        while self.parser_input.next_token(False):
            token = self.parser_input.token
            try:
                concepts = self.get_concepts(token, self._is_eligible, strip_quotes=False)
                if not concepts:
                    for concept_parser in concept_parser_helpers:
                        concept_parser.eat_unrecognized(token)
                    continue
                if len(concepts) == 1:
                    for concept_parser in concept_parser_helpers:
                        concept_parser.eat_concept(concepts[0], token)
                    continue
                # several candidate concepts: make the cartesian product
                temp_res = []
                for concept_parser in concept_parser_helpers:
                    if concept_parser.is_locked():
                        # it already ate the token, so simply keep it, do not clone
                        temp_res.append(concept_parser)
                        continue
                    for concept in concepts:
                        clone = concept_parser.clone()
                        temp_res.append(clone)
                        clone.eat_concept(concept, token)
                # only keep the longest hypotheses
                concept_parser_helpers = _get_longest(temp_res)
            finally:
                _add_forked_to_concept_parser_helpers()
        # make sure that remaining buffered tokens are flushed to the output
        for concept_parser in concept_parser_helpers:
            concept_parser.finalize()
        _add_forked_to_concept_parser_helpers()
        return concept_parser_helpers

    def check_for_infinite_recursion(self, parsing_expression, already_found, only_first=False):
        """
        Detect left-recursive cycles in a parsing expression.
        ``already_found`` accumulates the concepts on the current path.
        :return: True when the expression cannot avoid recursing
        """
        if isinstance(parsing_expression, ConceptExpression):
            if parsing_expression.concept in already_found:
                return True
            already_found.add(parsing_expression.concept)
            return self.check_for_infinite_recursion(parsing_expression.nodes[0], already_found, False)
        if isinstance(parsing_expression, Sequence):
            # for a sequence, check the nodes (only the first one when the
            # caller only cares about what the match can start with)
            if only_first:
                nodes = [] if len(parsing_expression.nodes) == 0 else [parsing_expression.nodes[0]]
            else:
                nodes = parsing_expression.nodes
            for node in nodes:
                already_found_for_current_node = already_found.copy()
                if self.check_for_infinite_recursion(node, already_found_for_current_node, False):
                    already_found.update(already_found_for_current_node)
                    return True
            return False
        if isinstance(parsing_expression, OrderedChoice):
            # an ordered choice only loops forever when EVERY alternative
            # recurses: a single non-recursive alternative lets the parser escape.
            # FIX: the original returned after inspecting only the first
            # alternative, and passed the shared set instead of the copy.
            if not parsing_expression.nodes:
                return False
            for node in parsing_expression.nodes:
                already_found_for_current_node = already_found.copy()
                if not self.check_for_infinite_recursion(node, already_found_for_current_node, True):
                    return False  # found a safe alternative
                already_found.update(already_found_for_current_node)
            return True
        return False

    def get_parsing_expression(self, context, concept):
        """
        Return the parsing expression for ``concept``, building it (and
        detecting grammar cycles) when it is not cached yet.
        """
        if concept.id in self.concepts_grammars:
            return self.concepts_grammars.get(concept.id)
        grammar = self.concepts_grammars.copy()
        to_resolve = {}  # the key is the instance id of the parsing expression
        isa_concepts = set()
        self.resolve_concept_parsing_expression(context, concept, grammar, to_resolve, isa_concepts)
        for _id, pe in to_resolve.items():
            for i, node in enumerate(pe.nodes):
                if isinstance(node, UnderConstruction):
                    pe.nodes[i] = grammar.get(node.concept_id)
            concepts_in_recursion = set()
            if self.check_for_infinite_recursion(pe, concepts_in_recursion):
                cycle = context.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG,
                                            body={c.id for c in concepts_in_recursion})
                # FIX: use a dedicated loop variable; the original reused the
                # name 'concept' and corrupted the parameter used in the
                # return statement below
                for cyclic_concept in concepts_in_recursion:
                    grammar[cyclic_concept.id] = cycle
        # Make sure you do not put isa concepts in cache
        # why :
        #   twenties = 'twenty' number where number < 10
        #   hundreds = number 'hundred' where number < 99
        # the concept of number depends on its utilisation
        for concept_id in [c for c in grammar if c not in isa_concepts]:
            self.concepts_grammars.put(concept_id, grammar[concept_id])
        # FIX: read from the freshly built grammar so that isa root concepts
        # (deliberately kept out of the cache) still resolve
        return grammar.get(concept.id)

    def resolve_concept_parsing_expression(self, context, concept, grammar, to_resolve, isa_concepts):
        """
        Build (or fetch from ``grammar``) the parsing expression of ``concept``.
        An UnderConstruction placeholder is registered first so that recursive
        definitions terminate; it is patched later in get_parsing_expression().
        """
        if concept.id in grammar:
            return grammar.get(concept.id)
        desc = f"Get parsing expression for '{concept}'"
        with context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as sub_context:
            if not concept.bnf:  # to save a function call. Not sure it is worth it.
                BaseNodeParser.ensure_bnf(sub_context, concept, self.name)
            grammar[concept.id] = UnderConstruction(concept.id)
            sheerka = context.sheerka
            if concept.metadata.definition_type == DEFINITION_TYPE_BNF:
                expression = concept.bnf
                desc = f"Bnf concept detected. Resolving parsing expression '{expression}'"
                with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
                    ssc.add_inputs(expression=expression)
                    resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_resolve, isa_concepts)
                    ssc.add_values(return_values=resolved)
            elif sheerka.isaset(context, concept):
                desc = "Concept is a group. Resolving parsing expression using 'isa'"
                with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
                    ssc.add_inputs(concept=concept)
                    isa_concepts.add(concept.id)
                    concepts_in_group = self.sheerka.get_set_elements(ssc, concept)
                    # concepts_in_group comes from a set, so the order of its elements is not guaranteed
                    # to avoid random failure (ie random CHICKEN_AND_EGG), we need to rearrange
                    # We also remove the root concept (the one from get_parsing_expression())
                    root_concept_as_set = set(context.search(
                        predicate=lambda ec: ec.action == BuiltinConcepts.INIT_BNF,
                        get_obj=lambda ec: ec.obj,
                        stop=lambda ec: ec.action != BuiltinConcepts.INIT_BNF))  # there is only one item in the set
                    root_concept = list(root_concept_as_set)[0]
                    reordered = []
                    for c in concepts_in_group:
                        if c.id == root_concept.id:
                            continue
                        # I do not guaranty the same order every time, but I minimize the ChickenAndEgg random issue
                        if c.metadata.definition_type == DEFINITION_TYPE_BNF or sheerka.isaset(ssc, c):
                            reordered.append(c)
                        else:
                            reordered.insert(0, c)
                    nodes = [ConceptExpression(c, rule_name=c.name) for c in reordered]
                    resolved = self.resolve_parsing_expression(ssc,
                                                               OrderedChoice(*nodes),
                                                               grammar,
                                                               to_resolve,
                                                               isa_concepts)
                    ssc.add_values(concepts_in_group=concepts_in_group)
                    ssc.add_values(return_values=resolved)
            else:
                desc = "Concept is a simple concept."
                with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
                    tokens = Tokenizer(concept.name, yield_eof=False)
                    nodes = [StrMatch(token.strip_quote) for token in tokens]
                    # FIX: was Sequence(nodes) — the whole list was treated as a
                    # single element, so multi-token concept names never resolved
                    expression = nodes[0] if len(nodes) == 1 else Sequence(*nodes)
                    resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_resolve, isa_concepts)
            grammar[concept.id] = resolved
            if self.has_error:
                sub_context.add_values(errors=self.error_sink)
                return None
            sub_context.add_values(return_values=resolved)
            return resolved

    def resolve_parsing_expression(self, context, expression, grammar, to_resolve, isa_concepts):
        """
        Recursively resolve a grammar element (string literal, concept
        reference or composite expression) into a ready-to-use
        ParsingExpression. Non-ParsingExpression returns are escalated errors.
        """
        if isinstance(expression, str):
            ret = StrMatch(expression, ignore_case=self.ignore_case)
        elif not isinstance(expression, ParsingExpression):
            return expression  # escalate the error
        elif isinstance(expression, ConceptExpression):
            concept = self.get_concept(context, expression.concept)
            expression.concept = concept
            if not self.sheerka.is_known(concept):
                unknown_concept = self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=concept)
                return self.add_error(unknown_concept)
            pe = self.resolve_concept_parsing_expression(context, concept, grammar, to_resolve, isa_concepts)
            if not isinstance(pe, (ParsingExpression, UnderConstruction)):
                return pe  # an error is detected, escalate it
            if isinstance(pe, UnderConstruction):
                # remember it so get_parsing_expression() can patch it later
                to_resolve[id(expression)] = expression
            expression.nodes = [pe]
            expression.rule_name = expression.rule_name or concept.name
            ret = expression
        elif isinstance(expression, StrMatch):
            ret = expression
            if ret.ignore_case is None:
                ret.ignore_case = self.ignore_case
        elif isinstance(expression, (Sequence, OrderedChoice, ZeroOrMore, OneOrMore, Optional)):
            ret = expression
            ret.nodes = []
            for e in ret.elements:
                pe = self.resolve_parsing_expression(context, e, grammar, to_resolve, isa_concepts)
                if not isinstance(pe, (ParsingExpression, UnderConstruction)):
                    return pe  # an error is detected, escalate it
                if isinstance(pe, UnderConstruction):
                    to_resolve[id(ret)] = ret  # remember that there is an unresolved parsing expression
                ret.nodes.append(pe)
        else:
            ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)
        # translate the separator expression of repetitions
        if isinstance(ret, Repetition) and expression.sep:
            expression.sep = self.resolve_parsing_expression(context,
                                                             expression.sep,
                                                             grammar,
                                                             to_resolve,
                                                             isa_concepts)
        return ret

    def get_concept(self, context, concept):
        """Resolve a concept reference (instance, context-local name, or key)."""
        if isinstance(concept, Concept):
            return concept
        if concept in context.concepts:
            return context.concepts[concept]
        return self.sheerka.get_by_key(concept)

    def parse(self, context, parser_input: ParserInput):
        """
        parser_input can be a string, but text can also be a list of tokens.
        :param context:
        :param parser_input:
        :return: a single PARSER_RESULT (or a list when several hypotheses
                 remain), or NOT_FOR_ME / ERROR
        """
        if not isinstance(parser_input, ParserInput):
            return None
        context.log(f"Parsing '{parser_input}' with BnfNode", self.name)
        sheerka = context.sheerka
        if parser_input.is_empty():
            return sheerka.ret(self.name,
                               False,
                               sheerka.new(BuiltinConcepts.NOT_FOR_ME,
                                           body=parser_input.as_text(),
                                           reason=BuiltinConcepts.IS_EMPTY))
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        sequences = self.get_concepts_sequences()
        valid_parser_helpers = self.get_valid(sequences)
        if valid_parser_helpers is None:
            # token error (defensive: get_valid currently always returns a list)
            return self.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        if len(valid_parser_helpers) == 0:
            return self.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text()))
        ret = []
        for parser_helper in valid_parser_helpers:
            ret.append(
                self.sheerka.ret(
                    self.name,
                    not parser_helper.has_unrecognized,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=parser_input.as_text(),
                        body=parser_helper.sequence,
                        try_parsed=parser_helper.sequence)))
        if len(ret) == 1:
            self.log_result(context, parser_input.as_text(), ret[0])
            return ret[0]
        else:
            self.log_multiple_results(context, parser_input.as_text(), ret)
            return ret