Files
Sheerka-Old/src/parsers/BnfNodeParser.py
T
kodjo 54e5681c5a Fixed #109 : Mix python and concept. List comprehension
Fixed #110 : SheerkaDebugManager: add list_debug_settings
Fixed #111 : SheerkaDebugManager: Implement ListDebugLogger
Fixed #112 : SyaNodeParser: rewrite this parser
Fixed #113 : Sheerka.: Add enable_parser_caching to disable parsers caching
Fixed #114 : SyaNodeParser : Implement fast cache to resolve unrecognized tokens requests
Fixed #115 : BnfNodeParser : Implement fast cache to resolve unrecognized tokens requests
Fixed #116 : SequenceNodeParser : Implement fast cache to resolve unrecognized tokens requests
Fixed #117 : ResolveMultiplePluralAmbiguityEvaluator: Resolve Multiple plural ambiguity
2021-09-06 11:51:50 +02:00

2157 lines
80 KiB
Python

#####################################################################################################
# This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
import re
from collections import defaultdict
from dataclasses import dataclass, field
from operator import attrgetter
from typing import List
import core.builtin_helpers
import core.utils
from cache.Cache import Cache
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, ConceptParts, DEFINITION_TYPE_BNF, DoNotResolve
from core.global_symbols import NotFound
from core.sheerka.services.SheerkaExecute import ParserInput
from core.tokenizer import Token, TokenKind, Tokenizer
from core.utils import CONSOLE_COLORS_MAP as CCM
from parsers.BaseNodeParser import BaseNodeParser, ConceptNode, GrammarErrorNode, NoMatchingTokenError, RuleNode, \
SourceCodeNode, SourceCodeWithConceptNode, UnrecognizedTokensCache, UnrecognizedTokensNode
# Parser names, given as plain strings.
# NOTE(review): the code consuming this list is not visible in this chunk - confirm how it is used.
PARSERS = ["Sequence", "Sya", "Python"]
# Same names as PARSERS plus "Bnf".
# Presumably the parsers used to resolve the tokens captured by a VariableExpression - TODO confirm.
VARIABLE_EXPR_PARSER = ["Sequence", "Sya", "Python", "Bnf"]
@dataclass(eq=True)
class RegExDef:
    """
    Description of a regular expression: the pattern plus the options used to compile it.

    The definition is hashable and can be turned into / rebuilt from a flat string
    with serialize() and deserialize().
    """
    to_match: str = None  # the pattern
    ignore_case: bool = True  # True / False force re.IGNORECASE on / off, None leaves it untouched
    multiline: bool = None  # True / False force re.DOTALL on / off, None leaves it untouched
    explicit_flags: int = re.MULTILINE  # flags used as the starting point

    # separates the pattern from the options in the serialized form
    _SEPARATOR = "__!##ZZSEPZZ##!__"

    def __hash__(self):
        return hash((self.to_match, self.ignore_case, self.multiline, self.explicit_flags))

    @staticmethod
    def compile_flags(ignore_case, multiline, explicit_flags):
        """
        Compute the flags to give to re.compile()
        :param ignore_case: True to add re.IGNORECASE, False to remove it, None to keep explicit_flags as is
        :param multiline: True to add re.DOTALL, False to remove it, None to keep explicit_flags as is
        :param explicit_flags: initial flags
        :return: the resulting flags
        """
        flags = explicit_flags

        if multiline is True:
            flags |= re.DOTALL
        if multiline is False and flags & re.DOTALL:
            flags -= re.DOTALL

        if ignore_case is True:
            flags |= re.IGNORECASE
        if ignore_case is False and flags & re.IGNORECASE:
            flags -= re.IGNORECASE

        return flags

    def serialize(self):
        """
        :return: '<pattern><separator><ignore_case>|<multiline>|<explicit_flags as int>'
        """
        return f"{self.to_match}{self._SEPARATOR}{self.ignore_case}|{self.multiline}|{int(self.explicit_flags)}"

    def deserialize(self, txt):
        """
        Update this instance from a text produced by serialize()
        :param txt: serialized definition
        :return: self, to allow chaining
        :raises IndexError: when the separator is missing from txt
        """
        # The options are always at the end: split from the right, so that a pattern
        # which happens to contain the separator text is still restored correctly
        parts = txt.rsplit(self._SEPARATOR, 1)
        options = parts[1].split("|")

        self.to_match = parts[0]
        self.ignore_case = self._to_tristate(options[0])
        self.multiline = self._to_tristate(options[1])
        self.explicit_flags = int(options[2])
        return self

    @staticmethod
    def _to_tristate(text):
        """Turn 'None' into None and 'True' into True; any other text gives False."""
        if text == "None":
            return None
        return text == "True"
class ParseTreeNode:
    """
    Base class of the nodes created when a parsing expression (pexpression) is parsed.

    It remembers the pexpression, the start and end positions, the tokens
    and the source text of what was parsed.
    """

    def __init__(self, parsing_expression, start: int, end: int, tokens: List[Token] = None, source: str = None):
        self.parsing_expression = parsing_expression
        self.start = start
        self.end = end
        self.tokens = tokens
        # when the source is not provided, it is computed from the tokens
        self.source = core.utils.get_text_from_tokens(self.tokens) if source is None else source
class NonTerminalNode(ParseTreeNode):
    """
    A ParseTreeNode is the result of the parsing of a parsing expression (pexpression)
    NonTerminalNode when parsing a pexpression which has children (Sequence, OrderedChoice, Optional, Repetition...)
    """

    def __init__(self, parsing_expression, start, end, tokens, children=None):
        super().__init__(parsing_expression, start, end, tokens)
        # always keep a list (the caller's list when one is given),
        # so that __repr__, clone() and get_depth() work when no children are provided
        self.children = children if children is not None else []

    def __repr__(self):
        name = "Node:" + (self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__)
        if len(self.children) > 0:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names

    def __eq__(self, other):
        if not isinstance(other, NonTerminalNode):
            return False

        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.children == other.children

    def __hash__(self):
        # children is a list, which is not hashable: hash a tuple of the children instead
        return hash((self.parsing_expression, self.start, self.end, tuple(self.children)))

    def clone(self):
        """
        :return: a new node sharing the pexpression, the tokens and the children,
                 but with its own list of children
        """
        return NonTerminalNode(self.parsing_expression, self.start, self.end, self.tokens, self.children.copy())

    def get_debug(self):
        """
        :return: the debug texts of the children joined by '.', prefixed by '<concept>=>'
                 when the node was produced by a ConceptExpression
        """
        res = f"{self.parsing_expression.concept}=>" if isinstance(self.parsing_expression, ConceptExpression) else ""
        return res + ".".join([c.get_debug() for c in self.children])

    def get_depth(self):
        """
        :return: the largest number of ConceptExpression nodes found on a path from this node to a leaf
        """
        # default=0: a node without children (e.g. nothing was matched) has no depth of its own
        children_depth = max((c.get_depth() for c in self.children), default=0)
        if isinstance(self.parsing_expression, ConceptExpression):
            return 1 + children_depth
        return children_depth
class TerminalNode(ParseTreeNode):
    """
    Leaf of the parse tree: a ParseTreeNode without children.
    It is the node created by StrMatch. It carries the value that was matched.
    """

    def __init__(self, parsing_expression, start, end, source, value):
        super().__init__(parsing_expression, start, end, source=source)
        self.value = value

    def __repr__(self):
        prefix = "Node:" + (self.parsing_expression.rule_name or "")
        return prefix + f"'{self.value}'"

    def __eq__(self, other):
        if not isinstance(other, TerminalNode):
            return False

        if self.parsing_expression != other.parsing_expression:
            return False
        if self.start != other.start or self.end != other.end:
            return False
        return self.source == other.source and self.value == other.value

    def __hash__(self):
        identity = (self.parsing_expression, self.start, self.end, self.source, self.value)
        return hash(identity)

    def clone(self):
        """
        :return: a new TerminalNode with the same attributes
        """
        return TerminalNode(self.parsing_expression, self.start, self.end, self.source, self.value)

    def get_debug(self):
        """
        :return: the value, as a string
        """
        return str(self.value)

    def get_depth(self):
        """
        :return: always 0, a terminal node has no children
        """
        return 0
class MultiNode:
    """
    MultiNode is used when a parsing expression finds multiple valid results
    (for example by the UnOrderedChoice parsing expression when multiple choices are found).

    It does not inherit from ParseTreeNode, unlike TerminalNode and NonTerminalNode:
    it has no start and end positions of its own. It only keeps the list of the results,
    each result exposing the parsed node in its 'node' attribute.
    """

    def __init__(self, results):
        self.results = results

    def __repr__(self):
        sources = [r.node.source for r in self.results]
        return f"MultiNode({sources})"

    def combine(self, parsing_expression):
        """
        Wrap, in place, the node of every result into a NonTerminalNode owned by parsing_expression
        :param parsing_expression: pexpression that will own the new NonTerminalNodes
        :return: self
        """
        for result in self.results:
            node = result.node
            result.node = NonTerminalNode(parsing_expression,
                                          node.start,
                                          node.end,
                                          node.tokens,
                                          [node])
        return self
