First steps of ConceptLexer. Need to update DefaultParser before continuing
@@ -51,6 +51,10 @@ class BaseParser:
        if tokens is None:
            return ""
        res = ""

        if not hasattr(tokens, "__iter__"):
            tokens = [tokens]

        for token in tokens:
            value = Keywords(token.value).value if token.type == TokenKind.KEYWORD else token.value
            res += value
@@ -0,0 +1,495 @@
#####################################################################################################
# This part of the code is heavily inspired by the Arpeggio project (https://github.com/textX/Arpeggio)
# I don't use the project directly, but it helped me figure out what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71-74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from dataclasses import field, dataclass
from collections import defaultdict

from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.tokenizer import TokenKind, Tokenizer, Token
from parsers.BaseParser import BaseParser, Node, ErrorNode
import core.utils
import logging

log = logging.getLogger(__name__)


def flatten(iterable):
    """Flattens a tree of nodes: anonymous nodes (no rule name) are replaced by their children."""
    if iterable is None:
        return []

    result = []
    for e in iterable:
        if e.parsing_expression.rule_name:  # named node: keep it, flatten its children in place
            if hasattr(e, "children"):
                e.children = flatten(e.children)
            result.append(e)
        elif hasattr(e, "children"):  # anonymous node: hoist its flattened children
            result.extend(flatten(e.children))
        else:
            result.append(e)
    return result
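
# A minimal sketch of what flatten does (illustrative; 'expr', 'number' and 'op'
# are hypothetical rule names). Anonymous nodes are dissolved into their parent,
# named nodes are kept and flattened in place:
#
#   before: expr(<anonymous>(number'1', number'2'), op'+')
#   after:  expr(number'1', number'2', op'+')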


@dataclass()
class LexerNode(Node):
    start: int
    end: int


class ConceptNode(LexerNode):
    def __init__(self, concept, start, end, tokens=None, source=None, children=None):
        super().__init__(start, end)
        self.concept = concept
        self.tokens = tokens
        self.source = source
        self.children = children

        if self.source is None:
            self.source = BaseParser.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, ConceptNode):
            return False

        return self.concept == other.concept and \
            self.source == other.source

    def __hash__(self):
        return hash((self.concept, self.start, self.end, self.source))


class NonTerminalNode(LexerNode):
    def __init__(self, parsing_expression, start, end, children=None):
        super().__init__(start, end)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        if len(self.children) > 0:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names


class TerminalNode(LexerNode):
    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        name = self.parsing_expression.rule_name or ""
        return name + f"'{self.value}'"


@dataclass()
class GrammarErrorNode(ErrorNode):
    message: str


class ParsingExpression:
    def __init__(self, *args, **kwargs):
        self.elements = args

        nodes = kwargs.get('nodes', [])
        if not hasattr(nodes, '__iter__'):
            nodes = [nodes]
        self.nodes = nodes

        self.rule_name = kwargs.get('rule_name', '')

    def parse(self, parser):
        return self._parse(parser)


class Sequence(ParsingExpression):
    """
    Matches a sequence of parsing expressions, in the exact order they are defined.
    """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = parser.pos

        children = []
        for e in self.nodes:
            node = e.parse(parser)
            if node is None:
                parser.seek(init_pos)  # backtrack: a partial match must not consume tokens
                return None
            else:
                if node.end != -1:  # because Optional returns -1 when there is no match
                    children.append(node)
                    end_pos = node.end

        return NonTerminalNode(self, init_pos, end_pos, children)
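
# Illustrative sketch (hypothetical grammar): a Sequence succeeds only if every
# sub-expression matches, in order. Note that the raw elements passed to the
# constructor only become usable nodes once get_model has converted them, e.g.:
#
#   Sequence('hello', 'world')  ->  nodes [StrMatch('hello'), StrMatch('world')]
#   tokens: hello world         ->  a NonTerminalNode with two TerminalNode children
#   tokens: hello there         ->  None, and the parser position is restored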


class OrderedChoice(ParsingExpression):
    """
    Matches one expression among several alternatives.
    Stops at the first match, so the order of definition matters.
    """

    def _parse(self, parser):
        init_pos = parser.pos

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                return NonTerminalNode(self, init_pos, node.end, [node])

        parser.seek(init_pos)  # backtrack
        return None
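
# Illustrative sketch (hypothetical alternatives): OrderedChoice returns the
# first alternative that matches, so earlier alternatives shadow later ones:
#
#   OrderedChoice(Sequence('a', 'b'), 'a')
#   tokens: a b  ->  the Sequence matches (it is tried first)
#   tokens: a c  ->  the Sequence fails and backtracks, then 'a' alone matches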


class Optional(ParsingExpression):
    """
    Matches its elements, or matches nothing.
    If several elements match, the longest match is chosen.
    If you need first-match priority instead, use Optional(OrderedChoice(...)).
    """

    def _parse(self, parser):
        init_pos = parser.pos
        selected_node = NonTerminalNode(self, parser.pos, -1, [])

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                if node.end > selected_node.end:
                    selected_node = node

            parser.seek(init_pos)  # backtrack so that every element is tried from the same position

        if selected_node.end != -1:
            parser.seek(selected_node.end)
            parser.next_token()  # consume the tokens that were matched

        return selected_node
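
# Illustrative sketch: Optional always "succeeds". When nothing matches it
# returns a node with end == -1 (which Sequence skips); when several elements
# match, the longest one wins:
#
#   Optional('a', Sequence('a', 'b'))
#   tokens: a b  ->  the two-token Sequence beats the lone 'a'
#   tokens: c    ->  node with end == -1, no token is consumed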


class Match(ParsingExpression):
    """
    Base class for all classes that try to match something from the input.
    """

    def __init__(self, rule_name, root=False):
        super(Match, self).__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        result = self._parse(parser)
        return result


class StrMatch(Match):
    """
    Matches a string literal against a single token.
    """

    def __init__(self, to_match, rule_name="", root=False, ignore_case=None):
        # note: super(Match, self) skips Match.__init__ and calls ParsingExpression.__init__ directly
        super(Match, self).__init__(rule_name=rule_name, root=root)
        self.to_match = to_match
        self.ignore_case = ignore_case

    def __repr__(self):
        return f"StrMatch('{self.to_match}')"

    def _parse(self, parser):
        token = parser.get_token()
        m = str(token.value).lower() == self.to_match.lower() if self.ignore_case \
            else token.value == self.to_match

        if m:
            node = TerminalNode(self, parser.pos, parser.pos, token.value)
            parser.next_token()
            return node

        return None
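
# Illustrative sketch: StrMatch compares its literal against a single token
# value, optionally ignoring case:
#
#   StrMatch('select', ignore_case=True)   matches the token SELECT
#   StrMatch('select', ignore_case=False)  does not
#
# On success the matching token is consumed; on failure the position is unchanged.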


class CrossRef:
    """
    A placeholder reference to a concept, created while the model is being built,
    since the referenced concept may not be resolved yet.
    """

    def __init__(self, concept):
        self.concept = concept


class ConceptLexerParser(BaseParser):
    def __init__(self):
        super().__init__("ConceptLexer")
        self.concepts_dict = {}
        self.ignore_case = True

        self.token = None
        self.pos = -1
        self.tokens = None

        self.context = None
        self.text = None
        self.sheerka = None

    def add_error(self, error, next_token=True):
        self.has_error = True
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def reset_parser(self, context, text):
        self.context = context
        self.sheerka = context.sheerka
        self.text = text

        if isinstance(text, str):
            self.tokens = list(Tokenizer(text))
        else:
            self.tokens = list(text)
        self.tokens.append(Token(TokenKind.EOF, "", -1, -1, -1))  # make sure the stream ends with an EOF token

        self.token = None
        self.pos = -1
        self.next_token()

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        if self.token and self.token.type == TokenKind.EOF:
            return False

        self.pos += 1
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.tokens[self.pos]

        return self.token.type != TokenKind.EOF
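
    # Note: get_token() peeks at the current token and next_token() consumes it
    # (skipping whitespace and newlines by default); seek() and rewind() below
    # move the cursor directly, which is how the expressions backtrack.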
    def seek(self, pos):
        self.pos = pos
        self.token = self.tokens[self.pos]
        return True

    def rewind(self, offset, skip_whitespace=True):
        self.pos += offset
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.pos > 0 and (self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE):
                self.pos -= 1
                self.token = self.tokens[self.pos]

    def initialize(self, concept_defs):
        """
        Registers a set of concepts and the grammar used to recognize each of them.
        :param concept_defs: dictionary mapping each concept to its definition
        :return:
        """

        nodes_to_resolve = []
        concepts_to_resolve = set()

        # ## Get the grammars
        for concept, concept_def in concept_defs.items():
            concept.init_key()  # make sure that the key is initialized
            grammar = self.get_model(concept, concept_def, nodes_to_resolve, concepts_to_resolve)
            self.concepts_dict[concept] = grammar

        # ## Remove concepts with infinite recursions
        concepts_to_remove = self.detect_infinite_recursion(concepts_to_resolve)
        for concept in concepts_to_remove:
            concepts_to_resolve.remove(concept)
            del self.concepts_dict[concept]

        # ## Resolve cross references and drop grammars with unresolved references
        self.resolve_cross_references(concepts_to_resolve, nodes_to_resolve)
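
    # A hedged usage sketch (GREETING and the context object are hypothetical,
    # only the initialize()/parse() calls are taken from this file):
    #
    #   parser = ConceptLexerParser()
    #   parser.initialize({GREETING: Sequence("hello", "world")})
    #   result = parser.parse(context, "hello world")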
    def get_model(self, concept, concept_def, nodes_to_resolve, concepts_to_resolve):
        def inner_get_model(expression):
            if isinstance(expression, Concept):
                ret = CrossRef(expression)
                concepts_to_resolve.add(concept)
                nodes_to_resolve.append(ret)
            elif isinstance(expression, str):
                ret = StrMatch(expression, ignore_case=self.ignore_case)
            elif isinstance(expression, StrMatch):
                ret = expression
                if ret.ignore_case is None:
                    ret.ignore_case = self.ignore_case
            elif isinstance(expression, (Sequence, OrderedChoice, Optional)):
                ret = expression
                ret.nodes.extend([inner_get_model(e) for e in ret.elements])
                if any(isinstance(x, CrossRef) for x in ret.nodes):
                    concepts_to_resolve.add(concept)
                    nodes_to_resolve.append(ret)
            else:
                # next_token=False: there is no token stream loaded during initialize()
                ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."),
                                     next_token=False)
            return ret

        model = inner_get_model(concept_def)
        if isinstance(model, CrossRef):
            concepts_to_resolve.add(concept)

        model.rule_name = concept.key
        return model
    def detect_infinite_recursion(self, concepts_to_resolve):

        # infinite recursion matcher
        def _is_infinite_recursion(ref_concept, node):
            if isinstance(node, CrossRef):
                if node.concept == ref_concept:
                    return True
                return _is_infinite_recursion(ref_concept, self.concepts_dict[node.concept])

            if isinstance(node, OrderedChoice):
                return _is_infinite_recursion(ref_concept, node.nodes[0])

            if isinstance(node, Sequence):
                for child in node.nodes:
                    if _is_infinite_recursion(ref_concept, child):
                        return True
                return False

            return False

        removed_concepts = []
        for e in concepts_to_resolve:
            to_resolve = self.concepts_dict[e]
            if _is_infinite_recursion(e, to_resolve):
                removed_concepts.append(e)
        return removed_concepts
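
    # Example of what detect_infinite_recursion flags: a concept that can reach
    # itself again through CrossRefs, e.g. FOO defined as Sequence(FOO, 'x') or
    # a reference cycle FOO -> BAR -> FOO. initialize() removes those entries
    # from concepts_dict.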

    # Cross-ref resolving
    def resolve_cross_references(self, concepts_to_resolve, nodes_to_resolve):

        # first, collapse concepts whose whole grammar is just a reference to another concept
        repeat = True
        while repeat:
            repeat = False
            for e in concepts_to_resolve:
                to_resolve = self.concepts_dict[e]
                if isinstance(to_resolve, CrossRef):
                    repeat = True
                    self.concepts_dict[e] = self.concepts_dict[to_resolve.concept]

        # then, replace CrossRefs nested inside parsing expressions with the resolved grammar
        for e in nodes_to_resolve:
            if not isinstance(e, ParsingExpression):
                continue  # happens when a concept directly references another concept

            for i, node in enumerate(e.nodes):
                if isinstance(node, CrossRef):
                    if node.concept in self.concepts_dict:
                        e.nodes[i] = self.concepts_dict[node.concept]
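
    # Resolution sketch: a definition like FOO: Sequence('a', BAR) first yields
    # Sequence(nodes=[StrMatch('a'), CrossRef(BAR)]); this pass then replaces the
    # CrossRef in place with BAR's resolved grammar.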

    def parse(self, context, text):
        if text == "":
            return context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )

        self.reset_parser(context, text)

        concepts_found = [[]]
        # concepts_found is a list of lists:
        # the first dimension enumerates the possibilities found,
        # the second dimension lists the concepts found under one possibility.
        #
        # Example 1
        #   concept foo : 'one' 'two'
        #   concept bar : 'one' 'two'
        #   input 'one two' -> produces two possibilities (foo and bar).
        #
        # Example 2
        #   concept foo : 'one'
        #   concept bar : 'two'
        #   input 'one two' -> produces one possibility, which is (foo, bar) (foo then bar)

        while True:
            init_pos = self.pos
            res = []
            for concept, grammar in self.concepts_dict.items():
                self.seek(init_pos)
                node = grammar.parse(self)
                if node is not None:
                    concept_node = ConceptNode(concept, node.start, node.end, self.tokens[node.start: node.end + 1])
                    if hasattr(node, "children"):
                        concept_node.children = node.children
                    res.append(concept_node)

            if len(res) == 0:  # not recognized
                self.seek(init_pos)
                not_recognized = self.get_text_from_tokens(self.get_token())
                self.add_error(self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=not_recognized))
                break

            res = self.get_bests(res)  # only keep the concepts that consume the most tokens
            for r in res:
                r.children = flatten(r.children)
            concepts_found = core.utils.product(concepts_found, res)

            # loop
            self.seek(res[0].end)
            if not self.next_token():
                break

        # handle the case where nothing was recognized (or any other error)
        if self.has_error:
            return self.sheerka.ret(
                self.name,
                False,
                self.sheerka.new(
                    BuiltinConcepts.PARSER_RESULT,
                    parser=self,
                    source=text,
                    body=self.error_sink,
                    try_parsed=concepts_found[0] if len(concepts_found) == 1 else concepts_found))

        # otherwise, return as many ReturnValues as there are possibilities found
        ret = []
        for choice in concepts_found:
            ret.append(
                self.sheerka.ret(
                    self.name,
                    True,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=text,
                        body=choice,
                        try_parsed=choice)))

        return ret[0] if len(ret) == 1 else ret

    @staticmethod
    def get_bests(results):
        """
        Returns the results that consumed the most tokens (i.e. with the highest end position).
        :param results:
        :return:
        """
        by_end_pos = defaultdict(list)
        for result in results:
            by_end_pos[result.end].append(result)

        return by_end_pos[max(by_end_pos)]
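
    # get_bests sketch: given candidates ending at token positions
    # {foo: 3, bar: 3, baz: 1}, max(by_end_pos) is 3, so [foo, bar] is returned
    # and baz is discarded. Ambiguities surviving here become the separate
    # "possibilities" that parse() combines with core.utils.product.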