# Source: Sheerka-Old/src/parsers/BnfNodeParser.py (1549 lines, 56 KiB, Python)
#####################################################################################################
# This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from collections import defaultdict
from dataclasses import dataclass
from operator import attrgetter
import core.utils
from cache.Cache import Cache
from core import builtin_helpers
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, DEFINITION_TYPE_BNF, DoNotResolve, ConceptParts
from core.sheerka.services.SheerkaExecute import ParserInput
from core.tokenizer import Tokenizer, Token, TokenKind
from parsers.BaseNodeParser import BaseNodeParser, LexerNode, UnrecognizedTokensNode, ConceptNode, GrammarErrorNode
from parsers.BaseParser import BaseParser
PARSERS = ["AtomNode", "SyaNode", "Python"]
# def debug(obj):
# with open("debug.txt", "a") as f:
# f.write(f"{obj}\n")
def debug(obj):
    """No-op debug hook; swap in the file-writing version above when tracing the parser."""
    return None
@dataclass
class ParsingContext:
    """
    Backtracking snapshot used when parsing UnOrderedChoice pexpressions.

    Keeps the LexerNode parsed so far and the parser position right after it.
    """
    node: LexerNode  # node parsed
    pos: int  # position of the parser after the parsing

    def clone(self):
        """Return a copy holding a cloned node and the same position."""
        return ParsingContext(self.node.clone(), self.pos)

    def fix_tokens(self, parser_helper):
        """
        When the nodes are fully created, make sure that their sources and tokens are correct.
        :param parser_helper: BnfConceptParserHelper giving access to the parser input
        :return:
        """
        span = parser_helper.parser.parser_input.tokens[self.node.start: self.node.end + 1]
        self.node.tokens = span
        self.node.source = BaseParser.get_text_from_tokens(span)

    def __mul__(self, other):
        """Return a list of `other` contexts: self first, then `other - 1` clones."""
        return [self] + [self.clone() for _ in range(other - 1)]
class NonTerminalNode(LexerNode):
    """
    A LexerNode is the result of the parsing of a parsing expression (pexpression).
    NonTerminalNode is produced by pexpressions which have children
    (Sequence, OrderedChoice, Optional, Repetition...).
    """
    def __init__(self, parsing_expression, start, end, tokens, children=None):
        super().__init__(start, end, tokens)
        self.parsing_expression = parsing_expression  # the pexpression that produced this node
        self.children = children

    def __repr__(self):
        name = "Node:" + (self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__)
        if len(self.children) > 0:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names

    def __eq__(self, other):
        if not isinstance(other, NonTerminalNode):
            return False
        return self.parsing_expression == other.parsing_expression and \
            self.start == other.start and \
            self.end == other.end and \
            self.children == other.children

    def __hash__(self):
        # BUGFIX: `self.children` is a list, which is unhashable; hashing it raised
        # TypeError. Hash an immutable tuple of the children instead.
        return hash((self.parsing_expression, self.start, self.end, tuple(self.children)))

    def clone(self):
        """Shallow clone: the children list is copied, the child nodes are shared."""
        return NonTerminalNode(self.parsing_expression, self.start, self.end, self.tokens, self.children.copy())
class TerminalNode(LexerNode):
    """
    A LexerNode is the result of the parsing of a parsing expression (pexpression).
    TerminalNode is produced by StrMatch.
    """
    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end, source=value)
        self.parsing_expression = parsing_expression  # the StrMatch that produced this node
        self.value = value  # the matched text

    def __repr__(self):
        rule = self.parsing_expression.rule_name or ""
        return f"Node:{rule}'{self.value}'"

    def __eq__(self, other):
        if not isinstance(other, TerminalNode):
            return False
        return (self.parsing_expression == other.parsing_expression
                and self.start == other.start
                and self.end == other.end
                and self.value == other.value)

    def __hash__(self):
        return hash((self.parsing_expression, self.start, self.end, self.value))

    def clone(self):
        """Return an independent copy of this terminal."""
        return TerminalNode(self.parsing_expression, self.start, self.end, self.value)
class MultiNode:
    """
    A LexerNode is the result of the parsing of a parsing expression (pexpression).
    MultiNode is used by the UnOrderedChoice parsing expression when multiple
    candidate parses are found; it holds one ParsingContext per candidate.
    """
    def __init__(self, results):
        self.results = results  # list of ParsingContext, one per surviving alternative

    def __repr__(self):
        sources = [pcontext.node.source for pcontext in self.results]
        return f"MultiNode({sources})"

    def combine(self, parsing_expression):
        """Wrap every alternative's node under `parsing_expression`; return self."""
        for i, pcontext in enumerate(self.results):
            inner = pcontext.node
            self.results[i].node = NonTerminalNode(parsing_expression,
                                                   inner.start,
                                                   inner.end,
                                                   inner.tokens,
                                                   [inner])
        return self
class ParsingExpression:
    """
    Base class for all parsing expressions (pexpressions).

    elements: raw definition items (concepts, strings, sub-expressions)
    nodes: resolved sub-expressions used at parse time
    rule_name: optional name under which a match is recorded
    """
    def __init__(self, *args, **kwargs):
        self.elements = args
        nodes = kwargs.get('nodes', []) or []
        # a single non-iterable node is wrapped in a list
        if not hasattr(nodes, '__iter__'):
            nodes = [nodes]
        self.nodes = nodes
        self.rule_name = kwargs.get('rule_name', '')

    def __eq__(self, other):
        if not isinstance(other, ParsingExpression):
            return False
        if self.rule_name != other.rule_name:
            return False
        if len(self.elements) != len(other.elements):
            return False
        return all(mine == theirs for mine, theirs in zip(self.elements, other.elements))

    def __hash__(self):
        return hash((self.rule_name, self.elements))

    def parse(self, parser):
        debug(self)
        # TODO : add memoization
        return self._parse(parser)

    def add_rule_name_if_needed(self, text):
        """Append '=<rule_name>' to `text` when this expression is named."""
        if self.rule_name:
            return text + "=" + self.rule_name
        return text
class ConceptExpression(ParsingExpression):
    """
    Will match a concept.
    It is used only for rule definition: when the grammar is created,
    it is replaced by the actual concept's parsing expression (stored in nodes[0]).
    """
    def __init__(self, concept, rule_name="", recurse_id=None, nodes=None):
        super().__init__(rule_name=rule_name, nodes=nodes)
        # `concept` may be a Concept instance or just the concept name (str)
        self.concept = concept
        # identifier used by the infinite-recursion detection (see check_for_infinite_recursion)
        self.recurse_id = recurse_id

    def __repr__(self):
        return self.add_rule_name_if_needed(f"{self.concept}")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False
        if not isinstance(other, ConceptExpression):
            return False
        # TODO : enable self.recurse_id when it will be correctly implemented
        # if self.recurse_id != other.recurse_id:
        # return False
        if isinstance(self.concept, Concept):
            return self.concept.id == other.concept.id
        # when it's only the name of the concept
        return self.concept == other.concept

    def __hash__(self):
        return hash((self.concept, self.rule_name))

    def _parse(self, parser_helper):
        # a resolved ConceptExpression holds exactly one node: the concept's grammar
        node = self.nodes[0].parse(parser_helper)
        if node is None:
            return None
        if isinstance(node, MultiNode):
            # keep all alternatives, each wrapped under this expression
            return node.combine(self)
        return NonTerminalNode(self,
                               node.start,
                               node.end,
                               parser_helper.parser.parser_input.tokens[node.start: node.end + 1],
                               [node])