@dataclass
class ParsingContext:
    """
    State of one parsing alternative. It allows backtracking when several results are possible
    (UnOrderedChoice pexpression): it holds the ParseTreeNode parsed so far
    and the position of the parser right after the parsing
    """
    node: ParseTreeNode  # node or nodes parsed
    pos: int  # position of the parser after the parsing
    next_results: List[ParseTreeNode] = None  # other node parsed, when known
    variables: dict = field(default_factory=dict)  # variables already seen
    to_remove: bool = False  # an error/inconsistency is detected. Remove this parsing context ASAP

    def clone(self):
        """
        :return: a copy with a cloned node, cloned next results and its own variables dict.
                 The copy is never flagged 'to_remove'
        """
        cloned_next = None
        if self.next_results:
            cloned_next = [result.clone() for result in self.next_results]
        return ParsingContext(self.node.clone(), self.pos, cloned_next, self.variables.copy(), False)

    def fix_tokens(self, parser_helper):
        """
        When the nodes are fully created, make sure that their sources and tokens are correct
        :param parser_helper:
        :return:
        """
        all_tokens = parser_helper.parser.parser_input.tokens
        self.node.tokens = all_tokens[self.node.start: self.node.end + 1]
        self.node.source = core.utils.get_text_from_tokens(self.node.tokens)

    def update_with_ptree_node(self, ptree_node, pos):
        """
        Add a newly parsed node to the node of this context
        :param ptree_node: node to add. When it is a list, the first item is added
                           and the other ones are kept as the next results
        :param pos: position of the parser after the parsing of the node
        :return:
        """
        pending = None
        if isinstance(ptree_node, list):
            ptree_node, pending = ptree_node[0], ptree_node[1:]

        if ptree_node.end == -1:
            # means that the node must not be added, but the parsing context is not in error
            return

        if isinstance(ptree_node.parsing_expression, VariableExpression):
            # check the variables consistency
            name = ptree_node.parsing_expression.rule_name
            already_seen = name in self.variables
            if already_seen and self.variables[name] != ptree_node.source:
                self.to_remove = True
                return
            self.variables[name] = ptree_node.source

        self.pos = pos
        self.node.children.append(ptree_node)
        self.node.end = ptree_node.end
        if ptree_node.start < self.node.start:
            # fix start pos when sequence starts with VariableExpression
            self.node.start = ptree_node.start

        if pending is not None:
            self.next_results = pending

    def __mul__(self, other):
        """
        :param other: number of parsing contexts requested
        :return: list starting with this context, followed by (other - 1) clones
        """
        return [self] + [self.clone() for _ in range(other - 1)]

    def __repr__(self):
        if isinstance(self.node, list):
            return f"ParsingContext('{[n.get_debug() for n in self.node]}', pos={self.pos})"
        return f"ParsingContext('{self.node.get_debug()}', pos={self.pos})"

    def get_depth(self):
        """
        :return: depth of the node, or the largest depth when there are several nodes
        """
        if not isinstance(self.node, list):
            return self.node.get_depth()
        return max([n.get_depth() for n in self.node])
class ParsingExpression:
    """
    Base class of the parsing expressions (pexpressions).

    A pexpression keeps
    * elements : the positional arguments given to the constructor
    * nodes : the value of the 'nodes' keyword argument, always stored as an iterable
    * rule_name : the value of the 'rule_name' keyword argument ('' by default)

    parse() delegates to _parse(), which is not defined in this class.
    """
    log_sink = []  # debug lines shared by all the pexpressions. Items are (id of the pexpression, text)

    @classmethod
    def reset_logs(cls):
        """
        Remove all the lines from the shared log sink
        """
        cls.log_sink.clear()

    def __init__(self, *args, **kwargs):
        self.elements = args

        nodes = kwargs.get('nodes', []) or []
        if not hasattr(nodes, '__iter__'):
            # a single node was given
            nodes = [nodes]
        self.nodes = nodes

        self.rule_name = kwargs.get('rule_name', '')

    def __eq__(self, other):
        if not isinstance(other, ParsingExpression):
            return False

        if self.rule_name != other.rule_name:
            return False

        if len(self.elements) != len(other.elements):
            return False

        return not any(mine != theirs for mine, theirs in zip(self.elements, other.elements))

    def __hash__(self):
        return hash((self.rule_name, self.elements))

    def parse(self, parser_helper):
        """
        Parse the input using this pexpression
        :param parser_helper:
        :return: whatever _parse() returns
        """
        # TODO : add memoization
        return self._parse(parser_helper)

    def add_rule_name_if_needed(self, text):
        """
        :param text:
        :return: 'text=rule_name' when the pexpression has a rule name, text otherwise
        """
        return text + "=" + self.rule_name if self.rule_name else text

    def inner_get_debug(self, n, tab=""):
        """
        Collect, from the log sink, the lines of this pexpression and of its nodes
        :param n: line number
        :param tab: current indentation
        :return: (next line number, text), or None when self.debug is not set
        """
        # NOTE(review): 'debug' is not defined by this class in the visible code,
        # and the loop below unpacks the result of the nodes - confirm where 'debug' is set
        if not self.debug:
            return None

        id_self = id(self)

        def add_debug_for_current(_n, _debug):
            # consume the consecutive lines, starting at _n, that belong to this pexpression
            # (the bound check uses the helper's own argument, not the enclosing 'n')
            if _n >= len(self.log_sink):
                return _n, _debug

            _l = self.log_sink[_n]
            while _l[0] == id_self:
                _debug += tab + _l[1] + "\n"
                _n += 1
                if _n == len(self.log_sink):
                    return _n, _debug
                _l = self.log_sink[_n]
            return _n, _debug

        debug = ""
        n, debug = add_debug_for_current(n, debug)

        for node in self.nodes:
            n, node_debug = node.inner_get_debug(n, tab + " ")
            if node_debug:
                debug += node_debug
            # lines of the current pexpression may follow the ones of the node
            n, debug = add_debug_for_current(n, debug)

        return n, debug

    @staticmethod
    def debug_prefix(self_name, parser_helper):
        """
        :param self_name: name to display for the pexpression
        :param parser_helper:
        :return: 'self_name(concept)' or 'self_name(concept:rule)' when the current rule name
                 is set and differs from the name of the current concept
        """
        current_rule_name = parser_helper.get_current_rule_name()
        current_concept = parser_helper.concepts[-1]
        str_rule_name = f":{current_rule_name}" if current_rule_name not in (None, current_concept.name) else ""
        return f"{self_name}({current_concept}{str_rule_name})"

    @staticmethod
    def debug_remaining_text(parser_helper):
        """
        :param parser_helper:
        :return: the text starting at the index of the current token, truncated to 50 characters
        """
        remaining_text = parser_helper.get_parsing_text()[parser_helper.token.index:]
        if len(remaining_text) > 50:
            remaining_text = remaining_text[:47] + "..."
        return remaining_text

    @staticmethod
    def debug_to_raw(variables):
        """
        :param variables: dict of the values to display
        :return: 'key1=value1, key2=value2, ...'
        """
        return ", ".join(f"{k}={v}" for k, v in variables.items())
class ConceptExpression(ParsingExpression):
    """
    Will match a concept
    It is used only for rule definition
    When the grammar is created, it is replaced by the actual concept
    """

    def __init__(self, concept, rule_name="", nodes=None):
        super().__init__(rule_name=rule_name, nodes=nodes)
        self.concept = concept  # Concept instance, or only the name of the concept

    def __repr__(self):
        return self.add_rule_name_if_needed(f"{self.concept}")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, ConceptExpression):
            return False

        if isinstance(self.concept, Concept):
            # an actual concept can only be equal to another actual concept.
            # (comparing it with a plain concept name must answer False, not fail on the missing 'id')
            return isinstance(other.concept, Concept) and self.concept.id == other.concept.id

        # when it's only the name of the concept
        return self.concept == other.concept

    def __hash__(self):
        return hash((self.concept, self.rule_name))

    def _parse(self, parser_helper):
        """
        Parse the first node in the scope of the concept
        :param parser_helper:
        :return: None when nothing is found, the MultiNode when multiple results are found,
                 else a NonTerminalNode wrapping the node parsed
        """
        parser_helper.rules_names.append(self.rule_name)
        parser_helper.push_concept(self.concept)
        try:
            node = self.nodes[0].parse(parser_helper)
        finally:
            # keep the stacks balanced, even when the parsing of the node raises
            parser_helper.pop_concept()
            parser_helper.rules_names.pop()

        if node is None:
            return None

        if isinstance(node, MultiNode):
            return node.combine(self)

        return NonTerminalNode(self,
                               node.start,
                               node.end,
                               parser_helper.parser.parser_input.tokens[node.start: node.end + 1],
                               [node])

    @staticmethod
    def get_recursion_id(parent_id, concept_id, rule_name):
        """
        :return: 'parent_id#concept_id(rule_name)'
        """
        return f"{parent_id}#{concept_id}({rule_name})"
