Files
Sheerka-Old/parsers/ConceptLexerParser.py
T

797 lines
25 KiB
Python

#####################################################################################################
# This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from dataclasses import field, dataclass
from collections import defaultdict
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.tokenizer import TokenKind, Tokenizer, Token
from parsers.BaseParser import BaseParser, Node, ErrorNode
import core.utils
import logging
log = logging.getLogger(__name__)
def flatten(iterable):
    """
    Collapses a tree of nodes so that only the nodes produced by a named rule keep their own level.

    A node whose parsing expression has a rule name is kept, and its children (if it has any)
    are flattened in place. An unnamed node that has children is replaced by its flattened
    children. An unnamed node without children is kept as it is.

    :param iterable: nodes to flatten (each one must expose parsing_expression.rule_name), or None
    :return: a new list of nodes; an empty list when iterable is None
    """
    if iterable is None:
        return []

    flattened = []
    for item in iterable:
        rule_name = item.parsing_expression.rule_name
        is_named = rule_name is not None and rule_name != ""
        has_children = hasattr(item, "children")

        if not is_named and has_children:
            # anonymous container: only its content is interesting
            flattened += flatten(item.children)
            continue

        if is_named and has_children:
            # the children of a named node are rewritten in place
            item.children = flatten(item.children)
        flattened.append(item)

    return flattened
@dataclass
class LexerNode(Node):
    """
    Base class of the nodes built by the ConceptLexerParser.

    start and end are positions in the parser's token list (the parser slices its tokens
    with tokens[start: end + 1]). An end of -1 is used by the parsing expressions to say
    that nothing was matched.
    When no source is given, it is computed from the tokens.
    """
    start: int
    end: int
    tokens: list = None
    source: str = None

    def __post_init__(self):
        if self.source is not None:
            return
        self.source = BaseParser.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        # only the positions are compared here, subclasses refine the comparison
        return (isinstance(other, LexerNode)
                and self.start == other.start
                and self.end == other.end)
class ConceptNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    It represents a recognized concept

    concept is the recognized concept, underlying is the node returned by the grammar
    of that concept.
    """

    def __init__(self, concept, start, end, tokens=None, source=None, underlying=None):
        # LexerNode.__post_init__ (called by the generated __init__) already computes
        # the source from the tokens when no source is given
        super().__init__(start, end, tokens, source)
        self.concept = concept
        self.underlying = underlying

    def __eq__(self, other):
        """
        Compares with another ConceptNode, or with a tuple (shortcut form):
        (concept, source) or (concept, start, end, source).
        A tuple of any other (shorter) length is never equal, instead of raising IndexError.
        """
        if isinstance(other, tuple):
            if len(other) == 2:
                return self.concept == other[0] and self.source == other[1]
            if len(other) >= 4:
                return self.concept == other[0] and \
                       self.start == other[1] and \
                       self.end == other[2] and \
                       self.source == other[3]
            return False

        if not isinstance(other, ConceptNode):
            return False
        if not super().__eq__(other):  # start and end
            return False

        return self.concept == other.concept and \
               self.source == other.source and \
               self.underlying == other.underlying

    def __hash__(self):
        return hash((self.concept, self.start, self.end, self.source, self.underlying))

    def __repr__(self):
        return f"ConceptNode(concept='{self.concept}', start={self.start}, end={self.end}, source='{self.source}')"
class NonTerminalNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    Node built by a composite parsing expression: it keeps the expression that produced it
    and the nodes matched by its sub expressions (children).
    """

    def __init__(self, parsing_expression, start, end, tokens, children=None):
        super().__init__(start, end, tokens)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        if not self.children:  # None (the default) or empty
            return name
        return name + "(" + ",".join(repr(child) for child in self.children) + ")"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, NonTerminalNode):
            return False

        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.children == other.children

    def __hash__(self):
        # children is a list (or None): a list cannot be part of a hashed tuple,
        # so it is frozen into a tuple first
        children = None if self.children is None else tuple(self.children)
        return hash((self.parsing_expression, self.start, self.end, children))
class TerminalNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    Leaf node: keeps the parsing expression that matched and the matched value.
    The value is also used as the source of the node.
    """

    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end, source=value)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        return (self.parsing_expression.rule_name or "") + f"'{self.value}'"

    def __eq__(self, other):
        if not (super().__eq__(other) and isinstance(other, TerminalNode)):
            return False

        return (self.parsing_expression == other.parsing_expression
                and self.start == other.start
                and self.end == other.end
                and self.value == other.value)

    def __hash__(self):
        key = (self.parsing_expression, self.start, self.end, self.value)
        return hash(key)
@dataclass
class GrammarErrorNode(ErrorNode):
    """
    Error in the definition of a grammar.
    message is the human readable description of the problem.
    """
    message: str
@dataclass
class UnknownConceptNode(ErrorNode):
    """
    Error node that carries the key of a concept.
    NOTE(review): presumably raised when concept_key cannot be resolved - confirm against callers
    """
    concept_key: str
@dataclass
class TooManyConceptNode(ErrorNode):
    """
    Error node that carries the key of a concept.
    NOTE(review): presumably raised when concept_key resolves to several concepts - confirm against callers
    """
    concept_key: str
class ParsingExpression:
    """
    Base class of all the parsing expressions.

    elements: the positional arguments, i.e. the raw definition of the sub expressions
    nodes: the sub expressions once resolved (keyword 'nodes', wrapped in a list when not iterable)
    rule_name: name of the rule (keyword 'rule_name', '' by default)

    Subclasses implement _parse(parser).
    """

    def __init__(self, *args, **kwargs):
        self.elements = args

        nodes = kwargs.get('nodes', [])
        self.nodes = nodes if hasattr(nodes, '__iter__') else [nodes]

        self.rule_name = kwargs.get('rule_name', '')

    def __eq__(self, other):
        return (isinstance(other, ParsingExpression)
                and self.rule_name == other.rule_name
                and self.elements == other.elements)

    def __hash__(self):
        key = (self.rule_name, self.elements)
        return hash(key)

    def parse(self, parser):
        """
        Tries to match the expression at the current position of the parser
        :param parser: parser that holds the tokens and the current position
        :return: whatever _parse() returns
        """
        return self._parse(parser)
class Sequence(ParsingExpression):
    """
    Will match sequence of parser expressions in exact order they are defined.
    """

    def _parse(self, parser):
        """
        :return: None as soon as one sub expression fails,
                 otherwise a NonTerminalNode that holds the non empty matches
        """
        start = parser.pos
        # NOTE(review): 'last' stays at the start position when every sub expression
        # matches nothing (end == -1) - confirm that it is the expected result
        last = start
        matched = []

        for expression in self.nodes:
            child = expression.parse(parser)
            if child is None:
                return None
            if child.end == -1:  # -1 means that the sub expression matched nothing
                continue
            matched.append(child)
            last = child.end

        return NonTerminalNode(self, start, last, parser.tokens[start: last + 1], matched)

    def __repr__(self):
        return "(" + ", ".join(map(repr, self.elements)) + ")"
class OrderedChoice(ParsingExpression):
    """
    Will match one among multiple
    It will stop at the first match (so the order of definition is important)
    """

    def _parse(self, parser):
        """
        :return: a NonTerminalNode that wraps the first alternative that matches,
                 None when no alternative matches
        """
        start = parser.pos

        for alternative in self.nodes:
            found = alternative.parse(parser)
            if not found:
                parser.seek(start)  # backtrack before trying the next alternative
                continue
            return NonTerminalNode(self, start, found.end, parser.tokens[start: found.end + 1], [found])

        return None

    def __repr__(self):
        return "(" + "| ".join(map(repr, self.elements)) + ")"
class Optional(ParsingExpression):
    """
    Will match or not the elements
    if many matches, will choose longest one
    If you need order, use Optional(OrderedChoice)
    """

    def _parse(self, parser):
        """
        Never fails: when nothing matches, the returned node has an end of -1
        and the parser stays where it was.
        """
        start = parser.pos
        best = NonTerminalNode(self, parser.pos, -1, [], [])  # means that nothing is found

        for candidate in self.nodes:
            found = candidate.parse(parser)
            if found and found.end > best.end:
                best = NonTerminalNode(
                    self,
                    found.start,
                    found.end,
                    parser.tokens[found.start: found.end + 1],
                    [found])
            parser.seek(start)  # backtrack, every candidate is tried from the same position

        if best.end == -1:
            return best

        # eat the tokens found
        parser.seek(best.end)
        parser.next_token()
        return best

    def __repr__(self):
        if len(self.elements) != 1:
            return "(" + ", ".join(map(repr, self.elements)) + ")?"
        return f"{self.elements[0]}?"
class Repetition(ParsingExpression):
    """
    Base class for all repetition-like parser expressions (?,*,+)
    Args:
        sep: optional expression that must be matched between two repetitions
             (None, the default, means no separator)
    """

    def __init__(self, *elements, **kwargs):
        super().__init__(*elements, **kwargs)
        self.sep = kwargs.get('sep', None)
class ZeroOrMore(Repetition):
    """
    ZeroOrMore will try to match parser expression specified zero or more
    times. It will never fail.
    """

    def _parse(self, parser):
        """
        :return: a NonTerminalNode that holds all the repetitions found.
                 Its end is -1 (and it has no child) when nothing is matched.
        """
        init_pos = parser.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser.pos

            # maybe eat the separator if needed
            if self.sep and children:
                sep_result = self.sep.parse(parser)
                if sep_result is None:
                    parser.seek(current_pos)
                    break

            # eat the repeated expression
            node = self.nodes[0].parse(parser)
            if node is None:
                parser.seek(current_pos)
                break

            if node.end != -1:  # because returns -1 when no match
                children.append(node)
                end_pos = node.end

            if parser.pos == current_pos:
                # The expression succeeded without eating any token (it wraps an Optional for instance).
                # The next iteration would start from the same state: stop, or it will loop forever.
                break

        if len(children) == 0:
            return NonTerminalNode(self, init_pos, -1, [], [])

        return NonTerminalNode(self, init_pos, end_pos, parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return f"({to_str})*"
class OneOrMore(Repetition):
    """
    OneOrMore will try to match parser expression specified one or more times.
    """

    def _parse(self, parser):
        """
        :return: a NonTerminalNode that holds all the repetitions found,
                 None when the expression is not matched at least once
        """
        init_pos = parser.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser.pos

            # maybe eat the separator if needed
            if self.sep and children:
                sep_result = self.sep.parse(parser)
                if sep_result is None:
                    parser.seek(current_pos)
                    break

            # eat the repeated expression
            node = self.nodes[0].parse(parser)
            if node is None:
                parser.seek(current_pos)
                break

            if node.end != -1:  # because returns -1 when no match
                children.append(node)
                end_pos = node.end

            if parser.pos == current_pos:
                # The expression succeeded without eating any token (it wraps an Optional for instance).
                # The next iteration would start from the same state: stop, or it will loop forever.
                break

        if len(children) == 0:  # if nothing is found, it's an error
            return None

        return NonTerminalNode(self, init_pos, end_pos, parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return f"({to_str})+"
class UnorderedGroup(Repetition):
    """
    Will try to match all of the parsing expression in any order.
    Not implemented yet: parsing always raises NotImplementedError.
    """

    def _parse(self, parser):
        raise NotImplementedError()

    # Planned representation, to activate with the implementation:
    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})#"
class Match(ParsingExpression):
    """
    Base class for all classes that will try to match something from the input.
    """

    def __init__(self, rule_name, root=False):
        super().__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        """
        Tries to match at the current position of the parser
        :return: whatever _parse() returns
        """
        return self._parse(parser)
class StrMatch(Match):
    """
    Matches a literal

    to_match: the literal that is expected
    ignore_case: when True (the default), the comparison is done on the lower case values
    """

    def __init__(self, to_match, rule_name="", root=False, ignore_case=True):
        super().__init__(rule_name=rule_name, root=root)
        self.to_match = to_match
        self.ignore_case = ignore_case

    def __repr__(self):
        return f"'{self.to_match}'"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, StrMatch):
            return False

        return self.to_match == other.to_match and self.ignore_case == other.ignore_case

    def __hash__(self):
        # Python drops the inherited __hash__ as soon as a class overrides __eq__.
        # Without this method a StrMatch cannot be hashed, which also breaks the hash
        # of the nodes and of the expressions that hold one.
        return hash((self.rule_name, self.to_match, self.ignore_case))

    def _parse(self, parser):
        """
        Compares the current token of the parser with the literal
        :return: a TerminalNode when it matches (the token is eaten), None otherwise
        """
        token = parser.get_token()
        if self.ignore_case:
            matched = str(token.value).lower() == self.to_match.lower()
        else:
            matched = token.value == self.to_match

        if not matched:
            return None

        node = TerminalNode(self, parser.pos, parser.pos, token.value)
        parser.next_token()
        return node
class ConceptMatch(Match):
    """
    Will match a concept
    It used only for rule definition
    When the grammar is created, it is replaced by the actual concept

    concept is either a Concept or the name of a concept (resolved when parsing).
    """

    def __init__(self, concept, rule_name=""):
        super().__init__(rule_name=rule_name)
        self.concept = concept

    def __repr__(self):
        return f"{self.concept}"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, ConceptMatch):
            return False

        if isinstance(self.concept, Concept):
            # the other side may still hold the name of the concept (a str):
            # it is then different, not an AttributeError
            if not hasattr(other.concept, "name"):
                return False
            return self.concept.name == other.concept.name

        return self.concept == other.concept

    def __hash__(self):
        # Python drops the inherited __hash__ as soon as a class overrides __eq__.
        # Two equal ConceptMatch are also equal for the parent class, so the parent hash
        # remains consistent with __eq__.
        return super().__hash__()

    def _parse(self, parser):
        """
        Resolves the concept, then parses with the grammar registered for it
        :return: a NonTerminalNode that wraps the node returned by the grammar of the concept,
                 None when the concept is unknown, has no grammar, or when its grammar does not match
        """
        to_match = parser.get_concept(self.concept) if isinstance(self.concept, str) else self.concept
        if parser.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
            return None

        if to_match not in parser.concepts_grammars:
            return None

        self.concept = to_match  # Memoize

        node = parser.concepts_grammars[to_match].parse(parser)
        if node is None:
            return None

        return NonTerminalNode(self, node.start, node.end, parser.tokens[node.start: node.end + 1], [node])
class ConceptLexerParser(BaseParser):
    """
    Recognizes concepts in a text (or in a list of tokens), using one grammar
    (a ParsingExpression) per concept.

    concepts_grammars maps a concept to its grammar. It is given with the 'grammars' keyword,
    taken from the 'sheerka' keyword (sheerka.concepts_grammars), or starts empty.
    pos is the position of the current token in tokens.
    """

    def __init__(self, **kwargs):
        super().__init__("ConceptLexer")
        if 'grammars' in kwargs:
            self.concepts_grammars = kwargs.get("grammars")
        elif 'sheerka' in kwargs:
            self.concepts_grammars = kwargs.get("sheerka").concepts_grammars
        else:
            self.concepts_grammars = {}

        self.ignore_case = True

        self.token = None
        self.pos = -1
        self.tokens = None

        self.context = None
        self.text = None
        self.sheerka = None

    def add_error(self, error, next_token=True):
        """
        Records an error
        :param error: error to add to the error sink
        :param next_token: when True, moves to the next token after recording the error
        :return: the error
        """
        self.has_error = True
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def reset_parser(self, context, text):
        """
        Prepares the parser for a new input and moves to the first token
        :param context: execution context (its sheerka is kept)
        :param text: a string (it is tokenized) or an iterable of tokens
        """
        self.context = context
        self.sheerka = context.sheerka
        self.text = text

        if isinstance(text, str):
            self.tokens = list(Tokenizer(text))
        else:
            self.tokens = list(text)
            self.tokens.append(Token(TokenKind.EOF, "", -1, -1, -1))  # make sure to finish with end of file token

        self.token = None
        self.pos = -1
        self.next_token()

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        """
        Moves to the next token
        :param skip_whitespace: when True, whitespace and newline tokens are skipped
        :return: False when the current token is (or becomes) the end of file token
        """
        if self.token and self.token.type == TokenKind.EOF:
            return False

        self.pos += 1
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.tokens[self.pos]

        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """
        Moves to the token at position pos
        :return: always True
        """
        self.pos = pos
        self.token = self.tokens[self.pos]
        return True

    def rewind(self, offset, skip_whitespace=True):
        """
        Moves the position by offset
        :param offset: value added to the current position
        :param skip_whitespace: when True, then steps backward over whitespace and newline tokens
        """
        self.pos += offset
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.pos > 0 and (self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE):
                self.pos -= 1
                self.token = self.tokens[self.pos]

    def initialize(self, context, concepts_definitions):
        """
        Adds a bunch of concepts, and how they can be recognized
        :param context: execution context
        :param concepts_definitions: dictionary of concept, concept_definition
        :return: a failed return value that holds the errors,
                 or a successful return value that holds the grammars
        """

        self.context = context
        self.sheerka = context.sheerka
        concepts_to_resolve = set()

        # ## Gets the grammars
        for concept, concept_def in concepts_definitions.items():
            concept.init_key()  # make sure that the key is initialized
            grammar = self.get_model(concept_def, concepts_to_resolve)
            self.concepts_grammars[concept] = grammar

        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)

        # ## Removes concepts with infinite recursions
        concepts_to_remove = self.detect_infinite_recursion(concepts_to_resolve)
        for concept in concepts_to_remove:
            # concepts_to_resolve may hold the name of the concept rather than the concept itself,
            # so nothing guarantees that the resolved concept is in the set
            concepts_to_resolve.discard(concept)
            if concept in self.concepts_grammars:
                del self.concepts_grammars[concept]

        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)
        else:
            return self.sheerka.ret(self.name, True, self.concepts_grammars)

    def get_concept(self, concept_name):
        """
        Gets a concept by its name, from the cache of the context first, then from sheerka
        """
        if concept_name in self.context.concepts_cache:
            return self.context.concepts_cache[concept_name]
        return self.sheerka.get(concept_name)

    def get_model(self, concept_def, concepts_to_resolve):
        """
        Transforms the definition of a concept into a grammar that can be parsed
        :param concept_def: definition (Concept, str or ParsingExpression)
        :param concepts_to_resolve: set that receives the concepts referenced by the definition
        :return: the grammar; a GrammarErrorNode takes the place of any unrecognized element
        """

        # TODO
        # inner_get_model must not modify the initial ParsingExpression
        # A copy must be created
        def inner_get_model(expression):
            if isinstance(expression, Concept):
                ret = ConceptMatch(expression, rule_name=expression.name)
                concepts_to_resolve.add(expression)
            elif isinstance(expression, ConceptMatch):
                if expression.rule_name is None or expression.rule_name == "":
                    expression.rule_name = expression.concept.name if isinstance(expression.concept, Concept) \
                        else expression.concept
                concepts_to_resolve.add(expression.concept)
                ret = expression
            elif isinstance(expression, str):
                ret = StrMatch(expression, ignore_case=self.ignore_case)
            elif isinstance(expression, StrMatch):
                ret = expression
                if ret.ignore_case is None:
                    ret.ignore_case = self.ignore_case
            elif isinstance(expression, Sequence) or \
                    isinstance(expression, OrderedChoice) or \
                    isinstance(expression, ZeroOrMore) or \
                    isinstance(expression, OneOrMore) or \
                    isinstance(expression, Optional):
                ret = expression
                ret.nodes.extend([inner_get_model(e) for e in ret.elements])
            else:
                ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)

            # Translate separator expression.
            if isinstance(expression, Repetition) and expression.sep:
                expression.sep = inner_get_model(expression.sep)

            return ret

        model = inner_get_model(concept_def)

        return model

    def detect_infinite_recursion(self, concepts_to_resolve):
        """
        Finds the concepts whose grammar ends up referencing the concept itself
        :param concepts_to_resolve: concepts (or names of concepts) to check
        :return: list of the recursive concepts (each one appears once).
                 Unknown concepts and concepts without grammar are skipped.
        """

        def _resolve(concept):
            # a concept may be referenced by its name; returns None when the name is unknown
            if not isinstance(concept, str):
                return concept
            resolved = self.get_concept(concept)
            if self.sheerka.isinstance(resolved, BuiltinConcepts.UNKNOWN_CONCEPT):
                return None
            return resolved

        # infinite recursion matcher
        def _is_infinite_recursion(ref_concept, node, visited):
            if isinstance(node, ConceptMatch):
                if node.concept == ref_concept:
                    return True

                to_match = _resolve(node.concept)
                if to_match is None:
                    return False
                if to_match == ref_concept:  # the concept was referenced by its name
                    return True

                # A grammar is expanded only once. Otherwise a cycle that does not go
                # through ref_concept (a -> b -> a while checking c) never ends.
                if to_match in visited:
                    return False
                visited.add(to_match)

                if to_match not in self.concepts_grammars:  # nothing to follow
                    return False
                return _is_infinite_recursion(ref_concept, self.concepts_grammars[to_match], visited)

            if isinstance(node, OrderedChoice):
                # only the first alternative is inspected
                if not node.nodes:
                    return False
                return _is_infinite_recursion(ref_concept, node.nodes[0], visited)

            if isinstance(node, Sequence):
                for child in node.nodes:
                    if _is_infinite_recursion(ref_concept, child, visited):
                        return True
                return False

            return False

        removed_concepts = []
        for e in concepts_to_resolve:
            concept = _resolve(e)
            if concept is None or concept not in self.concepts_grammars:
                continue
            if concept in removed_concepts:  # same concept given by name and by value
                continue

            if _is_infinite_recursion(concept, self.concepts_grammars[concept], set()):
                removed_concepts.append(concept)
        return removed_concepts

    def parse(self, context, text):
        """
        Recognizes the concepts of the input
        :param context: execution context
        :param text: string or iterable of tokens
        :return: a failed return value when the text is empty or when something is not recognized,
                 otherwise one successful return value per possibility found
                 (a list when there are several possibilities)
        """
        if text == "":
            return context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )

        self.reset_parser(context, text)

        concepts_found = [[]]
        # actually list of list
        # The first dimension is the number of possibilities found
        # The second dimension is the number of concepts found, under one possibility
        #
        # Example 1
        # concept foo : 'one' 'two'
        # concept bar : 'one' 'two'
        # input 'one two' -> will produce two possibilities (foo and bar).
        #
        # Example 2
        # concept foo : 'one'
        # concept bar : 'two'
        # input 'one two' -> will produce one possibility which is (foo, bar) (foo then bar)

        while True:
            init_pos = self.pos
            res = []

            for concept, grammar in self.concepts_grammars.items():
                self.seek(init_pos)
                node = grammar.parse(self)  # a node is TerminalNode or NonTerminalNode
                if node is not None and node.end != -1:
                    concept_node = ConceptNode(
                        concept,
                        node.start,
                        node.end,
                        self.tokens[node.start: node.end + 1],
                        None,
                        node)
                    res.append(concept_node)

            if len(res) == 0:  # not recognized
                self.seek(init_pos)
                not_recognized = self.get_text_from_tokens(self.get_token())
                self.add_error(self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=not_recognized))
                break

            res = self.get_bests(res)  # only keep the concepts that eat the more tokens
            concepts_found = core.utils.product(concepts_found, res)

            # loop
            self.seek(res[0].end)
            if not self.next_token():
                break

        # manage when nothing is recognized (or other error)
        if self.has_error:
            return self.sheerka.ret(
                self.name,
                False,
                self.sheerka.new(
                    BuiltinConcepts.PARSER_RESULT,
                    parser=self,
                    source=text,
                    body=self.error_sink,
                    try_parsed=concepts_found[0] if len(concepts_found) == 1 else concepts_found))

        # else
        # returns as many ReturnValue than choices found
        ret = []
        for choice in concepts_found:
            ret.append(
                self.sheerka.ret(
                    self.name,
                    True,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=text,
                        body=choice,
                        try_parsed=choice)))

        return ret[0] if len(ret) == 1 else ret

    @staticmethod
    def get_bests(results):
        """
        Returns the result that is the longest
        :param results: nodes to compare (must not be empty)
        :return: list of the nodes that share the greatest end position
        """
        by_end_pos = defaultdict(list)
        for result in results:
            by_end_pos[result.end].append(result)

        return by_end_pos[max(by_end_pos)]
class ParsingExpressionVisitor:
    """
    visit ParsingExpression

    Subclasses define visit_<ClassName>(parsing_expression) for the expressions they care about.
    The other expressions go through generic_visit(), which calls the optional
    visit_all(parsing_expression) hook and then visits the elements of the expression.
    """

    def visit(self, parsing_expression):
        """
        Dispatches to visit_<ClassName>, or to generic_visit when there is no such method
        :return: whatever the selected method returns
        """
        handler_name = f"visit_{parsing_expression.__class__.__name__}"
        handler = getattr(self, handler_name, self.generic_visit)
        return handler(parsing_expression)

    def generic_visit(self, parsing_expression):
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)

        for element in parsing_expression.elements:
            # raw definitions are wrapped into the matching expression before being visited
            if isinstance(element, Concept):
                element = ConceptMatch(element.key or element.name)
            elif isinstance(element, str):
                element = StrMatch(element)
            self.visit(element)