class Sequence(ParsingExpression):
    """
    Will match a sequence of parser expressions in the exact order they are defined.
    Maintains several ParsingContexts in parallel so that MultiNode results
    (several candidate parses) fork the sequence instead of failing it.
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = parser_helper.pos
        # root node that accumulates one child per element of the sequence
        ntn = NonTerminalNode(self,
                              init_pos,
                              end_pos,
                              None,
                              [])
        parsing_contexts = [ParsingContext(ntn, parser_helper.pos)]
        to_remove = []
        to_append = []
        for e in self.nodes:
            for pcontext in parsing_contexts:
                # every alternative resumes from its own position
                parser_helper.seek(pcontext.pos)
                node = e.parse(parser_helper)
                if node is None:
                    # this alternative failed on the current element: drop it
                    to_remove.append(pcontext)
                elif isinstance(node, MultiNode):
                    # fork the context: one clone per candidate returned
                    clones = pcontext * len(node.results)  # clones pcontext (but first item is pcontext)
                    to_append.extend(clones[1:])
                    for clone, node_pcontext in zip(clones, node.results):
                        clone.pos = node_pcontext.pos
                        clone.node.children.append(node_pcontext.node)
                        clone.node.end = node_pcontext.node.end
                else:
                    if node.end != -1:  # because returns -1 when no match
                        pcontext.pos = parser_helper.pos
                        pcontext.node.children.append(node)
                        pcontext.node.end = node.end
            # apply removals/forks outside the iteration over parsing_contexts
            for pcontext in to_remove:
                parsing_contexts.remove(pcontext)
            parsing_contexts.extend(to_append)
            if len(parsing_contexts) == 0:
                return None
            to_append.clear()
            to_remove.clear()
            # reset tokenizer for the following pexpression
            parser_helper.seek(parsing_contexts[0].pos)
        # update nodes sources and tokens
        for pcontext in parsing_contexts:
            pcontext.fix_tokens(parser_helper)
        if len(parsing_contexts) == 1:
            return parsing_contexts[0].node
        return MultiNode(parsing_contexts)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})")
class OrderedChoice(ParsingExpression):
    """
    Will match the first one among multiple alternatives.
    It stops at the first match (so the order of definition is important).
    TODO : implement MultiNode support
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        for e in self.nodes:
            node = e.parse(parser_helper)
            if node:
                # first successful alternative wins
                return NonTerminalNode(self,
                                       init_pos,
                                       node.end,
                                       parser_helper.parser.parser_input.tokens[init_pos: node.end + 1],
                                       [node])
            parser_helper.seek(init_pos)  # backtrack
        return None

    def __repr__(self):
        to_str = "| ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})")
class UnOrderedChoice(ParsingExpression):
    """
    May match many nodes. It will return nodes sorted by length (longest first).
    All elements will be tested, so the order is not important.
    The behaviour when multiple candidates with the same length are found is not defined yet.
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        parsing_contexts = []
        for e in self.nodes:
            node = e.parse(parser_helper)
            if node:
                if isinstance(node, MultiNode):
                    # keep every candidate, re-rooted under this expression
                    node.combine(self)
                    parsing_contexts.extend(node.results)
                else:
                    tn = NonTerminalNode(self,
                                         init_pos,
                                         node.end,
                                         parser_helper.parser.parser_input.tokens[init_pos: node.end + 1],
                                         [node])
                    parsing_contexts.append(ParsingContext(tn, parser_helper.pos))
            parser_helper.seek(init_pos)  # backtrack
        if len(parsing_contexts) == 0:
            return None
        # NOTE(review): this seeks to the FIRST context found, before the sort below —
        # confirm it should not seek to the longest match instead
        parser_helper.seek(parsing_contexts[0].pos)
        if len(parsing_contexts) == 1:
            return parsing_contexts[0].node
        else:
            # longest match first (`pos` is the parser position after the match)
            parsing_contexts.sort(key=attrgetter("pos"), reverse=True)
            return MultiNode(parsing_contexts)

    def __repr__(self):
        to_str = "# ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})")
class Optional(ParsingExpression):
    """
    Will match or not the elements; never fails.
    If many elements match, the longest one is chosen.
    If you need order, use Optional(OrderedChoice).
    TODO : implement MultiNode support
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        # end == -1 encodes "nothing found"; this empty node is the fallback result
        selected_node = NonTerminalNode(self, parser_helper.pos, -1, [], [])  # means that nothing is found
        for e in self.nodes:
            node = e.parse(parser_helper)
            if node:
                if node.end > selected_node.end:
                    # keep the longest match so far
                    selected_node = NonTerminalNode(
                        self,
                        node.start,
                        node.end,
                        parser_helper.parser.parser_input.tokens[node.start: node.end + 1],
                        [node])
            parser_helper.seek(init_pos)  # backtrack
        if selected_node.end != -1:
            parser_helper.seek(selected_node.end)
            parser_helper.next_token()  # eat the tokens found
        return selected_node

    def __repr__(self):
        if len(self.elements) == 1:
            return f"{self.elements[0]}?"
        else:
            to_str = ", ".join(repr(n) for n in self.elements)
            return self.add_rule_name_if_needed(f"({to_str})?")
class Repetition(ParsingExpression):
    """
    Base class for all repetition-like parser expressions (?,*,+).

    Args:
        sep: optional separator expression parsed between repeated items
             (taken from kwargs; defaults to None).
    """
    def __init__(self, *elements, **kwargs):
        super(Repetition, self).__init__(*elements, **kwargs)
        self.sep = kwargs.get('sep', None)

    def clone(self):
        """Return an equivalent repetition of the same concrete type."""
        # BUGFIX: the original `Repetition(self.elements, ...)`
        #  - passed the elements tuple as ONE positional argument instead of unpacking it,
        #    so the clone's `elements` became ((e1, e2, ...),), and
        #  - always built a base Repetition, losing the ZeroOrMore/OneOrMore subtype.
        return type(self)(*self.elements,
                          rule_name=self.rule_name,
                          nodes=self.nodes,
                          sep=self.sep)