class VariableExpression(ParsingExpression):
    """
    Will match a variable, ie tokens that are resolved into a concept, a rule or some source code.
    The name of the variable is the rule name.

    Consecutive VariableExpression are handled together by the first one (see init_parsing())
    """

    def __init__(self, rule_name):
        super().__init__(rule_name=rule_name)
        self.before_first_token_node = False
        self.expected_variables = [self]  # this variable, then the variables that directly follow it
        self.next_node_to_parse = None  # first node, after the variables, which is not a variable

    def __repr__(self):
        return self.add_rule_name_if_needed("Var")

    def __eq__(self, other):
        return super().__eq__(other) and isinstance(other, VariableExpression)

    def __hash__(self):
        return hash(("VariableExpression", self.rule_name))

    def init_parsing(self):
        """
        Get the instances of the following VariableExpression if they exist,
        and the first following node which is not a VariableExpression
        :return:
        """
        candidate = self.nodes[0] if len(self.nodes) > 0 else None
        while isinstance(candidate, VariableExpression):
            self.expected_variables.append(candidate)
            candidate = candidate.nodes[0] if len(candidate.nodes) > 0 else None

        self.next_node_to_parse = candidate

    def get_nodes_sequences_when_variables_are_first(self, parser_helper):
        """
        The variables are before the first token: use the nodes already in parser_helper.sequence
        :param parser_helper:
        :return: list with one sequence of nodes, or None when there are not enough nodes
        """
        nb_expected = len(self.expected_variables)
        if len(parser_helper.sequence) < nb_expected:
            # variable(s) is/are expected. But nothing found
            return None

        # only take the requested number of variables
        return [parser_helper.sequence[-nb_expected:]]

    def get_nodes_sequences_when_variables_are_last(self, parser_helper):
        """
        No node follows the variables: use all the remaining tokens
        :param parser_helper:
        :return: the sequences of nodes, truncated to the number of expected variables
        """
        tokens = parser_helper.get_remaining_tokens()
        start = parser_helper.pos
        end = parser_helper.get_last_token_pos()

        nodes_sequences = self.get_nodes_sequences_from_tokens(parser_helper, start, end, tokens)
        if not nodes_sequences:
            return nodes_sequences

        # only take the requested number of variables
        # (sequences without enough parameters to feed the VariableExpression are dropped)
        nb_expected = len(self.expected_variables)
        return [sequence[:nb_expected] for sequence in nodes_sequences if len(sequence) >= nb_expected]

    def get_nodes_sequences_when_variables_are_in_between(self, parser_helper):
        """
        A node follows the variables: look for it, then use the tokens found before it
        :param parser_helper:
        :return: the sequences of nodes
        """
        start = parser_helper.pos
        end = parser_helper.get_last_token_pos()

        # start by the end, to be the greediest
        while end >= start:
            parser_helper.seek(end)
            following = self.next_node_to_parse.parse(parser_helper)
            if following and following.end != -1:
                break
            end -= 1

        tokens = parser_helper.parser.parser_input.tokens[start:end]
        parser_helper.seek(end)  # for the next node
        return self.get_nodes_sequences_from_tokens(parser_helper, start, end, tokens)

    def _parse(self, parser_helper):
        if parser_helper.debugger.is_enabled():
            debug_prefix = self.debug_prefix("VariableExpression", parser_helper)
            debug_vars = {"pos": parser_helper.pos,
                          "expected variables": self.expected_variables,
                          "next to match": self.next_node_to_parse}
            debug_text = self.debug_to_raw(debug_vars)
            parser_helper.debug_concept(debug_prefix, color="cyan", raw=debug_text)

        if self.before_first_token_node:
            nodes_sequences = self.get_nodes_sequences_when_variables_are_first(parser_helper)
        elif not self.next_node_to_parse:
            nodes_sequences = self.get_nodes_sequences_when_variables_are_last(parser_helper)
        else:
            nodes_sequences = self.get_nodes_sequences_when_variables_are_in_between(parser_helper)

        if nodes_sequences is None or self.has_unrecognized(nodes_sequences):
            # nothing is recognized or only part is recognized
            return None

        all_results = []
        for nodes_sequence in nodes_sequences:
            # this outer loop deals with when there are multiple choices
            # ie, the result is either nodes_sequence_1 or nodes_sequence_2, etc..
            ptree_nodes = []
            for variable_expr, lexer_node in zip(self.expected_variables, nodes_sequence):
                # this inner loop deals with results with multiple concepts in a row
                # ie the result is a sequence of node_1, then node_2, etc...
                resolved = self.get_resolved(lexer_node)
                if resolved is None:
                    parser_helper.errors.append(f"Failed to recognize {lexer_node.source}")
                    break
                terminal = TerminalNode(variable_expr, lexer_node.start, lexer_node.end, lexer_node.source, resolved)
                ptree_nodes.append(terminal)

            if len(ptree_nodes) != len(nodes_sequence):
                # it means that we did not recognize all the nodes
                # So it's a mismatch
                continue

            # finally adds the results
            all_results.append(ptree_nodes[0] if len(ptree_nodes) == 1 else ptree_nodes)

        if len(all_results) == 0:
            return None

        # everything seems to be fine. We can pop the nodes from parser_helper used as variable
        if self.before_first_token_node:
            for _ in range(len(self.expected_variables)):
                parser_helper.sequence.pop()

        if len(all_results) == 1:
            return all_results[0]

        # all results are valid, let's return them
        return MultiNode([ParsingContext(ptree_node, parser_helper.pos) for ptree_node in all_results])

    @staticmethod
    def get_resolved(node):
        """
        Turn Lexer node into Concept, Rule or List[ReturnValueConcept], (basically what is
        expected by SheerkaEvaluateConcept.resolve())
        May be merged with builtin_helpers.update_compiled() ?
        :param node:
        :return: None for an UnrecognizedTokensNode
        :raises NotImplementedError: when the type of the node is not managed
        """
        if isinstance(node, UnrecognizedTokensNode):
            return None

        if isinstance(node, RuleNode):
            return node.rule

        if isinstance(node, ConceptNode):
            return node.concept

        if isinstance(node, (SourceCodeNode, SourceCodeWithConceptNode)):
            return node.return_value

        raise NotImplementedError()

    @staticmethod
    def get_nodes_sequences_from_tokens(parser_helper, start, end, tokens):
        """
        Ask the cache of the parser for the lexer nodes matching the tokens
        :param parser_helper:
        :param start: position of the first token
        :param end: position of the last token
        :param tokens:
        :return: None when there is no token, else what the cache returns
        """
        if len(tokens) == 0:
            return None

        unrecognized = UnrecognizedTokensNode(start, end, tokens)
        parser = parser_helper.parser
        return parser.cache2.get_lexer_nodes_from_unrecognized(parser.context, unrecognized)

    @staticmethod
    def has_unrecognized(nodes_sequence: list):
        """
        :param nodes_sequence:
        :return: True when one of the items is an UnrecognizedTokensNode
        """
        return any(isinstance(n, UnrecognizedTokensNode) for n in nodes_sequence)
class Sequence(ParsingExpression):
    """
    Matches all the nodes, one after the other, in the order of their definition.
    Several parsing contexts are followed in parallel when a node returns a MultiNode
    """

    def _parse(self, parser_helper):
        start_pos = parser_helper.pos

        if parser_helper.debugger.is_enabled():
            debug_prefix = self.debug_prefix("Sequence", parser_helper)
            debug_vars = {"pos": parser_helper.pos,
                          "nodes": self.nodes,
                          "to_match": self.debug_remaining_text(parser_helper)}
            debug_text = self.debug_to_raw(debug_vars)
            parser_helper.debug_concept(debug_prefix, color="cyan", raw=debug_text)

        # the node starts empty. It is filled while the nodes of the sequence are parsed
        root = NonTerminalNode(self, start_pos, start_pos, None, [])
        parsing_contexts = [ParsingContext(root, parser_helper.pos)]
        forked = []  # parsing contexts created while parsing the current node

        for expression in self.nodes:
            for context in parsing_contexts:
                if parser_helper.debugger.is_enabled():
                    parser_helper.debug_concept(debug_prefix, node=expression, pcontext=context)

                if context.next_results:
                    # the result is already known
                    node = context.next_results
                else:
                    parser_helper.seek(context.pos)
                    node = expression.parse(parser_helper)

                if node is None:
                    context.to_remove = True
                elif isinstance(node, MultiNode):
                    # clones the context (and first item is the current context)
                    clones = context * len(node.results)
                    forked.extend(clones[1:])
                    for clone, result in zip(clones, node.results):
                        clone.update_with_ptree_node(result.node, result.pos)
                else:
                    context.update_with_ptree_node(node, parser_helper.pos)

            # clean up and reorganize list of parsing_contexts
            parsing_contexts.extend(forked)
            core.utils.remove_from_list(parsing_contexts, lambda pc: pc.to_remove)
            if len(parsing_contexts) == 0:
                if parser_helper.debugger.is_enabled():
                    parser_helper.debug_concept(debug_prefix,
                                                raw="All pcontexts are failed. Sequence failed",
                                                color="red")
                return None
            forked.clear()

        # reset tokenizer for the following pexpression
        parser_helper.seek(parsing_contexts[0].pos)

        # update nodes sources and tokens
        for context in parsing_contexts:
            context.fix_tokens(parser_helper)

        if len(parsing_contexts) == 1:
            return parsing_contexts[0].node

        return MultiNode(parsing_contexts)

    def __repr__(self):
        inner = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({inner})")
class OrderedChoice(ParsingExpression):
    """
    Tries the nodes in the order of their definition and keeps the first one that matches
    (so the order of definition is important)
    TODO : implement MultiNode support
    """

    def _parse(self, parser_helper):
        start_pos = parser_helper.pos
        tokens = parser_helper.parser.parser_input.tokens

        for alternative in self.nodes:
            found = alternative.parse(parser_helper)
            if not found:
                parser_helper.seek(start_pos)  # backtrack
                continue

            # first match wins
            return NonTerminalNode(self, start_pos, found.end, tokens[start_pos: found.end + 1], [found])

        return None

    def __repr__(self):
        inner = "| ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({inner})")
class UnOrderedChoice(ParsingExpression):
    """
    May match many nodes. All the nodes are tested, so the order of definition is not important
    When several results remain, they are returned as a MultiNode,
    sorted by decreasing position of the parser
    The behaviour when multiple candidates with same length are found is not defined yet
    """

    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        parsing_contexts = []

        if parser_helper.debugger.is_enabled():
            debug_prefix = self.debug_prefix("UnOrderedChoice", parser_helper)
            debug_vars = {"pos": parser_helper.pos, "text": self.debug_remaining_text(parser_helper)}
            debug_text = self.debug_to_raw(debug_vars)
            parser_helper.debug_concept(debug_prefix, color="cyan", raw=debug_text)

        tried = []  # debug text of the nodes tested (in green when they match)
        for candidate in self.nodes:
            if isinstance(candidate, ConceptExpression) and candidate.concept.id in parser_helper.get_concepts_ids():
                # avoid circular reference
                continue

            node = candidate.parse(parser_helper)
            if not node:
                tried.append(f"{candidate}, ")
            else:
                tried.append(CCM["green"] + str(candidate) + CCM["reset"] + ", ")
                if isinstance(node, MultiNode):
                    node.combine(self)
                    parsing_contexts.extend(node.results)
                else:
                    wrapper = NonTerminalNode(self,
                                              init_pos,
                                              node.end,
                                              parser_helper.parser.parser_input.tokens[init_pos: node.end + 1],
                                              [node])
                    parsing_contexts.append(ParsingContext(wrapper, parser_helper.pos))

            parser_helper.seek(init_pos)  # backtrack

        debug_text = "".join(tried)
        if parser_helper.debugger.is_enabled():
            parser_helper.debug_concept(debug_prefix, raw=f"[{debug_text}]")

        if len(parsing_contexts) == 0:
            return None

        parser_helper.seek(parsing_contexts[0].pos)

        # Try to simplify the parsing_context
        simplified = self.simplify(parsing_contexts)
        if parser_helper.debugger.is_enabled() and len(simplified) != len(parsing_contexts):
            parser_helper.debug_concept(debug_prefix, simplified=simplified)

        if len(simplified) == 1:
            return simplified[0].node

        simplified.sort(key=attrgetter("pos"), reverse=True)
        return MultiNode(simplified)

    def __repr__(self):
        inner = "# ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({inner})")

    @staticmethod
    def simplify(parsing_contexts: List[ParsingContext]):
        """
        Try to remove redundant parsing context
        for example, if
            color is an adjective
            red is an adjective
            red is a color
        when parsing 'red' we will receive two parsing context
            one for 'red'
            one for 'color' -> 'red'
        The second one should be discarded

        For a given source, only the parsing contexts with the smallest depth are kept
        :param parsing_contexts:
        :return:
        """
        if len(parsing_contexts) == 1:
            return parsing_contexts

        # group the parsing contexts (with their depth) by source
        by_source = {}
        for context in parsing_contexts:
            entry = (context, context.get_depth())
            if context.node.source in by_source:
                by_source[context.node.source].append(entry)
            else:
                by_source[context.node.source] = [entry]

        kept = []
        for entries in by_source.values():
            smallest = min([depth for _, depth in entries])
            kept.extend([context for context, depth in entries if depth == smallest])

        return kept
class Optional(ParsingExpression):
    """
    Will match or not the elements. It never fails:
    when nothing is found, the node returned has its end set to -1
    if many matches, will choose longest one
    If you need order, use Optional(OrderedChoice)
    TODO : implement MultiNode support
    """

    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        tokens = parser_helper.parser.parser_input.tokens

        best = NonTerminalNode(self, parser_helper.pos, -1, [], [])  # means that nothing is found
        for candidate in self.nodes:
            node = candidate.parse(parser_helper)
            if node and node.end > best.end:
                # longer match than the ones already found
                best = NonTerminalNode(self, node.start, node.end, tokens[node.start: node.end + 1], [node])

            parser_helper.seek(init_pos)  # backtrack

        if best.end != -1:
            parser_helper.seek(best.end)
            parser_helper.next_token()  # eat the tokens found

        return best

    def __repr__(self):
        if len(self.elements) == 1:
            return f"{self.elements[0]}?"

        inner = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({inner})?")
class Repetition(ParsingExpression):
    """
    Base class for all repetition-like parser expressions (?,*,+)
    Args:
        sep: optional parsing expression that must be matched between two repetitions
             (None when there is no separator)
    """

    def __init__(self, *elements, **kwargs):
        super(Repetition, self).__init__(*elements, **kwargs)
        self.sep = kwargs.get('sep', None)

    def clone(self):
        """
        :return: a new pexpression of the same class, with the same elements, rule name, nodes and separator
        """
        # the elements must be unpacked, otherwise the clone receives the whole tuple as its single element.
        # The class of the instance is used, so that the clone of a ZeroOrMore is still a ZeroOrMore
        return self.__class__(*self.elements,
                              rule_name=self.rule_name,
                              nodes=self.nodes,
                              sep=self.sep)
class ZeroOrMore(Repetition):
    """
    ZeroOrMore will try to match parser expression specified zero or more
    times. It will never fail.
    When nothing is matched, the node returned has its end set to -1
    TODO : implement MultiNode support
    """

    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = -1
        children = []

        while True:
            current_pos = parser_helper.pos

            # maybe eat the separator if needed
            if self.sep and children:
                sep_result = self.sep.parse(parser_helper)
                if sep_result is None:
                    parser_helper.seek(current_pos)
                    break

            # eat the repeated expression
            node = self.nodes[0].parse(parser_helper)
            if node is None or node.end == -1:
                # no match, or a node that matched nothing (end == -1, e.g. from Optional or ZeroOrMore):
                # trying again from the same place would give the same result forever, so stop here
                # (and give back the separator that may have been eaten)
                parser_helper.seek(current_pos)
                break

            children.append(node)
            end_pos = node.end

        if len(children) == 0:
            return NonTerminalNode(self, init_pos, -1, [], [])

        return NonTerminalNode(self,
                               init_pos,
                               end_pos,
                               parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1],
                               children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})*")
class OneOrMore(Repetition):
    """
    OneOrMore will try to match parser expression specified one or more times.
    It fails (returns None) when nothing is matched
    TODO : implement MultiNode support
    """

    def _parse(self, parser_helper):
        init_pos = parser_helper.pos
        end_pos = -1
        children = []

        while True:
            current_pos = parser_helper.pos

            # maybe eat the separator if needed
            if self.sep and children:
                sep_result = self.sep.parse(parser_helper)
                if sep_result is None:
                    parser_helper.seek(current_pos)
                    break

            # eat the repeated expression
            node = self.nodes[0].parse(parser_helper)
            if node is None or node.end == -1:
                # no match, or a node that matched nothing (end == -1, e.g. from Optional or ZeroOrMore):
                # trying again from the same place would give the same result forever, so stop here
                # (and give back the separator that may have been eaten)
                parser_helper.seek(current_pos)
                break

            children.append(node)
            end_pos = node.end

        if len(children) == 0:  # if nothing is found, it's an error
            return None

        return NonTerminalNode(self,
                               init_pos,
                               end_pos,
                               parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1],
                               children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})+")
class UnorderedGroup(Repetition):
    """
    Will try to match all of the parsing expression in any order.
    The parsing is not implemented yet.
    """

    def _parse(self, parser):
        """
        :raises NotImplementedError: always
        """
        raise NotImplementedError()

    # Planned representation, not activated yet:
    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})#"
class Match(ParsingExpression):
    """
    Base class of the pexpressions that try to match something directly from the input.
    """

    def __init__(self, rule_name, root=False):
        super().__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        """
        Directly delegates to _parse()
        :param parser:
        :return: whatever _parse() returns
        """
        return self._parse(parser)
class StrMatch(Match):
    """
    Matches a literal against the current token
    """

    def __init__(self, to_match, rule_name="", ignore_case=True, skip_whitespace=True):
        super(Match, self).__init__(rule_name=rule_name)
        self.to_match = to_match
        self.ignore_case = ignore_case
        self.skip_white_space = skip_whitespace

    def __repr__(self):
        # the options which are not the default ones are appended to the literal
        suffix = ""
        if not self.ignore_case:
            suffix += "#!ic"
        if not self.skip_white_space:
            suffix += "#!sw"
        return self.add_rule_name_if_needed(f"'{self.to_match + suffix}'")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, StrMatch):
            return False

        if self.to_match != other.to_match:
            return False
        return self.ignore_case == other.ignore_case and self.skip_white_space == other.skip_white_space

    def __hash__(self):
        return hash(("StrMatch", self.to_match, self.ignore_case, self.skip_white_space))

    def _parse(self, parser_helper):
        """
        :param parser_helper:
        :return: a TerminalNode when the current token matches (the token is then consumed), else None
        """
        token = parser_helper.get_token()

        if parser_helper.debugger.is_enabled():
            debug_prefix = self.debug_prefix("StrMatch", parser_helper)
            debug_text = f"pos={parser_helper.pos}, to_match={self.to_match}, token={token.str_value}"
            parser_helper.debug_concept(debug_prefix, raw=f"{CCM['green']}{debug_text}{CCM['reset']}")

        if self.ignore_case:
            is_match = token.str_value.lower() == self.to_match.lower()
        else:
            is_match = token.strip_quote == self.to_match

        if not is_match:
            if parser_helper.debugger.is_enabled():
                parser_helper.debug_concept(debug_prefix, raw=f"{CCM['red']}{debug_text}{CCM['reset']}")
            return None

        if parser_helper.debugger.is_enabled():
            parser_helper.debug_concept(debug_prefix, raw=f"{CCM['green']}{debug_text}{CCM['reset']}")

        current_pos = parser_helper.pos
        node = TerminalNode(self, current_pos, current_pos, token.str_value, token.str_value)
        parser_helper.next_token(self.skip_white_space)
        return node
class RegExMatch(Match):
    """
    Matches regular expression
    """

    def __init__(self, to_match, rule_name="", ignore_case=True, multiline=None):
        super(Match, self).__init__(rule_name=rule_name)
        self.to_match = to_match
        self.ignore_case = ignore_case
        self.multiline = multiline
        self.explicit_flags = re.MULTILINE
        self.regex = None  # compiled regular expression, set by compile()

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, RegExMatch):
            return False

        return self.to_match == other.to_match and \
               self.ignore_case == other.ignore_case and \
               self.multiline == other.multiline and \
               self.explicit_flags == other.explicit_flags

    def __hash__(self):
        return hash(("RegExMatch", self.to_match, self.ignore_case, self.multiline, self.explicit_flags))

    def __repr__(self):
        text = self.to_match
        if not self.ignore_case:
            text += "#!ic"
        if self.multiline:
            text += "#ml"
        return self.add_rule_name_if_needed(f"r'{text}'")

    def compile(self):
        """
        Compile the regular expression, using the flags computed by RegExDef.compile_flags()
        """
        flags = RegExDef.compile_flags(self.ignore_case, self.multiline, self.explicit_flags)
        self.regex = re.compile(self.to_match, flags)

    def _parse(self, parser_helper):
        """
        :param parser_helper:
        :return: a TerminalNode when a non empty match, ending on a token boundary,
                 is found at the current token. Else None
        """
        if self.regex is None:
            # compile() was not called yet: compile on first use rather than failing on None
            self.compile()

        text = parser_helper.get_parsing_text()

        m = self.regex.match(text, parser_helper.token.index)
        if m:
            matched = m.group()
            # TODO: Add debug info here
            if matched:
                # the match is only valid if it fits with the actual tokens
                next_pos = parser_helper.get_next_matching_pos(m.end())
                if next_pos is NotFound:
                    parser_helper.errors.append(NoMatchingTokenError(m.end()))
                    return None

                node = TerminalNode(self, parser_helper.pos, next_pos - 1, matched, matched)
                # go to the last token matched, then eat it
                parser_helper.seek(next_pos - 1)
                parser_helper.next_token()
                return node

        return None