class ZeroOrMore(Repetition):
    """
    ZeroOrMore will try to match the parser expression specified zero or more
    times. It will never fail.
    TODO : implement MultiNode support
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser_helper.pos
            # maybe eat the separator if needed (only between items, never before the first)
            if self.sep and children:
                sep_result = self.sep.parse(parser_helper)
                if sep_result is None:
                    parser_helper.seek(current_pos)
                    break
            # parse one more occurrence of the repeated expression
            node = self.nodes[0].parse(parser_helper)
            if node is None:
                parser_helper.seek(current_pos)
                break
            else:
                # NOTE(review): a successful parse with end == -1 (empty match) neither
                # advances nor breaks — confirm sub-expressions cannot loop here forever
                if node.end != -1:  # because returns -1 when no match
                    children.append(node)
                    end_pos = node.end
        if len(children) == 0:
            # zero matches is still a success: return an empty node (end == -1)
            return NonTerminalNode(self, init_pos, -1, [], [])
        return NonTerminalNode(self, init_pos, end_pos, parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1],
                               children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})*")
class OneOrMore(Repetition):
    """
    OneOrMore will try to match the parser expression specified one or more times.
    TODO : implement MultiNode support
    """
    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser_helper.pos
            # maybe eat the separator if needed (only between items, never before the first)
            if self.sep and children:
                sep_result = self.sep.parse(parser_helper)
                if sep_result is None:
                    parser_helper.seek(current_pos)
                    break
            # parse one occurrence of the repeated expression (same loop as ZeroOrMore)
            node = self.nodes[0].parse(parser_helper)
            if node is None:
                parser_helper.seek(current_pos)
                break
            else:
                if node.end != -1:  # because returns -1 when no match
                    children.append(node)
                    end_pos = node.end
        if len(children) == 0:  # if nothing is found, it's an error
            return None
        return NonTerminalNode(self,
                               init_pos,
                               end_pos,
                               parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1],
                               children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})+")
class UnorderedGroup(Repetition):
    """
    Will try to match all of the parsing expressions, in any order.

    Parsing is not implemented yet.
    """
    def _parse(self, parser):
        raise NotImplementedError()

    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})#"
class Match(ParsingExpression):
    """
    Base class for all classes that will try to match something from the input.
    """
    def __init__(self, rule_name, root=False):
        # `root` is forwarded into kwargs; ParsingExpression.__init__ does not read it
        super().__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        # unlike ParsingExpression.parse, no debug() call here
        return self._parse(parser)
class StrMatch(Match):
    """
    Matches a literal string against the current token.

    Args:
        to_match: the literal to match
        rule_name: optional property name for the match
        ignore_case: case-insensitive comparison (default True)
        skip_whitespace: skip whitespace after a successful match (default True)
    """
    def __init__(self, to_match, rule_name="", ignore_case=True, skip_whitespace=True):
        # deliberately skips Match.__init__ (no `root` arg) and calls ParsingExpression directly
        super(Match, self).__init__(rule_name=rule_name)
        self.to_match = to_match
        self.ignore_case = ignore_case
        self.skip_white_space = skip_whitespace

    def __repr__(self):
        text = self.to_match
        if not self.ignore_case:
            text += "#!ic"
        if not self.skip_white_space:
            text += "#!sw"
        return self.add_rule_name_if_needed(f"'{text}'")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False
        if not isinstance(other, StrMatch):
            return False
        return self.to_match == other.to_match and \
            self.ignore_case == other.ignore_case and \
            self.skip_white_space == other.skip_white_space

    def __hash__(self):
        # BUGFIX: defining __eq__ without __hash__ made StrMatch unhashable
        # (Python sets __hash__ to None), yet NonTerminalNode/TerminalNode hash
        # their parsing_expression. Provide a hash consistent with __eq__.
        return hash((self.rule_name, self.to_match, self.ignore_case, self.skip_white_space))

    def _parse(self, parser_helper):
        token = parser_helper.get_token()
        # NOTE(review): the case-insensitive branch compares str_value while the
        # case-sensitive one compares strip_quote — confirm this asymmetry is intended
        m = token.str_value.lower() == self.to_match.lower() if self.ignore_case \
            else token.strip_quote == self.to_match
        if m:
            node = TerminalNode(self, parser_helper.pos, parser_helper.pos, token.str_value)
            parser_helper.next_token(self.skip_white_space)
            return node
        debug(f"Failed to match {self}. {token=}")
        return None
# class RegExMatch(Match):
# '''
# This Match class will perform input matching based on Regular Expressions.
#
# Args:
# to_match (regex string): A regular expression string to match.
# It will be used to create regular expression using re.compile.
# ignore_case(bool): If case insensitive match is needed.
# Default is None to support propagation from global parser setting.
# multiline(bool): allow regex to works on multiple lines
# (re.DOTALL flag). Default is None to support propagation from
# global parser setting.
# str_repr(str): A string that is used to represent this regex.
# re_flags: flags parameter for re.compile if neither ignore_case
# or multiple are set.
#
# '''
# def __init__(self, to_match, rule_name='', root=False, ignore_case=None,
# multiline=None, str_repr=None, re_flags=re.MULTILINE):
# super(RegExMatch, self).__init__(rule_name, root)
# self.to_match_regex = to_match
# self.ignore_case = ignore_case
# self.multiline = multiline
# self.explicit_flags = re_flags
#
# self.to_match = str_repr if str_repr is not None else to_match
#
# def compile(self):
# flags = self.explicit_flags
# if self.multiline is True:
# flags |= re.DOTALL
# if self.multiline is False and flags & re.DOTALL:
# flags -= re.DOTALL
# if self.ignore_case is True:
# flags |= re.IGNORECASE
# if self.ignore_case is False and flags & re.IGNORECASE:
# flags -= re.IGNORECASE
# self.regex = re.compile(self.to_match_regex, flags)
#
# def __str__(self):
# return self.to_match
#
# def __unicode__(self):
# return self.__str__()
#
# def _parse(self, parser):
# c_pos = parser.position
# m = self.regex.match(parser.input, c_pos)
# if m:
# matched = m.group()
# if parser.debug:
# parser.dprint(
# "++ Match '%s' at %d => '%s'" %
# (matched, c_pos, parser.context(len(matched))))
# parser.position += len(matched)
# if matched:
# return Terminal(self, c_pos, matched, extra_info=m)
# else:
# if parser.debug:
# parser.dprint("-- NoMatch at {}".format(c_pos))
# parser._nm_raise(self, c_pos, parser)
class ParsingExpressionVisitor:
    """
    Visitor over ParsingExpression trees.
    Subclasses implement visit_<ClassName> methods; returning STOP prunes the branch.
    """
    # sentinel returned by visit_* methods to stop the traversal of the current branch
    STOP = "##_Stop_##"

    def visit(self, parsing_expression):
        """Dispatch to visit_<ClassName> if defined, else to generic_visit."""
        name = parsing_expression.__class__.__name__
        method = 'visit_' + name
        visitor = getattr(self, method, self.generic_visit)
        return visitor(parsing_expression)

    def generic_visit(self, parsing_expression):
        """Visit all elements, wrapping raw Concepts/strings in the matching pexpression."""
        # optional hook: subclasses may define visit_all to observe every expression
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)
        for node in parsing_expression.elements:
            if isinstance(node, Concept):
                res = self.visit(ConceptExpression(node.key or node.name))
            elif isinstance(node, str):
                res = self.visit(StrMatch(node))
            else:
                res = self.visit(node)
            if res == self.STOP:
                return
class BnfNodeFirstTokenVisitor(ParsingExpressionVisitor):
    """Collects the possible first tokens (literals or known concepts) of a parsing expression."""
    def __init__(self, sheerka):
        self.sheerka = sheerka
        # None until a first token is found, then a list of tokens
        self.first_tokens = None

    def add_first_token(self, first_token):
        """Append `first_token`, lazily creating the list on first use."""
        if not self.first_tokens:
            self.first_tokens = [first_token]
        else:
            self.first_tokens.append(first_token)

    def visit_ConceptExpression(self, pe):
        # resolve the concept when only its name (str) is stored
        concept = self.sheerka.get_by_key(pe.concept) if isinstance(pe.concept, str) else pe.concept
        if self.sheerka.is_known(concept):
            self.add_first_token(core.utils.str_concept(concept, drop_name=True))
        return self.STOP

    def visit_StrMatch(self, pe):
        # an empty literal contributes nothing but does not stop the traversal
        if not pe.to_match:
            return
        self.add_first_token(pe.to_match)
        return self.STOP

    def visit_OrderedChoice(self, parsing_expression):
        # every alternative can contribute a first token
        for node in parsing_expression.elements:
            self.visit(node)
        return self.STOP

    def visit_UnOrderedChoice(self, parsing_expression):
        # every alternative can contribute a first token
        for node in parsing_expression.elements:
            self.visit(node)
        return self.STOP
class BnfNodeConceptExpressionVisitor(ParsingExpressionVisitor):
    """Collects every concept referenced by a parsing-expression tree into `references`."""
    def __init__(self):
        self.references = []

    def visit_ConceptExpression(self, pe):
        # record the reference; traversal continues (no STOP returned)
        self.references.append(pe.concept)
class BnfConceptParserHelper:
    """
    Drives the token-by-token recognition of BNF-defined concepts for one
    candidate parse. Instances are cloned ("forked") when several parses are
    possible; `sequence` accumulates the recognized nodes.
    """
    def __init__(self, parser):
        self.parser = parser
        self.debug = []  # trace of eaten concepts/tokens (debugging aid)
        self.errors = []  # GrammarErrorNode instances collected so far
        self.sequence = []  # recognized ConceptNode / UnrecognizedTokensNode sequence
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
        self.has_unrecognized = False
        self.bnf_parsed = False  # True once at least one concept was parsed
        self.forked = []  # clones created by manage_unrecognized, drained by the caller
        self.token = None
        self.pos = -1

    def __repr__(self):
        return f"BnfConceptParserHelper({self.sequence})"

    def __eq__(self, other):
        if id(self) == id(other):
            return True
        if not isinstance(other, BnfConceptParserHelper):
            return False
        return self.sequence == other.sequence and self.errors == other.errors

    def __hash__(self):
        # loose hash, consistent with __eq__ (equal helpers have equal lengths)
        return len(self.sequence) + len(self.errors)

    def get_token(self) -> Token:
        """Return the current token (does not advance)."""
        return self.token

    def next_token(self, skip_whitespace=True):
        """Advance to the next token; return False once EOF is reached."""
        if self.token and self.token.type == TokenKind.EOF:
            return False
        self.pos += 1
        self.token = self.parser.parser_input.tokens[self.pos]
        if skip_whitespace:
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.parser.parser_input.tokens[self.pos]
        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """Move the helper to absolute token position `pos`."""
        self.pos = pos
        self.token = self.parser.parser_input.tokens[self.pos]

    def has_error(self):
        return len(self.errors) > 0

    def is_locked(self):
        """True when this helper is ahead of the shared input or already failed."""
        return self.parser.parser_input.pos <= self.pos or self.has_error()

    def eat_concept(self, concept, token):
        """Try to parse `concept` starting at the shared input position."""
        if self.is_locked():
            return
        self.debug.append(concept)
        self.manage_unrecognized()
        for forked in self.forked:
            # manage the fact that some clone may have been forked
            forked.eat_concept(concept, token)
        # init
        parsing_expression = self.parser.get_parsing_expression(self.parser.context, concept)
        if not isinstance(parsing_expression, ParsingExpression):
            self.debug.append(concept)
            error_msg = f"Failed to parse concept '{concept}'"
            if parsing_expression is not None:
                error_msg += f". Reason: '{parsing_expression}'"
            self.errors.append(GrammarErrorNode(error_msg))
            return
        self.pos = self.parser.parser_input.pos
        self.token = self.parser.parser_input.tokens[self.pos]
        # parse
        debug(f"parsing {parsing_expression} against '{self.parser.parser_input.text}'")
        node = parsing_expression.parse(self)
        if isinstance(node, MultiNode):
            # when multiple choices are found, use the longest result
            node = node.results[0].node
        if node is not None and node.end != -1:
            self.sequence.append(self.create_concept_node(concept, node))
            self.pos = node.end
            self.bnf_parsed = True
        else:
            self.debug.append(("Rewind", token))
            self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos)
            self.pos = self.parser.parser_input.pos  # reset position

    def eat_unrecognized(self, token):
        """Accumulate a token that matched no concept."""
        if self.is_locked():
            return
        self.debug.append(token)
        self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos)

    def manage_unrecognized(self):
        """Flush accumulated unrecognized tokens into the sequence, forking if other parsers recognize them."""
        if self.unrecognized_tokens.is_empty():
            return
        # do not put empty UnrecognizedToken in out
        if self.unrecognized_tokens.is_whitespace():
            self.unrecognized_tokens.reset()
            return
        self.unrecognized_tokens.fix_source()
        # try to recognize concepts with the fallback parsers (PARSERS)
        nodes_sequences = builtin_helpers.get_lexer_nodes_from_unrecognized(
            self.parser.context,
            self.unrecognized_tokens,
            PARSERS)
        if nodes_sequences:
            # one instance per candidate node sequence: self keeps the first,
            # clones (registered in self.forked) take the others
            instances = [self]
            for i in range(len(nodes_sequences) - 1):
                clone = self.clone()
                instances.append(clone)
                self.forked.append(clone)
            for instance, node_sequence in zip(instances, nodes_sequences):
                for node in node_sequence:
                    instance.sequence.append(node)
                    if isinstance(node, UnrecognizedTokensNode) or \
                            hasattr(node, "unrecognized_tokens") and node.unrecognized_tokens:
                        instance.has_unrecognized = True
                instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
        else:
            self.sequence.append(self.unrecognized_tokens)
            self.has_unrecognized = True
        # create another instance
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])

    def clone(self):
        """Return a copy sharing the parser but owning its own sequence/errors/debug."""
        clone = BnfConceptParserHelper(self.parser)
        clone.debug = self.debug[:]
        # BUGFIX: was `self.errors = self.errors[:]`, which rebound the ORIGINAL's
        # errors and left the clone's errors empty — clones lost all error state.
        clone.errors = self.errors[:]
        clone.sequence = self.sequence[:]
        clone.pos = self.pos
        clone.unrecognized_tokens = self.unrecognized_tokens.clone()
        # NOTE(review): bnf_parsed/has_unrecognized/token are not copied — confirm intended
        return clone

    def finalize(self):
        """Flush any pending unrecognized tokens, on self and on every fork."""
        if self.bnf_parsed:  # was `> 0` on a bool; plain truthiness is equivalent
            self.manage_unrecognized()
        for forked in self.forked:
            # manage that some clones may have been forked
            forked.finalize()

    def create_concept_node(self, template, underlying):
        """Instantiate a new concept from `template` and wrap it in a ConceptNode."""
        sheerka = self.parser.context.sheerka
        key = (template.key, template.id) if template.id else template.key
        concept = sheerka.new(key)
        concept = self.finalize_concept(sheerka, concept, underlying)
        concept_node = ConceptNode(
            concept,
            underlying.start,
            underlying.end,
            self.parser.parser_input.tokens[underlying.start: underlying.end + 1],
            None,
            underlying)
        return concept_node

    def finalize_concept(self, sheerka, concept, underlying, init_empty_body=True):
        """
        Updates the properties of the concept from the parsed node tree.
        Goes into recursion when a property value is itself a concept.
        """
        # this cache makes sure that we return the same concept for the same ConceptExpression
        _underlying_value_cache = {}

        def _add_prop(_concept, prop_name, value):
            """Add a new entry; turn it into a list if the property already has a value."""
            if prop_name not in _concept.compiled or _concept.compiled[prop_name] is None:
                # new entry
                _concept.compiled[prop_name] = value
            else:
                # make a list if there was a value
                previous_value = _concept.compiled[prop_name]
                if isinstance(previous_value, list):
                    previous_value.append(value)
                else:
                    new_value = [previous_value, value]
                    _concept.compiled[prop_name] = new_value

        def _look_for_concept_match(_underlying):
            """
            At some point, there is either a StrMatch or a ConceptExpression
            that allowed the recognition. Look for the ConceptExpression,
            descending single-child chains.
            """
            if isinstance(_underlying.parsing_expression, ConceptExpression):
                return _underlying
            if not isinstance(_underlying, NonTerminalNode):
                return None
            if len(_underlying.children) != 1:
                return None
            return _look_for_concept_match(_underlying.children[0])

        def _get_underlying_value(_underlying):
            """Resolve a node to its value: a finalized concept or a DoNotResolve string."""
            concept_match_node = _look_for_concept_match(_underlying)
            if concept_match_node:
                # the value is a concept
                if id(concept_match_node) in _underlying_value_cache:
                    result = _underlying_value_cache[id(concept_match_node)]
                else:
                    ref_tpl = concept_match_node.parsing_expression.concept
                    new = sheerka.new_from_template(ref_tpl, ref_tpl.key)
                    result = self.finalize_concept(sheerka, new, concept_match_node.children[0], init_empty_body)
                    _underlying_value_cache[id(concept_match_node)] = result
            else:
                # the value is a string
                result = DoNotResolve(_underlying.source)
            return result

        def _process_rule_name(_concept, _underlying):
            """Record named sub-matches (rule_name) as properties, recursing into children."""
            if _underlying.parsing_expression.rule_name:
                value = _get_underlying_value(_underlying)
                _add_prop(_concept, _underlying.parsing_expression.rule_name, value)
                _concept.metadata.need_validation = True
            elif isinstance(_underlying, NonTerminalNode):
                for child in _underlying.children:
                    _process_rule_name(_concept, child)

        if init_empty_body and concept.metadata.body is None:
            value = _get_underlying_value(underlying)
            concept.compiled[ConceptParts.BODY] = value
            if underlying.parsing_expression.rule_name:
                _add_prop(concept, underlying.parsing_expression.rule_name, value)
                # KSI : Why don't we set concept.metadata.need_validation to True ?
        if isinstance(underlying, NonTerminalNode):
            for node in underlying.children:
                _process_rule_name(concept, node)
        return concept

    def get_node_value(self, node):
        """
        Try to evaluate the value of a given LexerNode (TerminalNode or NonTerminalNode).
        :param node:
        :return: the terminal value, the evaluated concept body, or None
        """
        if isinstance(node, TerminalNode):
            return node.value
        if isinstance(node.parsing_expression, ConceptExpression):
            concept = node.parsing_expression.concept
            # NOTE(review): uses self.parser.sheerka here while other methods use
            # self.parser.context.sheerka — confirm both are the same instance
            finalized = self.finalize_concept(self.parser.sheerka, concept, node)
            evaluated = core.builtin_helpers.ensure_evaluated(self.parser.context, finalized)
            return evaluated.body
        return None
@dataclass
class UnderConstruction:
    """Placeholder for a concept whose parsing expression is still being resolved."""
    concept_id: str
@dataclass()
class ToUpdate:
    """A parsing expression instance that must be revisited once its concept is resolved."""
    instance_id: int
    parsing_expression: ParsingExpression

    # explicit __hash__ keeps instances hashable: dataclass() preserves a
    # user-defined __hash__ even though it generates __eq__
    def __hash__(self):
        return hash(self.instance_id)
class BnfNodeParser(BaseNodeParser):
def __init__(self, **kwargs):
super().__init__("BnfNode", 50, **kwargs)
if 'sheerka' in kwargs:
sheerka = kwargs.get("sheerka")
self.concepts_grammars = sheerka.concepts_grammars
else:
self.concepts_grammars = Cache()
self.ignore_case = True
    @staticmethod
    def _is_eligible(concept):
        """
        Predicate that selects concepts that must be handled by BnfNodeParser
        (the original docstring said AtomNodeParser, a copy-paste leftover).
        :param concept:
        :return: True when the concept's definition type is BNF
        """
        return concept.metadata.definition_type == DEFINITION_TYPE_BNF
@staticmethod
def get_valid(parsers_helpers):
valid_parser_helpers = []
for parser_helper in parsers_helpers:
if not parser_helper.bnf_parsed or parser_helper.has_error():
continue
if parser_helper in valid_parser_helpers:
continue
valid_parser_helpers.append(parser_helper)
return valid_parser_helpers
    @staticmethod
    def get_expression_from_concept_name(name):
        """
        Create the parsing expression from the name.
        This function differs from BNFParser.parse() as it does not try to resolve identifiers into concepts
        >>> assert get_expression_from_concept_name('one hundred') == Sequence(StrMatch("one"), StrMatch("hundred"))
        while BNFParser.parse("one hundred") will look for concept 'one' and concept 'hundred'
        NOTE(review): an empty/blank name returns [] (a list), not a ParsingExpression —
        confirm callers handle that case
        :param name:
        :return:
        """
        if name is None or name.strip() == "":
            return []
        res = []
        tokens = Tokenizer(name, yield_eof=False)
        for token in tokens:
            if token.type == TokenKind.WHITESPACE:
                continue
            elif token.type == TokenKind.STRING:
                # a quoted string is re-tokenized; all inner tokens but the last
                # must match with no whitespace between them
                sub_tokens = list(Tokenizer(token.strip_quote, yield_eof=False))
                for sub_token in sub_tokens[:-1]:
                    res.append(StrMatch(sub_token.str_value, skip_whitespace=False))
                res.append(StrMatch(sub_tokens[-1].str_value))
            else:
                res.append(StrMatch(token.str_value))
        return res[0] if len(res) == 1 else Sequence(*res)
    def get_concepts_sequences(self):
        """
        Main method: walks the input tokens, feeding each token to every live
        BnfConceptParserHelper, forking on ambiguity and keeping the longest parses.
        :return: the list of surviving BnfConceptParserHelper instances
        """
        def _add_forked_to_concept_parser_helpers():
            # collect forks created by the helpers during this step
            for parser in concept_parser_helpers:
                if len(parser.forked) > 0:
                    forked.extend(parser.forked)
                    parser.forked.clear()
            if len(forked) > 0:
                concept_parser_helpers.extend(forked)
                forked.clear()

        def _get_longest(parser_helpers):
            # when there is a match with several concepts,
            # only keep the ones that eat the most tokens
            by_end_pos = defaultdict(list)
            for helper in parser_helpers:
                by_end_pos[helper.pos].append(helper)
            return by_end_pos[max(by_end_pos)]

        forked = []
        concept_parser_helpers = [BnfConceptParserHelper(self)]
        while self.parser_input.next_token(False):
            token = self.parser_input.token
            try:
                concepts = self.get_concepts(token, self._is_eligible, strip_quotes=False)
                if not concepts:
                    # no BNF concept starts with this token
                    for concept_parser in concept_parser_helpers:
                        concept_parser.eat_unrecognized(token)
                    continue
                if len(concepts) == 1:
                    for concept_parser in concept_parser_helpers:
                        concept_parser.eat_concept(concepts[0], token)
                    continue
                # make the cartesian product (each helper x each candidate concept)
                temp_res = []
                for concept_parser in concept_parser_helpers:
                    if concept_parser.is_locked():
                        # It means that it already ate the token
                        # so simply add it, do not clone
                        temp_res.append(concept_parser)
                        continue
                    for concept in concepts:
                        clone = concept_parser.clone()
                        temp_res.append(clone)
                        clone.eat_concept(concept, token)
                # only keep the longest
                concept_parser_helpers = _get_longest(temp_res)
            finally:
                _add_forked_to_concept_parser_helpers()
        # make sure that remaining items in stack are moved to out
        for concept_parser in concept_parser_helpers:
            concept_parser.finalize()
        _add_forked_to_concept_parser_helpers()
        return concept_parser_helpers
    def fix_infinite_recursions(self, context, grammar, concept_id, parsing_expression):
        """
        Check the newly created parsing expression.
        Some infinite recursions can be resolved simply by removing the pexpression
        that causes the loop; rebuild the offending node with the loop excluded.
        :param context:
        :param grammar: cache of already computed parsing expressions
        :param concept_id: id of the concept whose grammar is being built
        :param parsing_expression:
        :return: the last recursion path found (possibly empty)
        """
        def _find(expression_, path_):
            # follow `path_` (a list of concept/recurse ids) down the tree,
            # returning (parent, child index, ConceptExpression) of the last step
            index_ = -1
            parent_ = None
            for node_id in path_:
                expression_ = expression_.nodes[0] if isinstance(expression_, ConceptExpression) else expression_
                for i, node in [(i, n) for i, n in enumerate(expression_.nodes) if isinstance(n, ConceptExpression)]:
                    if node.recurse_id == node_id or node.concept.id == node_id:
                        index_ = i
                        parent_ = expression_
                        expression_ = node  # take the child of the ConceptExpression found
                        break
                else:
                    # for/else: no matching child at this level
                    raise IndexError(f"path {path_} cannot be found in '{expression_}'")
            return parent_, index_, expression_

        def _fix_node(expression, path):
            # rebuild the ConceptExpression at the end of `path` with the looping
            # grammar entries removed, then splice it back into its parent
            parent, index, expression_update = _find(expression, path[1:-2])
            assert isinstance(expression_update, ConceptExpression)
            desc = f"Fixing circular reference {path}"
            with context.push(BuiltinConcepts.INIT_BNF,
                              expression_update.concept,
                              who=self.name,
                              obj=expression_update.concept,
                              concepts_to_skip=[concept_id],
                              desc=desc) as sub_context:
                new_grammar = grammar.copy()
                # drop the two grammar entries that close the cycle
                for node_id in path[-2:]:
                    del new_grammar[node_id]
                new_nodes = self.resolve_concept_parsing_expression(sub_context,
                                                                    expression_update.concept,
                                                                    expression_update.rule_name, new_grammar, set())
                new = ConceptExpression(expression_update.concept,
                                        rule_name=expression_update.rule_name,
                                        recurse_id=expression_update.recurse_id,
                                        nodes=new_nodes)
                parent.nodes[index] = new

        # repeat until no fixable recursion remains
        while True:
            already_found = [concept_id]
            concepts_in_recursion = []
            if self.check_for_infinite_recursion(parsing_expression, already_found, concepts_in_recursion):
                if "#" in concepts_in_recursion[-2]:
                    # means that it's an isaset concept
                    _fix_node(parsing_expression, concepts_in_recursion[:-1])
                else:
                    break
            else:
                break
        return concepts_in_recursion
def check_for_infinite_recursion(self, parsing_expression, already_found, in_recursion, only_first=False):
    """
    Recursively detect whether *parsing_expression* contains a circular
    concept reference, i.e. a ConceptExpression whose id was already
    visited on the current descent path.

    :param parsing_expression: expression node to inspect
    :param already_found: ids already visited on this path (mutated: the
        repeated id is appended a second time so the caller can see where
        the cycle closes)
    :param in_recursion: output list, extended with the full cycle path
        when a recursion is found
    :param only_first: for a Sequence, inspect only its first node
        (presumably because only the leading element can left-recurse —
        TODO confirm)
    :return: True if a recursion was found, False otherwise
    """
    if isinstance(parsing_expression, ConceptExpression):
        # prefer the recurse_id (set-element key "<id>#<name>#<set id>")
        # over the raw concept id when one was assigned
        id_to_use = parsing_expression.recurse_id or parsing_expression.concept.id
        if id_to_use in already_found:
            already_found.append(id_to_use)  # add the id again, to know where the cycle starts
            in_recursion.extend(already_found)
            return True
        already_found.append(id_to_use)
        return self.check_for_infinite_recursion(
            parsing_expression.nodes[0], already_found, in_recursion, only_first)
    if isinstance(parsing_expression, Sequence):
        # for sequence, we need to check all nodes
        if only_first:
            nodes = [] if len(parsing_expression.nodes) == 0 else [parsing_expression.nodes[0]]
        else:
            nodes = parsing_expression.nodes
        for node in nodes:
            # each branch explores its own copy of the visited path
            already_found_for_current_node = already_found.copy()
            if self.check_for_infinite_recursion(node, already_found_for_current_node, in_recursion, False):
                return True
        return False
    if isinstance(parsing_expression, OrderedChoice):
        # for ordered choice, if there is at least one node that does not resolved to a recursion
        # we are safe
        for node in parsing_expression.nodes:
            already_found_for_current_node = already_found.copy()
            if self.check_for_infinite_recursion(node, already_found_for_current_node, in_recursion, True):
                return True
            else:
                # NOTE(review): this returns on the first iteration either
                # way, so only the first alternative is ever inspected and
                # the trailing `return False` below is reachable only for an
                # empty choice — confirm this matches the comment above.
                return False
        return False
    if isinstance(parsing_expression, UnOrderedChoice):
        # unordered choice: any recursing alternative makes the whole node unsafe
        for node in parsing_expression.nodes:
            already_found_for_current_node = already_found.copy()
            if self.check_for_infinite_recursion(node, already_found_for_current_node, in_recursion, True):
                return True
        return False
    # terminals (StrMatch, ...) cannot recurse
    return False
def get_parsing_expression(self, context, concept):
    """
    Compute (and cache) the parsing expression for a given concept.

    Results are memoized in ``self.concepts_grammars``; every concept
    resolved along the way is cached as well. Concepts caught in an
    unresolvable circular definition are replaced in the cache by a
    CHICKEN_AND_EGG concept carrying the cycle.

    :param context: execution context
    :param concept: concept to build a parsing expression for
    :return: the parsing expression for *concept* (or a CHICKEN_AND_EGG
             concept when the definition is irreducibly circular)
    """
    if concept.id in self.concepts_grammars:
        return self.concepts_grammars.get(concept.id)
    # internal cache of already computed parsing expression to use during the recursion
    grammar = {}
    # concepts that are not totally resolved because they reference a parsing
    # expression still under construction (keyed by the expression instance id)
    to_update = set()
    desc = f"Get parsing expression for concept {concept}"
    with context.push(BuiltinConcepts.INIT_BNF, concept,
                      who=self.name,
                      obj=concept,
                      root_concept=concept,
                      desc=desc) as sub_context:
        # get the parsing expression
        ret = self.resolve_concept_parsing_expression(sub_context, concept, None, grammar, to_update)
        # Patch parsing expressions that are still under construction.
        # Only the items registered in to_update are walked (not the whole
        # grammar) because pe.nodes may be large.
        for item in to_update:
            pe = item.parsing_expression
            for i, node in enumerate(pe.nodes):
                if isinstance(node, UnderConstruction):
                    pe.nodes[i] = grammar.get(node.concept_id)
        # Detect infinite recursions and fix the fixable ones; any remaining
        # (unfixable) cycle is materialized as a CHICKEN_AND_EGG concept.
        concepts_in_recursion = self.fix_infinite_recursions(context, grammar, concept.id, ret)
        if concepts_in_recursion:
            chicken_anf_egg = context.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG, body=concepts_in_recursion)
            for concept_id in concepts_in_recursion:
                grammar[concept_id] = chicken_anf_egg
            # update, in case of infinite circular recursion
            ret = grammar[concept.id]
        # Finally, publish everything (including any CHICKEN_AND_EGG entries,
        # which overwrite previous values for their keys) to the shared cache.
        for k, v in grammar.items():
            self.concepts_grammars.put(k, v)
        sub_context.add_values(return_values=ret)
        return ret
def resolve_concept_parsing_expression(self, context, concept, name, grammar, to_update):
    """
    Resolve the parsing expression of one concept.

    Three definition styles are handled:
    * a BNF definition (``concept.bnf``) is resolved expression by expression;
    * a set ("isa group") becomes an UnOrderedChoice over its elements;
    * a plain concept matches an expression derived from its own name.

    :param context: execution context
    :param concept: concept to resolve
    :param name: rule_name of the concept if exists
    :param grammar: already resolved parsing expressions (under construction)
    :param to_update: parsing expressions that contains unresovled parsing expression
    :return: resolved parsing expression, or None when errors were recorded
    """
    # Set concepts are keyed per (set, rule name, current object) so the same
    # set can resolve differently depending on the object being expanded.
    if context.sheerka.isaset(context, concept) and hasattr(context, "obj"):
        key_to_use = f"{concept.id}#{name}#{context.obj.id}"
    else:
        key_to_use = concept.id
    if key_to_use in self.concepts_grammars:  # validated entry
        return self.concepts_grammars.get(key_to_use)
    if key_to_use in grammar:  # under construction entry
        return grammar.get(key_to_use)
    desc = f"Resolve concept parsing expression for '{concept}'. {key_to_use=}"
    with context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as sub_context:
        if not concept.bnf:  # to save a function call. Not sure it worth it.
            BaseNodeParser.ensure_bnf(sub_context, concept, self.name)
        # Placeholder so recursive references to this concept terminate.
        # NOTE(review): the placeholder records concept.id while the grammar
        # key may be the composite key above — confirm the UnderConstruction
        # patching in get_parsing_expression (grammar.get(node.concept_id))
        # still resolves correctly for set concepts.
        grammar[key_to_use] = UnderConstruction(concept.id)
        sheerka = context.sheerka
        if concept.metadata.definition_type == DEFINITION_TYPE_BNF:
            expression = concept.bnf
            desc = f"Bnf concept detected. Resolving parsing expression '{expression}'"
            with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
                ssc.add_inputs(expression=expression)
                resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_update)
                ssc.add_values(return_values=resolved)
        elif sheerka.isaset(context, concept):
            desc = f"Concept is a group. Resolving parsing expression using 'isa'"
            with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
                ssc.add_inputs(concept=concept)
                concepts_in_group = self.sheerka.get_set_elements(ssc, concept)
                valid_concepts = []
                for c in concepts_in_group:
                    # skip the object currently being parsed (self reference)
                    if c.id == context.obj.id:
                        continue
                    # skip concepts excluded by a recursion fix (see fix_infinite_recursions)
                    if hasattr(context, "concepts_to_skip") and c.id in context.concepts_to_skip:
                        continue
                    valid_concepts.append(c)
                nodes = []
                for c in valid_concepts:
                    # nested sets get a composite recurse id "<id>#<name>#<set id>"
                    c_recurse_id = f"{c.id}#{c.name}#{concept.id}" if self.sheerka.isaset(context, c) else None
                    nodes.append(ConceptExpression(c, rule_name=c.name, recurse_id=c_recurse_id))
                resolved = self.resolve_parsing_expression(ssc,
                                                           UnOrderedChoice(*nodes),
                                                           grammar,
                                                           to_update)
                ssc.add_values(concepts_in_group=concepts_in_group)
                ssc.add_values(return_values=resolved)
        else:
            desc = f"Concept is a simple concept."
            with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
                expression = self.get_expression_from_concept_name(concept.name)
                resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_update)
        # replace the placeholder with the real (possibly partial) expression
        grammar[key_to_use] = resolved
        if self.has_error:
            sub_context.add_values(errors=self.error_sink)
            return None
        sub_context.add_values(return_values=resolved)
        return resolved
def resolve_parsing_expression(self, context, expression, grammar, to_update):
    """
    Turn one raw grammar element into a fully resolved ParsingExpression.

    * plain strings become StrMatch rules;
    * ConceptExpression nodes are resolved through
      resolve_concept_parsing_expression, registering themselves in
      *to_update* when the target concept is still under construction;
    * container expressions (Sequence, choices, repetitions, Optional) have
      each of their elements resolved in turn;
    * anything else is reported as a grammar error.
    Values that are neither ParsingExpression nor UnderConstruction are
    error markers and are escalated to the caller unchanged.
    """
    if isinstance(expression, str):
        ret = StrMatch(expression, ignore_case=self.ignore_case)
    elif not isinstance(expression, ParsingExpression):
        # already an error marker produced deeper in the recursion
        return expression
    elif isinstance(expression, ConceptExpression):
        concept = self.get_concept(context, expression.concept)
        expression.concept = concept
        if not self.sheerka.is_known(concept):
            unknown_concept = self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=concept)
            return self.add_error(unknown_concept)
        resolved = self.resolve_concept_parsing_expression(
            context, concept, expression.rule_name, grammar, to_update)
        if not isinstance(resolved, (ParsingExpression, UnderConstruction)):
            return resolved  # an error is detected, escalate it
        if isinstance(resolved, UnderConstruction):
            # remember to patch this node once the target concept is fully built
            to_update.add(ToUpdate(id(expression), expression))
        expression.nodes = [resolved]
        expression.rule_name = expression.rule_name or concept.name
        ret = expression
    elif isinstance(expression, StrMatch):
        ret = expression
        if ret.ignore_case is None:
            ret.ignore_case = self.ignore_case
    elif isinstance(expression, (Sequence, OrderedChoice, UnOrderedChoice,
                                 ZeroOrMore, OneOrMore, Optional)):
        ret = expression
        ret.nodes = []
        for element in ret.elements:
            resolved = self.resolve_parsing_expression(context, element, grammar, to_update)
            if not isinstance(resolved, (ParsingExpression, UnderConstruction)):
                return resolved  # an error is detected, escalate it
            if isinstance(resolved, UnderConstruction):
                to_update.add(ToUpdate(id(expression), ret))
            ret.nodes.append(resolved)
    else:
        ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)
    # Repetitions may carry a separator expression: resolve it as well.
    if isinstance(ret, Repetition) and expression.sep:
        expression.sep = self.resolve_parsing_expression(context,
                                                         expression.sep,
                                                         grammar,
                                                         to_update)
    return ret
def get_concept(self, context, concept):
    """
    Normalize *concept* to a Concept instance.

    Accepts either a Concept (returned as-is) or a key, which is looked up
    first among the context's local concepts and then in the global
    sheerka registry.
    """
    if isinstance(concept, Concept):
        return concept
    local_concepts = context.concepts
    if concept in local_concepts:
        return local_concepts[concept]
    return self.sheerka.get_by_key(concept)
def parse(self, context, parser_input: ParserInput):
    """
    Parse *parser_input* against the concept grammars known to this parser.

    parser_input can be string, but text can also be an list of tokens.
    Returns a single result when exactly one valid interpretation is found,
    a list of results when several are, a NOT_FOR_ME / ERROR result when
    parsing is impossible, or None for anything that is not a ParserInput.

    :param context: execution context
    :param parser_input: input to parse
    :return: sheerka result concept(s), or None for invalid input
    """
    if not isinstance(parser_input, ParserInput):
        return None
    context.log(f"Parsing '{parser_input}' with BnfNode", self.name)
    sheerka = context.sheerka
    # empty input is not an error: just nothing for this parser to do
    if parser_input.is_empty():
        return sheerka.ret(self.name,
                           False,
                           sheerka.new(BuiltinConcepts.NOT_FOR_ME,
                                       body=parser_input.as_text(),
                                       reason=BuiltinConcepts.IS_EMPTY))
    # NOTE(review): the code below mixes `sheerka` (= context.sheerka),
    # `self.sheerka` and `context.sheerka` — confirm they are the same
    # instance, or unify the references.
    if not self.reset_parser(context, parser_input):
        return self.sheerka.ret(
            self.name,
            False,
            context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
    sequences = self.get_concepts_sequences()
    valid_parser_helpers = self.get_valid(sequences)
    if valid_parser_helpers is None:
        # token error
        return self.sheerka.ret(
            self.name,
            False,
            context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
    if len(valid_parser_helpers) == 0:
        # grammar did not match: signal NOT_FOR_ME so other parsers may try
        return self.sheerka.ret(
            self.name,
            False,
            context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text()))
    ret = []
    for parser_helper in valid_parser_helpers:
        # one PARSER_RESULT per valid interpretation; success only when the
        # helper consumed everything (no unrecognized tokens left over)
        ret.append(
            self.sheerka.ret(
                self.name,
                not parser_helper.has_unrecognized,
                self.sheerka.new(
                    BuiltinConcepts.PARSER_RESULT,
                    parser=self,
                    source=parser_input.as_text(),
                    body=parser_helper.sequence,
                    try_parsed=parser_helper.sequence)))
    if len(ret) == 1:
        self.log_result(context, parser_input.as_text(), ret[0])
        return ret[0]
    else:
        self.log_multiple_results(context, parser_input.as_text(), ret)
        return ret