class ParsingExpressionVisitor:
    """
    visit ParsingExpression

    visit() dispatches to a method named 'visit_<ClassName>' when the visitor defines one,
    and to generic_visit() otherwise.
    A handler returning STOP makes visit_children() stop iterating over the remaining siblings.
    """
    STOP = "##_Stop_##"

    def __init__(self, get_nodes=None, circular_ref_strategy=None):
        """
        :param get_nodes: callable returning the children of a parsing expression (defaults to `pe.elements`)
        :param circular_ref_strategy: falsy to disable the tracking of visited expressions,
               "skip" to silently ignore an expression already visited,
               any other truthy value to raise RecursionError on it
        """
        self.get_nodes = get_nodes or (lambda pe: pe.elements)
        self.circular_ref_strategy = circular_ref_strategy
        self.seen = set() if circular_ref_strategy else None

    def visit(self, parsing_expression):
        """
        Visit one parsing expression.

        :return: whatever the selected handler returns (None when an already seen expression is skipped)
        :raises RecursionError: when the expression was already visited and the strategy is not "skip"
        """
        name = parsing_expression.__class__.__name__

        if self.circular_ref_strategy:
            # visited expressions are tracked by identity, not by equality
            if id(parsing_expression) in self.seen:
                if self.circular_ref_strategy == "skip":
                    return None
                # report the offending expression (not the visitor) to make the cycle identifiable
                raise RecursionError(f"circular ref detected : {parsing_expression}")
            self.seen.add(id(parsing_expression))

        method = 'visit_' + name
        visitor = getattr(self, method, self.generic_visit)
        return visitor(parsing_expression)

    def generic_visit(self, parsing_expression):
        """Fallback handler: call the optional `visit_all` hook, then visit the children."""
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)

        self.visit_children(parsing_expression)

    def visit_children(self, parsing_expression):
        """
        Visit every node returned by get_nodes().
        Concept and str nodes are first wrapped into ConceptExpression / StrMatch.
        """
        for node in self.get_nodes(parsing_expression):
            if isinstance(node, Concept):
                res = self.visit(ConceptExpression(node.key or node.name))
            elif isinstance(node, str):
                res = self.visit(StrMatch(node))
            else:
                res = self.visit(node)

            if res == self.STOP:
                return
# Visitor that accumulates, in self.first_tokens, the entries added by the visit_* handlers below.
# self.first_tokens stays None until a first entry is added.
# self.STOP is the value that ParsingExpressionVisitor.visit_children() tests to stop iterating over siblings.
class BnfNodeFirstTokenVisitor(ParsingExpressionVisitor):
# sheerka is used to resolve concept keys (get_by_key) and to check that a concept is known (is_known)
def __init__(self, sheerka):
super().__init__()
self.sheerka = sheerka
self.first_tokens = None
# Append first_token to self.first_tokens, creating the list when it is still None (or empty)
def add_first_token(self, first_token):
if not self.first_tokens:
self.first_tokens = [first_token]
else:
self.first_tokens.append(first_token)
# pe.concept may be a key (str) or an already resolved concept.
# A known concept is recorded as core.utils.str_concept(concept, drop_name=True).
def visit_ConceptExpression(self, pe):
concept = self.sheerka.get_by_key(pe.concept) if isinstance(pe.concept, str) else pe.concept
if self.sheerka.is_known(concept):
self.add_first_token(core.utils.str_concept(concept, drop_name=True))
return self.STOP
# An empty to_match records nothing and returns None; otherwise the string itself is recorded.
def visit_StrMatch(self, pe):
if not pe.to_match:
return
self.add_first_token(pe.to_match)
return self.STOP
# Same as visit_StrMatch, but the recorded entry is a RegExDef built from the pattern and its options.
def visit_RegExMatch(self, pe):
if not pe.to_match:
return
self.add_first_token(RegExDef(pe.to_match, pe.ignore_case, pe.multiline, pe.explicit_flags))
return self.STOP
# Choices: the elements are visited through self.visit() directly (not through visit_children())
def visit_OrderedChoice(self, parsing_expression):
for node in parsing_expression.elements:
self.visit(node)
return self.STOP
# Same handling as visit_OrderedChoice
def visit_UnOrderedChoice(self, parsing_expression):
for node in parsing_expression.elements:
self.visit(node)
return self.STOP
class BnfNodeConceptExpressionVisitor(ParsingExpressionVisitor):
    """Visitor that stores, in `references`, the `concept` of each ConceptExpression handled."""

    def __init__(self):
        super().__init__()
        self.references = list()

    def visit_ConceptExpression(self, pe):
        referenced = pe.concept
        self.references.append(referenced)
class HasAChoiceExpressionVisitor(ParsingExpressionVisitor):
    """Visitor whose `result` becomes True once an OrderedChoice or an UnOrderedChoice is handled."""

    def __init__(self):
        super().__init__()
        self.result = False

    def visit_OrderedChoice(self, parsing_expression):
        self.result = True

    # both kinds of choice are handled the same way
    visit_UnOrderedChoice = visit_OrderedChoice
class BnfConceptParserHelper:
def __init__(self, parser, debugger):
    """
    Create an empty helper.

    :param parser: owning parser (gives access to parser_input, context and cache)
    :param debugger: debugger used to trace the parsing
    """
    self.parser, self.debugger = parser, debugger

    # cursor
    self.pos = -1
    self.token = None

    # results
    self.sequence = []  # output. List of lexer nodes correctly parsed
    self.errors = []  # sink of errors
    self.debug = []  # keep track of the tokens
    self.forked = []
    self.bnf_parsed = False
    self.has_unrecognized = False
    self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])

    # parsing state
    self.concepts = []  # stack of concepts being processed (fed by ConceptExpression)
    self.concepts_ids = []  # ids of the concepts, to increase speed
    self.rules_names = []  # stack of concepts rules names
    self.concept_depth = 0  # depth of concept (+1 for each ConceptExpression which is not an OrderedChoice)
def __repr__(self):
    """Show the output sequence as rendered by core.builtin_helpers.debug_nodes()."""
    return f"BnfConceptParserHelper({core.builtin_helpers.debug_nodes(self.sequence)})"
def __eq__(self, other):
    """Helpers are equal when they are the same object, or when both sequence and errors are equal."""
    if self is other:
        return True

    if isinstance(other, BnfConceptParserHelper):
        return self.sequence == other.sequence and self.errors == other.errors

    return False
def __hash__(self):
    """Hash on the number of parsed nodes plus the number of errors."""
    nodes_count = len(self.sequence)
    errors_count = len(self.errors)
    return nodes_count + errors_count
def debug_concept(self, text, **kwargs):
    """Forward `text` to the debugger for the first concept of the stack, unless the stack holds more than two."""
    if len(self.concepts) > 2:
        return

    root_concept = self.concepts[0]
    self.debugger.debug_concept(root_concept, text, **kwargs)
def get_current_rule_name(self):
    """Return the most recently pushed truthy rule name, or None when there is none."""
    return next((name for name in reversed(self.rules_names) if name), None)
def push_concept(self, concept):
    """Push the concept on the concepts stack, and its id on the parallel ids stack."""
    self.concepts += [concept]
    self.concepts_ids += [concept.id]
def pop_concept(self):
    """Pop the last entry of both the concepts stack and the ids stack."""
    del self.concepts[-1]
    del self.concepts_ids[-1]
def get_concepts_ids(self):
    """Return the stack of concept ids (the list itself, not a copy)."""
    ids = self.concepts_ids
    return ids
def get_token(self) -> Token:
    """Return the token the helper is currently positioned on."""
    current = self.token
    return current
def get_parsing_text(self) -> str:
    """Return the `sub_text` of the parser input."""
    parser_input = self.parser.parser_input
    return parser_input.sub_text
def get_remaining_tokens(self):
    """Return the tokens from the current position onward, without the last one (the trailing EOF)."""
    tokens = self.parser.parser_input.tokens
    return tokens[self.pos:-1]
def get_last_token_pos(self):
    """Return parser_input.end, or the position just before it when the token at `end` is an EOF."""
    parser_input = self.parser.parser_input
    if parser_input.tokens[parser_input.end].type == TokenKind.EOF:
        return parser_input.end - 1
    return parser_input.end
def next_token(self, skip_whitespace=True):
    """
    Move to the next token.

    :param skip_whitespace: when True, WHITESPACE and NEWLINE tokens are stepped over
    :return: False when the helper was already on EOF or has just reached it, True otherwise
    """
    if self.token and self.token.type == TokenKind.EOF:
        return False

    tokens = self.parser.parser_input.tokens
    self.pos += 1
    self.token = tokens[self.pos]

    if skip_whitespace:
        while self.token.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
            self.pos += 1
            self.token = tokens[self.pos]

    return self.token.type != TokenKind.EOF
def get_next_matching_pos(self, token_index):
    """
    Look, from the current position up to parser_input.end (included),
    for the first token whose `index` equals the requested one.

    :param token_index: index (in the text) that the token must start at
    :return: position of the matching token, NotFound when there is none
    """
    parser_input = self.parser.parser_input
    for candidate in range(self.pos, parser_input.end + 1):
        if parser_input.tokens[candidate].index == token_index:
            return candidate

    # No matching token
    return NotFound
def seek(self, pos):
    """Jump to the given position and make the token found there the current one."""
    tokens = self.parser.parser_input.tokens
    self.pos = pos
    self.token = tokens[pos]
def has_error(self):
    """Tell whether at least one error was recorded."""
    return bool(self.errors)
def is_locked(self):
    """A helper is locked when the parser input has not moved past its position, or when it has an error."""
    if self.parser.parser_input.pos <= self.pos:
        return True
    return self.has_error()
def eat_concept(self, concept, token):
    """
    Try to recognize `concept` starting at the current position of the parser input.

    Nothing is done when the helper is locked.
    Otherwise the pending unrecognized tokens are flushed, the same request is forwarded to the
    helpers already forked, then the parsing expression of the concept is parsed:
    * several nodes found: one helper per node (self plus clones registered in self.forked)
    * one valid node found: a ConceptNode is appended to the sequence
    * nothing found: the token is stored as unrecognized and the position is reset

    :param concept: concept to recognize
    :param token: token that triggered the recognition
    :return: None
    """

    def _get_longest_valid_node(multi_node):
        # Keep the valid nodes sharing the `pos` of the first valid result.
        # Returns None, a single node, or a list of nodes.
        res = []
        longest = -1
        for node_res in multi_node.results:
            if node_res.node is None or node_res.node.end == -1:
                continue

            if longest == -1 or node_res.pos == longest:
                res.append(node_res.node)
                longest = node_res.pos
            else:
                break

        return None if len(res) == 0 else res[0] if len(res) == 1 else res

    if self.is_locked():
        return

    try:
        self.push_concept(concept)
        self.debug.append(concept)
        self.manage_unrecognized()

        for forked in self.forked:
            # manage the fact that some clone may have been forked
            forked.eat_concept(concept, token)

        # init
        parsing_expression = self.parser.get_parsing_expression(self.parser.context, concept)
        if not isinstance(parsing_expression, ParsingExpression):
            self.debug.append(concept)
            error_msg = f"Failed to parse concept '{concept}'"
            if parsing_expression is not None:
                error_msg += f". Reason: '{parsing_expression}'"
            self.errors.append(GrammarErrorNode(error_msg))
            return

        self.pos = self.parser.parser_input.pos
        self.token = self.parser.parser_input.tokens[self.pos]

        # parse
        self.debugger.debug_concept(concept, parsing_expression=parsing_expression)
        node = parsing_expression.parse(self)
        if isinstance(node, MultiNode):
            node = _get_longest_valid_node(node)

        if isinstance(node, list):
            # multiple results are found.
            # add the nodes to the forks
            instances = [self]
            for _ in range(len(node) - 1):
                clone = self.clone()
                instances.append(clone)
                self.forked.append(clone)

            for instance, n in zip(instances, node):
                instance.sequence.append(instance.create_concept_node(concept, n))
                instance.pos = n.end
                instance.bnf_parsed = True

        elif isinstance(node, ParseTreeNode) and node.end != -1:
            self.sequence.append(self.create_concept_node(concept, node))
            self.pos = node.end
            self.bnf_parsed = True

        else:
            self.debug.append(("Rewind", token))
            self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos)
            self.pos = self.parser.parser_input.pos  # reset position

    finally:
        # push_concept() fed both self.concepts and self.concepts_ids:
        # pop both of them, otherwise the ids stack keeps growing call after call
        self.pop_concept()
def eat_unrecognized(self, token):
    """Store the token among the unrecognized ones (at the parser input position), unless the helper is locked."""
    if not self.is_locked():
        self.debug.append(token)
        input_pos = self.parser.parser_input.pos
        self.unrecognized_tokens.add_token(token, input_pos)
# Flush the pending unrecognized tokens (self.unrecognized_tokens) into the output sequence.
# When the parser cache resolves them into several node sequences, the helper is cloned
# (one instance per sequence) and the clones are registered in self.forked.
def manage_unrecognized(self):
# nothing is pending
if self.unrecognized_tokens.is_empty():
return
# do not put empty UnrecognizedToken in out
if self.unrecognized_tokens.is_whitespace():
self.unrecognized_tokens.reset()
return
# NOTE(review): fix_source() is defined on UnrecognizedTokensNode; presumably it rebuilds the source text - confirm
self.unrecognized_tokens.fix_source()
# try to recognize concepts
nodes_sequences = self.parser.cache.get_lexer_nodes_from_unrecognized(self.parser.context,
self.unrecognized_tokens)
if nodes_sequences:
# the first sequence goes to self, each of the others to a new clone
instances = [self]
for i in range(len(nodes_sequences) - 1):
clone = self.clone()
instances.append(clone)
self.forked.append(clone)
for instance, node_sequence in zip(instances, nodes_sequences):
for node in node_sequence:
instance.sequence.append(node)
# flag the instance when the node is (or carries) unrecognized tokens
if isinstance(node, UnrecognizedTokensNode) or \
hasattr(node, "unrecognized_tokens") and node.unrecognized_tokens:
instance.has_unrecognized = True
instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
else:
# nothing was resolved: the unrecognized node itself goes to the output
self.sequence.append(self.unrecognized_tokens)
self.has_unrecognized = True
# create another instance
self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
def clone(self):
    """
    Build a new helper on the same parser and debugger.

    debug, errors and sequence are shallow copies, unrecognized_tokens is cloned,
    flags and position are copied. Everything else keeps the value set by the constructor.
    """
    twin = BnfConceptParserHelper(self.parser, self.debugger)

    twin.debug = list(self.debug)
    twin.errors = list(self.errors)
    twin.sequence = list(self.sequence)
    twin.unrecognized_tokens = self.unrecognized_tokens.clone()

    twin.has_unrecognized, twin.bnf_parsed = self.has_unrecognized, self.bnf_parsed
    twin.pos = self.pos
    return twin
# End of parsing: flush the pending unrecognized tokens (through manage_unrecognized())
# when a bnf concept was parsed, and finalize the helpers found in self.forked.
def finalize(self):
if self.bnf_parsed:
self.manage_unrecognized()
for forked in self.forked:
# manage that some clones may have been forked
forked.finalize()
def create_concept_node(self, template, underlying):
    """
    Instantiate a concept from the template, finalize it with the parse tree node,
    and wrap it into a ConceptNode covering the tokens of that node.

    :param template: concept used as a model (its key, and its id when it has one, select the new concept)
    :param underlying: parse tree node that recognized the concept
    :return: the new ConceptNode
    """
    sheerka = self.parser.context.sheerka

    if template.id:
        key = (template.key, template.id)
    else:
        key = template.key

    concept = self.finalize_concept(sheerka, sheerka.new(key), underlying)
    concept.get_hints().use_copy = True

    covered_tokens = self.parser.parser_input.tokens[underlying.start: underlying.end + 1]
    return ConceptNode(concept, underlying.start, underlying.end, covered_tokens, None, underlying)
# Fill the compiled entries (concept.get_compiled()) of `concept` from the parse tree node `underlying`:
# the body (when init_empty_body is set and the metadata body is None), and one entry per rule name
# found in the parse tree. Returns the same `concept` object.
def finalize_concept(self, sheerka, concept, underlying, init_empty_body=True):
"""
Updates the properties of the concept
Goes in recursion if the property is a concept
"""
# this cache is to make sure that we return the same concept for the same ConceptExpression
# (it is keyed by the id() of the parse tree node and is local to this call)
_underlying_value_cache = {}
def _add_compiled(_concept, prop_name, value):
"""
Adds a new entry,
makes a list if the property already exists
"""
if prop_name not in _concept.get_compiled() or _concept.get_compiled()[prop_name] is None:
# new entry
_concept.get_compiled()[prop_name] = value
else:
# make a list if there was a value
previous_value = _concept.get_compiled()[prop_name]
if isinstance(previous_value, list):
previous_value.append(value)
else:
new_value = [previous_value, value]
_concept.get_compiled()[prop_name] = new_value
def _look_for_concept_match(_underlying):
"""
At some point, there is either an StrMatch or a ConceptMatch,
that allowed the recognition.
Look for the ConceptMatch, with recursion if needed
"""
if isinstance(_underlying.parsing_expression, ConceptExpression):
return _underlying
if not isinstance(_underlying, NonTerminalNode):
return None
# only single-child chains are followed
if len(_underlying.children) != 1:
return None
return _look_for_concept_match(_underlying.children[0])
# Value of a node: a finalized concept when a ConceptExpression is found underneath,
# DoNotResolve(source) when the node has no `value` or a str value, the node value otherwise
def _get_underlying_value(_underlying):
if (concept_match_node := _look_for_concept_match(_underlying)) is not None:
# the value is a concept
if id(concept_match_node) in _underlying_value_cache:
result = _underlying_value_cache[id(concept_match_node)]
else:
ref_tpl = concept_match_node.parsing_expression.concept
new = sheerka.new_from_template(ref_tpl, ref_tpl.key)
result = self.finalize_concept(sheerka, new, concept_match_node.children[0], init_empty_body)
_underlying_value_cache[id(concept_match_node)] = result
elif not hasattr(_underlying, "value") or isinstance(_underlying.value, str):
result = DoNotResolve(_underlying.source)
else:
result = _underlying.value
return result
# Record the value of every named node; nodes without a rule name are browsed through their children
def _process_rule_name(_concept, _underlying):
if _underlying.parsing_expression.rule_name:
# make sure VariableExpression are only added once
if (not isinstance(_underlying.parsing_expression, VariableExpression) or
_underlying.parsing_expression.rule_name not in _concept.get_compiled()):
var_value = _get_underlying_value(_underlying)
_add_compiled(_concept, _underlying.parsing_expression.rule_name, var_value)
_concept.get_hints().need_validation = True
elif isinstance(_underlying, NonTerminalNode):
for child in _underlying.children:
_process_rule_name(_concept, child)
# first set the body to something if it is required
if init_empty_body and concept.get_metadata().body is None:
value = _get_underlying_value(underlying)
concept.get_compiled()[ConceptParts.BODY] = value
if underlying.parsing_expression.rule_name:
_add_compiled(concept, underlying.parsing_expression.rule_name, value)
# KSI : Why don't we set concept.get_hints().need_validation to True ?
# then recursively browse children to update concept variables
if isinstance(underlying, NonTerminalNode) and not isinstance(underlying.parsing_expression, ConceptExpression):
for node in underlying.children:
_process_rule_name(concept, node)
return concept
def get_node_value(self, node):
    """
    Compute the value of a ParseTreeNode.

    :param node: TerminalNode or NonTerminalNode
    :return: the node value for a TerminalNode, the body of the evaluated concept when the node
             comes from a ConceptExpression, None in the other cases
    """
    if isinstance(node, TerminalNode):
        return node.value

    expression = node.parsing_expression
    if not isinstance(expression, ConceptExpression):
        return None

    finalized = self.finalize_concept(self.parser.sheerka, expression.concept, node)
    return core.builtin_helpers.ensure_evaluated(self.parser.context, finalized).body
@dataclass
class UnderConstruction:
    """Marker holding the id of a concept whose parsing expression is not available yet."""
    concept_id: str
@dataclass
class ToUpdate:
    """Parsing expression to revisit later, with the instance id used as its hash."""
    instance_id: int
    parsing_expression: ParsingExpression

    def __hash__(self):
        # only the instance id takes part in the hash
        identifier = self.instance_id
        return hash(identifier)
class BnfNodeParser(BaseNodeParser):
NAME = "Bnf"
def __init__(self, **kwargs):
    """
    :param kwargs: forwarded to the base parser; when it contains 'sheerka', the bnf definitions
                   known by sheerka are used as grammars cache, otherwise a new empty Cache is created
    """
    super().__init__(BnfNodeParser.NAME, 50, **kwargs)

    if 'sheerka' not in kwargs:
        self.concepts_grammars = Cache()
    else:
        sheerka = kwargs["sheerka"]
        self.concepts_grammars = sheerka.get_concepts_bnf_definitions()
        self.sheerka = sheerka

    # fast caches used to resolve the unrecognized tokens
    self.cache = UnrecognizedTokensCache(PARSERS)
    self.cache2 = UnrecognizedTokensCache(VARIABLE_EXPR_PARSER)

    self.ignore_case = True
@staticmethod
def _is_eligible(concept):
    """
    Predicate selecting the concepts that BnfNodeParser must handle,
    i.e. the ones whose definition type is DEFINITION_TYPE_BNF
    :param concept:
    :return:
    """
    metadata = concept.get_metadata()
    return metadata.definition_type == DEFINITION_TYPE_BNF
@staticmethod
def get_expression_from_concept_name(name):
    """
    Build the parsing expression that matches the name, word for word.
    Unlike BNFParser.parse(), identifiers are not resolved into concepts:
    'one hundred' gives Sequence(StrMatch("one"), StrMatch("hundred")),
    while BNFParser.parse("one hundred") will look for concept 'one' and concept 'hundred'
    :param name:
    :return: [] for an empty name, a single StrMatch for one word, a Sequence otherwise
    """
    if name is None or not name.strip():
        return []

    expressions = []
    for token in Tokenizer(name, yield_eof=False):
        kind = token.type
        if kind == TokenKind.WHITESPACE:
            continue

        if kind != TokenKind.STRING:
            expressions.append(StrMatch(token.str_value))
            continue

        # quoted string: its content is tokenized again,
        # and only the last piece is created with the default whitespace handling
        sub_tokens = list(Tokenizer(token.strip_quote, yield_eof=False))
        expressions += [StrMatch(sub_token.str_value, skip_whitespace=False) for sub_token in sub_tokens[:-1]]
        expressions.append(StrMatch(sub_tokens[-1].str_value))

    if len(expressions) == 1:
        return expressions[0]
    return Sequence(*expressions)
def get_valid(self, parsers_helpers):
    """
    Select the helpers that parsed a bnf concept without error (duplicates are dropped).
    The errors of the failing helpers are added to self.error_sink.

    :param parsers_helpers: helpers to filter
    :return: the valid helpers, or None when no helper at all parsed a bnf concept
    """
    kept = []
    any_bnf_parsed = False

    for helper in parsers_helpers:
        failed = helper.has_error()
        if failed:
            self.error_sink.extend(helper.errors)

        if helper.bnf_parsed:
            any_bnf_parsed = True

        if failed or not helper.bnf_parsed or helper in kept:
            continue

        kept.append(helper)

    if not any_bnf_parsed:
        return None
    return kept
# Walk through the tokens of self.parser_input and feed a set of BnfConceptParserHelper with them.
# Returns the list of helpers (valid or not) once every one of them has been finalized.
def get_concepts_sequences(self, context):
"""
Main method that parses the tokens and extract the concepts
:return:
"""
# Move the helpers found in the `forked` list of each helper into concept_parser_helpers
def _add_forked_to_concept_parser_helpers():
# check that if some new InfixToPostfix are created
for parser in concept_parser_helpers:
if len(parser.forked) > 0:
forked.extend(parser.forked)
parser.forked.clear()
if len(forked) > 0:
concept_parser_helpers.extend(forked)
forked.clear()
# Group the helpers by their `pos` and return the group having the highest one
def _get_longest(parser_helpers):
# when there is a match with several concepts
# on keep the ones that eat the more tokens
by_end_pos = defaultdict(list)
for helper in parser_helpers:
by_end_pos[helper.pos].append(helper)
return by_end_pos[max(by_end_pos)]
# Concatenate two lists, either of which may be empty or None
def _merge(list1, list2):
if not list1:
return list2
if not list2:
return list1
return list1 + list2
# shared buffer used by _add_forked_to_concept_parser_helpers()
forked = []
debugger = context.get_debugger(self.NAME, "parse")
debugger.debug_entering(source=self.parser_input.as_text())
concept_parser_helpers = [BnfConceptParserHelper(self, debugger)]
while self.parser_input.next_token(False):
token = self.parser_input.token
# debug_prefix only exists when the debugger is enabled; it is only read under the same condition
if debugger.is_enabled():
debug_prefix = f"pos={self.parser_input.pos}, {token=}, {len(concept_parser_helpers)} parser(s)"
try:
# KSI 2021-02-13. I am not quite sure of the reason why we want to stop the processing
# if all the parsers are locked.
# It means that if we have two concepts 'foo bar baz' and 'bar baz'
# we are going to miss the sequence '[UTN('foo'), CN('bar baz')]
# ...
not_locked = [p for p in concept_parser_helpers if not p.is_locked()]
if len(not_locked) == 0:
if debugger.is_enabled():
debugger.debug_log(debug_prefix + ", all parsers are locked. Nothing to do.")
continue
# candidate concepts: the ones starting with this token, plus the ones whose first regex matches here
by_token = context.sheerka.get_concepts_by_first_token(token, self._is_eligible, strip_quotes=False)
by_regex = context.sheerka.get_concepts_by_first_regex(self.parser_input.sub_text, token.index)
concepts = _merge(by_token, by_regex)
if not concepts:
if debugger.is_enabled():
debugger.debug_log(debug_prefix + ", no concept found.")
for concept_parser in not_locked:
concept_parser.eat_unrecognized(token)
continue
if debugger.is_enabled():
debugger.debug_log(debug_prefix + f", concept(s) found={concepts}")
# a single candidate: no need to clone the helpers
if len(concepts) == 1:
for concept_parser in not_locked:
concept_parser.eat_concept(concepts[0], token)
continue
# make the cartesian product
temp_res = []
for concept_parser in concept_parser_helpers:
if concept_parser.is_locked():
# It means that it already eat the token
# so simply add it, do not clone
temp_res.append(concept_parser)
continue
for concept in concepts:
clone = concept_parser.clone()
temp_res.append(clone)
clone.eat_concept(concept, token)
if debugger.is_enabled():
debugger.debug_log(f"..{concept}, parsed={clone.bnf_parsed}, length={clone.pos}")
# only keep the longest
concept_parser_helpers = _get_longest(temp_res)
if debugger.is_enabled() and len(temp_res) > 1:
debugger.debug_log(f"Only keep longest -> {len(concept_parser_helpers)} parser(s) left")
finally:
# runs for every token, including when the iteration ends with `continue`
_add_forked_to_concept_parser_helpers()
# make sure that remaining items in stack are moved to out
for concept_parser in concept_parser_helpers:
concept_parser.finalize()
_add_forked_to_concept_parser_helpers()
debugger.debug_var("result", concept_parser_helpers)
return concept_parser_helpers
# Look for a concept that references itself through the nodes of a parsing expression.
# :param parsing_expression: expression to inspect (its resolved `nodes` are followed)
# :param already_found: ids of the concepts met so far on the current path (the list is updated in place)
# :param in_recursion: output list, extended with already_found when a cycle is detected
# :param only_first: for a Sequence, restrict the check to its first node
# :return: True when a cycle is detected, False otherwise
def check_for_infinite_recursion(self, parsing_expression, already_found, in_recursion, only_first=False):
if isinstance(parsing_expression, ConceptExpression):
if parsing_expression.concept.id in already_found:
already_found.append(parsing_expression.concept.id) # add the id again, to know where the cycle starts
in_recursion.extend(already_found)
return True
already_found.append(parsing_expression.concept.id)
return self.check_for_infinite_recursion(parsing_expression.nodes[0],
already_found,
in_recursion,
only_first)
# scratch copy of already_found, reset before each child is inspected
already_found_for_current_node = []
if isinstance(parsing_expression, Sequence):
# for sequence, we need to check all nodes (unless, only first)
if only_first:
nodes = [] if len(parsing_expression.nodes) == 0 else [parsing_expression.nodes[0]]
else:
nodes = parsing_expression.nodes
for node in nodes:
already_found_for_current_node.clear()
already_found_for_current_node.extend(already_found)
if self.check_for_infinite_recursion(node, already_found_for_current_node, in_recursion, False):
return True
return False
if isinstance(parsing_expression, OrderedChoice):
# for ordered choice, if there is at least one node that does not resolved to a recursion
# we are safe
for node in parsing_expression.nodes:
already_found_for_current_node.clear()
already_found_for_current_node.extend(already_found)
if self.check_for_infinite_recursion(node, already_found_for_current_node, in_recursion, True):
return True
else:
return False
return False
# if isinstance(parsing_expression, UnOrderedChoice):
# for node in parsing_expression.nodes:
# already_found_for_current_node.clear()
# already_found_for_current_node.extend(already_found.copy())
# if self.check_for_infinite_recursion(node, already_found_for_current_node, in_recursion, True):
# return True
# return False
# any other kind of expression is not inspected
return False
# Entry point used to get the parsing expression of a concept.
# The result is served from self.concepts_grammars when the concept id is already there;
# otherwise it is resolved, checked for infinite recursion, then stored for later use.
def get_parsing_expression(self, context, concept):
"""
Compute the parsing expression for a given concept
:param context:
:param concept:
:return:
"""
if concept.id in self.concepts_grammars:
return self.concepts_grammars.get(concept.id)
# internal cache of already computed parsing expression to use during the recursion
grammar = {}
# concept that are not totally resolved, because they reference parsing expression under construction
to_update = set() # the key is the instance id of the parsing expression
desc = f"Get parsing expression for concept {concept}"
with context.push(BuiltinConcepts.INIT_BNF, concept,
who=self.name,
obj=concept,
desc=desc) as sub_context:
# get the parsing expression
to_skip = {concept.id}
presult = self.resolve_concept_parsing_expression(sub_context, concept, None, grammar, to_skip, to_update)
# check and update parsing expression that are still under construction
# (each UnderConstruction placeholder is replaced by the entry that `grammar` now holds for its concept id)
for item in to_update:
pe = item.parsing_expression
for i, node in enumerate(pe.nodes):
if isinstance(node, UnderConstruction):
pe.nodes[i] = grammar.get(node.concept_id)
# check for infinite recursion definitions
already_seen = [concept.id]
in_recursion = [] # there may be cases where in_recursion is less than already_seen
concepts_in_recursion = self.check_for_infinite_recursion(presult, already_seen, in_recursion)
if concepts_in_recursion:
# every concept of the cycle gets the same CHICKEN_AND_EGG concept instead of a parsing expression
chicken_anf_egg = context.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG, body=in_recursion)
for concept_id in in_recursion:
grammar[concept_id] = chicken_anf_egg
# update, in case of infinite recursion
presult = grammar[concept.id]
# finally, update the list of the known pexpression (self.concepts_grammars) for latter use
for k, v in grammar.items():
self.concepts_grammars.put(k, v)
sub_context.add_values(return_values=presult)
return presult
# Resolve the parsing expression of one concept, according to its kind:
# bnf definition, group of concepts ('isa'), or simple concept (expression built from its name).
# The result is stored in `grammar` under the concept id.
def resolve_concept_parsing_expression(self, context, concept, name, grammar, to_skip, to_update):
"""
:param context:
:param concept: concept
:param name: rule_name of the concept if exists
:param grammar: already resolved parsing expressions
:param to_skip: list of concepts to skip in order to avoid circular references (only for UnOrderedChoice pe)
:param to_update: parsing expressions that contains unresolved parsing expression
:return:
"""
sheerka = context.sheerka
# if sheerka.isaset(context, concept) and hasattr(context, "obj"):
# key_to_use = ConceptExpression.get_recursion_id(context.obj.id, concept.id, name)
# else:
# key_to_use = concept.id
key_to_use = concept.id
# already known globally
if key_to_use in self.concepts_grammars:
return self.concepts_grammars.get(key_to_use)
# # Use the global pexpression only if it does not contains UnOrderedChoice
# pe = self.concepts_grammars.get(key_to_use)
# if not pe.has_unordered_choice():
if key_to_use in grammar: # under construction entry
return grammar.get(key_to_use)
desc = f"Resolve concept parsing expression for '{concept}'. {key_to_use=}"
with context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as sub_context:
if not concept.get_bnf(): # 'if' is done outside to save a function call. Not sure it worth it.
core.builtin_helpers.ensure_bnf(sub_context, concept, self.name)
# placeholder returned to the recursive requests made while this concept is being resolved
grammar[key_to_use] = UnderConstruction(concept.id)
if concept.get_metadata().definition_type == DEFINITION_TYPE_BNF:
expression = concept.get_bnf()
desc = f"Bnf concept detected. Resolving parsing expression '{expression}'"
with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
ssc.add_inputs(expression=expression)
resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_skip, to_update)
ssc.add_values(return_values=resolved)
elif sheerka.isaset(context, concept):
desc = f"Concept is a group. Resolving parsing expression using 'isa'"
with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
ssc.add_inputs(concept=concept)
concepts_in_group = self.sheerka.get_set_elements(ssc, concept)
# valid_concepts = [c for c in concepts_in_group if c.id not in to_skip]
valid_concepts = concepts_in_group
# the group becomes an UnOrderedChoice between its elements, each one named after its key
nodes = []
for c in valid_concepts:
nodes.append(ConceptExpression(c, rule_name=c.key))
resolved = self.resolve_parsing_expression(ssc,
UnOrderedChoice(*nodes),
grammar,
to_skip,
to_update)
ssc.add_values(concepts_in_group=concepts_in_group)
ssc.add_values(return_values=resolved)
else:
desc = f"Concept is a simple concept."
with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc:
expression = self.get_expression_from_concept_name(concept.name)
resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_skip, to_update)
# replace the placeholder by the real result
grammar[key_to_use] = resolved
if self.has_error:
sub_context.add_values(errors=self.error_sink)
return None
sub_context.add_values(return_values=resolved)
return resolved
# Resolve a parsing expression in place: a str becomes a StrMatch, concepts are looked up,
# regexes are compiled, and the `nodes` of the composite expressions are filled from their `elements`.
# Anything that is neither a str nor a ParsingExpression is returned untouched (it is an error to escalate).
def resolve_parsing_expression(self, context, expression, grammar, to_skip, to_update):
if isinstance(expression, str):
ret = StrMatch(expression, ignore_case=self.ignore_case)
elif not isinstance(expression, ParsingExpression):
return expression # escalate the error
elif isinstance(expression, ConceptExpression):
concept = self.get_concept(context, expression.concept)
expression.concept = concept
if not self.sheerka.is_known(concept):
unknown_concept = self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=concept)
return self.add_error(unknown_concept)
# to_skip is copied, so the caller's set is left untouched
inner_to_skip = to_skip.copy()
inner_to_skip.add(concept.id)
pe = self.resolve_concept_parsing_expression(context,
concept,
expression.rule_name,
grammar,
inner_to_skip,
to_update)
if not isinstance(pe, (ParsingExpression, UnderConstruction)):
return pe # an error is detected, escalate it
# remember the expressions that hold a placeholder, they are patched by get_parsing_expression()
if isinstance(pe, UnderConstruction):
to_update.add(ToUpdate(id(expression), expression))
expression.nodes = [pe]
expression.rule_name = expression.rule_name or concept.name
ret = expression
elif isinstance(expression, StrMatch):
ret = expression
# None means 'not set': use the setting of the parser
if ret.ignore_case is None:
ret.ignore_case = self.ignore_case
elif isinstance(expression, RegExMatch):
# Regular expression are not compiled yet
# to support global settings propagation from
# parser.
ret = expression
if ret.ignore_case is None:
ret.ignore_case = self.ignore_case
ret.compile()
elif isinstance(expression, (Sequence, OrderedChoice, UnOrderedChoice, ZeroOrMore, OneOrMore, Optional)):
ret = expression
ret.nodes = []
# VariableExpression elements are kept as they are, the others are resolved recursively
for e in ret.elements:
if not isinstance(e, VariableExpression):
pe = self.resolve_parsing_expression(context, e, grammar, to_skip, to_update)
if not isinstance(pe, (ParsingExpression, UnderConstruction)):
return pe # an error is detected, escalate it
if isinstance(pe, UnderConstruction):
to_update.add(ToUpdate(id(expression), ret))
ret.nodes.append(pe)
else:
ret.nodes.append(e)
# manage VariableExpression
start_node = None # first non VariableExpression node
variable_expr_nodes = []
for i, e in enumerate(ret.nodes):
if isinstance(e, VariableExpression):
variable_expr_nodes.append(e)
e.before_first_token_node = start_node is None
# the node that follows the variable (when there is one) is added to the nodes of the variable
if i < len(ret.nodes) - 1:
e.nodes.append(ret.nodes[i + 1])
else:
start_node = e
for variable_expr in variable_expr_nodes:
variable_expr.init_parsing()
else:
ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)
# Translate separator expression.
if isinstance(ret, Repetition) and expression.sep:
expression.sep = self.resolve_parsing_expression(context,
expression.sep,
grammar,
to_skip,
to_update)
return ret
def get_concept(self, context, concept):
    """
    Resolve a concept reference.

    :param context: execution context; its `concepts` mapping is searched first
    :param concept: Concept instance (returned as is) or key of the concept
    :return: the concept found in the context, otherwise the result of sheerka.get_by_key()
    """
    if isinstance(concept, Concept):
        return concept

    local_concepts = context.concepts
    if local_concepts and concept in local_concepts:
        return local_concepts[concept]

    return self.sheerka.get_by_key(concept)
def parse(self, context, parser_input: ParserInput):
    """
    Parse the input and return the sequences of concepts recognized using the bnf definitions.

    parser_input can be string, but text can also be an list of tokens
    :param context:
    :param parser_input:
    :return: None when parser_input is not a ParserInput,
             a NOT_FOR_ME return value when the input is empty or when no bnf concept is recognized,
             an ERROR return value when the parser cannot be reset or when every candidate failed,
             otherwise one PARSER_RESULT return value (or a list of them when there are several candidates)
    """
    if not isinstance(parser_input, ParserInput):
        return None

    context.log(f"Parsing '{parser_input}' with BnfNode", self.name)
    sheerka = context.sheerka

    if parser_input.is_empty():
        return sheerka.ret(self.name,
                           False,
                           sheerka.new(BuiltinConcepts.NOT_FOR_ME,
                                       body=parser_input.as_text(),
                                       reason=BuiltinConcepts.IS_EMPTY))

    if not self.reset_parser(context, parser_input):
        return self.sheerka.ret(
            self.name,
            False,
            context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))

    sequences = self.get_concepts_sequences(context)
    valid_parser_helpers = self.get_valid(sequences)

    debugger = context.get_debugger(self.NAME, "parse")
    # is_enabled is a method: it must be called, testing the bound method itself is always true
    if debugger.is_enabled():
        debugger.debug_var("stats", self.cache.to_dict())

    if valid_parser_helpers is None:
        # no helper parsed a bnf concept
        return self.sheerka.ret(
            self.name,
            False,
            context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text(), reason=self.error_sink))

    if len(valid_parser_helpers) == 0:
        # token error
        return self.sheerka.ret(
            self.name,
            False,
            context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))

    ret = []
    for parser_helper in valid_parser_helpers:
        # the result is flagged as failed when some tokens remain unrecognized
        ret.append(
            self.sheerka.ret(
                self.name,
                not parser_helper.has_unrecognized,
                self.sheerka.new(
                    BuiltinConcepts.PARSER_RESULT,
                    parser=self,
                    source=parser_input.as_text(),
                    body=parser_helper.sequence,
                    try_parsed=parser_helper.sequence)))

    if len(ret) == 1:
        self.log_result(context, parser_input.as_text(), ret[0])
        return ret[0]
    else:
        self.log_multiple_results(context, parser_input.as_text(), ret)
        return ret