e69745adc8
Fixed #99 : SheerkaQueryManager: I can manage contains predicate when filtering objects Fixed #97 : ERROR: list indices must be integers or slices, not Concept Fixed #96 : SequenceNodeParser: SequenceNodeParser must correctly handle concept definition Fixed #95 : ResolveAmbiguity must not remove concepts that do not require evaluation Fixed #94 : Concepts with the same key are lost when new ontology Fixed #93 : Introduce BuiltinConcepts.EVAL_GLOBAL_TRUTH_REQUESTED Fixed #92 : ExpressionParser: Implement compile_disjunctions() Fixed #91 : Implement get_concepts_complexity(context, concepts, concept_parts) Fixed #90 : ResolveAmbiguity : where predicate is not used to resolve ambiguity Fixed #89 : ResolveAmbiguityEvaluator: Concepts embedded in ConceptNode are not resolved Fixed #88: SyaNodeParser: Parse multiple parameters when some of the are not recognized Fixed #87: SyaNodeParser : Parse the multiple parameters
496 lines
15 KiB
Python
496 lines
15 KiB
Python
from dataclasses import dataclass
|
|
|
|
import core.utils
|
|
from core.tokenizer import TokenKind, Token
|
|
from core.var_ref import VariableRef
|
|
from parsers.BaseParser import Node, ParsingError, BaseParserInputParser
|
|
|
|
DEBUG_COMPILED = True
|
|
|
|
|
|
@dataclass()
class LexerNode(Node):
    """
    Base class for the nodes produced by the lexer-level parsers.

    Holds the [start, end] token index span, the tokens themselves and their
    rendered source text (derived from the tokens when not supplied).
    """

    start: int  # starting index in the tokens list
    end: int  # ending index in the tokens list
    tokens: list = None  # tokens covered by this node
    source: str = None  # string representation of what was parsed

    def __post_init__(self):
        # Derive the source text from the tokens when it was not given explicitly.
        if self.source is None:
            self.source = core.utils.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        # NOTE(review): defining __eq__ in the class body implicitly sets
        # __hash__ = None, so LexerNode itself is unhashable; each subclass
        # that needs hashing defines its own __hash__.
        if not isinstance(other, LexerNode):
            return False

        return self.start == other.start and \
            self.end == other.end and \
            self.source == other.source and \
            self.tokens == other.tokens

    def fix_source(self, force=True):
        """Recompute source from tokens (always when force, else only when missing); returns self."""
        if force or self.source is None:
            self.source = core.utils.get_text_from_tokens(self.tokens)
        return self

    def clone(self):
        # Intentionally a no-op here: concrete subclasses implement cloning.
        pass

    def to_short_str(self):
        # Compact display form; every concrete subclass must override this.
        raise NotImplementedError

    def get_source_to_parse(self):
        """Text that downstream parsers should consume; code nodes override this."""
        return self.source
|
|
|
|
|
|
class UnrecognizedTokensNode(LexerNode):
    """
    Accumulator node for runs of tokens that no other parser recognized.

    Tokens are appended one by one via add_token(); the node tracks its
    [start, end] position span and an open-parenthesis counter.
    """

    def __init__(self, start, end, tokens):
        super().__init__(start, end, tokens)
        self.is_frozen = False  # TODO: Remove as it seems to now be useless
        self.parenthesis_count = 0  # LPAR/RPAR balance of the accumulated tokens

    def freeze(self):
        # TODO: Remove as it seems to now be useless
        self.is_frozen = True

    def reset(self):
        """Empty the node back to its pristine state (positions become -1)."""
        self.start = self.end = -1
        self.tokens.clear()
        self.is_frozen = False
        self.parenthesis_count = 0
        self.source = ""

    def add_token(self, token, pos):
        """
        Append a token at position pos, updating span and parenthesis count.

        A gap of exactly one position since the last token is filled with a
        synthetic WHITESPACE token so the source text stays contiguous.
        Returns self. Raises when the node is frozen.
        """
        if self.is_frozen:
            raise Exception("The node is frozen")

        if self.end != -1 and pos == self.end + 2:
            # add the missing whitespace
            p = self.tokens[-1]  # previous token
            self.tokens.append(Token(TokenKind.WHITESPACE, " ", p.index + 1, p.line, p.column + 1))

        self.tokens.append(token)
        self.end = pos
        if self.start == -1:
            # first token ever added: the span starts here
            self.start = pos

        if token.type == TokenKind.LPAR:
            self.parenthesis_count += 1

        if token.type == TokenKind.RPAR:
            self.parenthesis_count -= 1

        return self

    def pop(self, token_kind):
        """Remove the last token only if it has the given kind; resets when emptied."""
        if self.is_frozen:
            raise Exception("The node is frozen")

        if len(self.tokens) > 0 and self.tokens[-1].type == token_kind:
            self.tokens.pop()
            if len(self.tokens) == 0:
                self.reset()
            else:
                self.end -= 1

    def has_open_paren(self):
        # True while more LPAR than RPAR tokens have been accumulated
        return self.parenthesis_count > 0

    def not_whitespace(self):
        """Convenience negation of is_whitespace()."""
        return not self.is_whitespace()

    def is_whitespace(self):
        """True when every accumulated token is WHITESPACE or NEWLINE (or there are none)."""
        for t in self.tokens:
            if t.type not in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
                return False
        return True

    def is_empty(self):
        return len(self.tokens) == 0

    def last_token_type(self):
        """Kind of the most recently added token, or None when empty."""
        if len(self.tokens) == 0:
            return None
        return self.tokens[-1].type

    def __eq__(self, other):
        # equality ignores is_frozen / parenthesis_count / tokens; only the
        # span and rendered source matter
        if not isinstance(other, UnrecognizedTokensNode):
            return False

        return self.start == other.start and \
            self.end == other.end and \
            self.source == other.source

    def __hash__(self):
        return hash((self.start, self.end, self.source))

    def __repr__(self):
        return f"UnrecognizedTokensNode(source='{self.source}', start={self.start}, end={self.end})"

    def clone(self):
        # tokens list is shallow-copied; the Token objects themselves are shared
        clone = UnrecognizedTokensNode(self.start, self.end, self.tokens[:])
        clone.is_frozen = self.is_frozen
        clone.parenthesis_count = self.parenthesis_count
        return clone

    def to_short_str(self):
        return f"UTN('{self.source}')"
|
|
|
|
|
|
class RuleNode(LexerNode):
    """Lexer node produced when a grammar rule is recognized over a token span."""

    def __init__(self, rule, start, end, tokens=None, source=None):
        super().__init__(start, end, tokens, source)
        self.rule = rule
        # only derive source from tokens when it was not supplied
        self.fix_source(False)

    def __eq__(self, other):
        if self is other:
            return True

        if not isinstance(other, RuleNode):
            return False

        return (self.rule == other.rule
                and self.start == other.start
                and self.end == other.end
                and self.source == other.source)

    def __hash__(self):
        return hash((self.rule, self.start, self.end, self.source))

    def __repr__(self):
        return f"RuleNode(rule='{self.rule}', source='{self.source}', start={self.start}, end={self.end})"

    def clone(self):
        """Shallow copy: rule and tokens are shared with the original."""
        return RuleNode(self.rule, self.start, self.end, self.tokens, self.source)

    def to_short_str(self):
        return f'RN({self.rule})'
|
|
|
|
|
|
class ConceptNode(LexerNode):
    """
    Returned by the BnfNodeParser.

    It represents a recognized concept over a token span.
    """

    def __init__(self, concept, start, end, tokens=None, source=None, underlying=None):
        super().__init__(start, end, tokens, source)
        self.concept = concept
        self.underlying = underlying
        # only derive source from tokens when it was not supplied
        self.fix_source(False)

    def __eq__(self, other):
        if self is other:
            return True

        if not isinstance(other, ConceptNode):
            return False

        return (self.concept == other.concept
                and self.start == other.start
                and self.end == other.end
                and self.source == other.source
                and self.underlying == other.underlying)

    def __hash__(self):
        return hash((self.concept, self.start, self.end, self.source, self.underlying))

    def __repr__(self):
        pieces = [f"ConceptNode(concept='{self.concept}', source='{self.source}', start={self.start}, end={self.end}"]
        if DEBUG_COMPILED:
            # expose the concept's compiled entries for debugging
            pieces.extend(f", {k}='{v}'" for k, v in self.concept.get_compiled().items())
        pieces.append(")")
        return "".join(pieces)

    def clone(self):
        # the concept itself is shared, not deep-copied — do we need to clone it as well?
        return ConceptNode(self.concept, self.start, self.end, self.tokens, self.source, self.underlying)

    def as_bag(self):
        """
        Creates a dictionary with the useful properties of the ConceptNode.

        See Concept.as_bag() for extra information.
        """
        return dict(self.__dict__)

    def to_short_str(self):
        return f'CN({self.concept})'

    def get_concept(self):
        """
        Used when there is a mix of Concept and ConceptNode,
        to quickly get the inner concept.

        :return: the wrapped concept
        """
        return self.concept
|
|
|
|
|
|
class SourceCodeNode(LexerNode):
    """
    Returned when some source code (like Python source code) is recognized.
    """

    def __init__(self, start, end, tokens=None, source=None,
                 python_node=None, return_value=None, error_when_parsing=None):
        """
        :param start: start position (index of the first token)
        :param end: end position (index of the last token)
        :param tokens:
        :param source: tokens as string
        :param python_node: PythonNode found (when the SourceCodeNode is validated)
        :param return_value: ReturnValueConcept returned when the source was validated
        :param error_when_parsing: if python_node is still None after parsing, it explains why

        When return_value is provided,
        You should have return_value.body.body == node
        """
        super().__init__(start, end, tokens, source)

        self.python_node = python_node  # The PythonNode (or whatever language node) that is found
        self.return_value = return_value  # original result of the parsing
        self.error_when_parsing = error_when_parsing  # if python_node is still None after parsing, it explains why

    def __eq__(self, other):
        # NOTE: equality deliberately ignores return_value and error_when_parsing
        if not isinstance(other, SourceCodeNode):
            return False

        return self.python_node == other.python_node and \
            self.start == other.start and \
            self.end == other.end and \
            self.source == other.source

    def __hash__(self):
        return hash((self.start, self.end, self.source))

    def __repr__(self):
        return f"SourceCodeNode(start={self.start}, end={self.end}, source='{self.source}')"

    def to_short_str(self):
        return f"SCN('{self.source}')"

    def get_python_node(self):
        return self.python_node

    def get_source_to_parse(self):
        # assumes the node was validated and python_node is set — TODO confirm callers
        return self.python_node.source

    def clone(self):
        """Shallow copy carrying ALL parse-state fields.

        BUGFIX: error_when_parsing was previously dropped by clone(), so a
        cloned unparsed node lost the explanation of why parsing failed.
        """
        clone = SourceCodeNode(
            self.start,
            self.end,
            self.tokens,
            self.source,
            self.python_node,
            self.return_value,
            self.error_when_parsing)
        return clone
|
|
|
|
|
|
class SourceCodeWithConceptNode(LexerNode):
    """
    Kind of temporary version for SourceCodeNode.

    I know that there is some code,
    I know that there are some concepts,
    I just don't want to make the glue yet.

    So I push all the nodes into one big bag.
    """

    def __init__(self, first_node, last_node, content_nodes=None, has_unrecognized=False):
        # start high / end low so fix_pos() can only shrink the span inward.
        # NOTE(review): 9999 caps the supported token index; sys.maxsize would be safer — why not sys.maxint ?
        super().__init__(9999, -1, None)
        self.first = first_node
        self.last = last_node
        self.nodes = content_nodes or []
        self.has_unrecognized = has_unrecognized
        self._all_nodes = None  # lazy cache for get_all_nodes()
        self.fix_all_pos()

        self.python_node = None  # if the source code node is validated against a python parse, here is the PythonNode
        self.return_value = None  # return_value that produced the PythonNode
        self.error_when_parsing = None  # if python_node is still None after parsing, it explains why

    def add_node(self, node):
        """Append a content node, widening the start/end span; returns self."""
        self.nodes.append(node)
        self.fix_pos(node)
        self._all_nodes = None  # invalidate get_all_nodes() cache

        return self

    def __eq__(self, other):
        if id(self) == id(other):
            return True

        if not isinstance(other, SourceCodeWithConceptNode):
            return False

        if self.start != other.start or self.end != other.end:
            return False

        if self.first != other.first:
            return False

        if self.last != other.last:
            return False

        if len(self.nodes) != len(other.nodes):
            return False

        for self_node, other_node in zip(self.nodes, other.nodes):
            if self_node != other_node:
                return False

        # at last
        return True

    def __hash__(self):
        return hash((self.first, self.last, len(self.nodes)))

    def __repr__(self):
        return f"SourceCodeWithConceptNode(start={self.start}, end={self.end}, source='{self.source}')"

    def fix_all_pos(self):
        """Recompute the start/end span from first, last and all content nodes."""
        if self.first is None:  # to ease some unit test where only the python_node is necessary
            return

        for n in [self.first, self.last] + self.nodes:
            self.fix_pos(n)

    def fix_pos(self, node):
        """Widen [start, end] to include the given node's span; returns self."""
        if hasattr(node, "start") and node.start is not None:
            if node.start < self.start:
                self.start = node.start

        if hasattr(node, "end") and node.end is not None:
            if node.end > self.end:
                self.end = node.end
        return self

    def pseudo_fix_source(self):
        """
        pseudo because the code is not that clean !
        :return:
        """
        self.source = self.first.source
        for n in self.nodes:
            self.source += " "
            if hasattr(n, "source"):
                self.source += n.source
            elif hasattr(n, "concept"):
                self.source += str(n.concept)
            else:
                self.source += " unknown"
        self.source += self.last.source
        return self

    def get_all_nodes(self):
        """first + content nodes + last, cached until the next add_node()."""
        if self._all_nodes:
            return self._all_nodes

        self._all_nodes = [self.first, *self.nodes, self.last]
        return self._all_nodes

    def clone(self):
        """Clone first/last/content nodes; parse-state fields are shared.

        BUGFIX: error_when_parsing was previously dropped by clone().
        """
        nodes = [n.clone() for n in self.nodes]
        clone = SourceCodeWithConceptNode(self.first.clone(), self.last.clone(), nodes, self.has_unrecognized)
        clone.python_node = self.python_node
        clone.return_value = self.return_value
        clone.error_when_parsing = self.error_when_parsing
        return clone

    def to_short_str(self):
        # BUGFIX: to_short_str was joined without being called — join() received
        # bound-method objects and raised TypeError whenever nodes was non-empty.
        return f"SCWC({self.first}" + ", ".join(n.to_short_str() for n in self.nodes) + f"{self.last})"

    def get_python_node(self):
        return self.python_node

    def get_source_to_parse(self):
        # assumes python_node was set by a successful validation — TODO confirm callers
        return self.python_node.source
|
|
|
|
|
|
class VariableNode(LexerNode):
    """
    When trying to parse source code, a reference to a variable is recognized.

    Not sure yet if it has to be a lexer node.
    """

    def __init__(self, obj, prop, start, end, tokens=None, source=None):
        super().__init__(start, end, tokens, source)
        self.var_ref = VariableRef(obj, prop)

    def __eq__(self, other):
        if self is other:
            return True

        if not isinstance(other, VariableNode):
            return False

        return (self.var_ref == other.var_ref
                and self.start == other.start
                and self.end == other.end
                and self.source == other.source)

    def __hash__(self):
        return hash((self.var_ref.obj, self.var_ref.prop, self.start, self.end, self.source))

    def __repr__(self):
        return (f"VariableNode(obj={self.var_ref.obj}, prop={self.var_ref.prop}, "
                f"start={self.start}, end={self.end}, source='{self.source}')")

    def to_short_str(self):
        ref = self.var_ref
        if ref.prop is None:
            return f"VN({ref.obj})"
        return f"VN({ref.obj}.{ref.prop})"

    def clone(self):
        """Shallow copy; the VariableRef is rebuilt from obj/prop."""
        ref = self.var_ref
        return VariableNode(ref.obj, ref.prop, self.start, self.end, self.tokens, self.source)
|
|
|
|
|
|
@dataclass()
class GrammarErrorNode(ParsingError):
    """Parsing error carrying a human-readable grammar error description."""

    message: str  # description of the grammar error
|
|
|
|
|
|
@dataclass()
class NoMatchingTokenError(ParsingError):
    """Parsing error raised when no token matches at a given position."""

    pos: int  # token position where no match was found
|
|
|
|
|
|
class BaseNodeParser(BaseParserInputParser):
    """
    Parser that returns LexerNode instances.
    """

    def __init__(self, name, priority, **kwargs):
        # NOTE(review): extra **kwargs are accepted but silently discarded —
        # confirm whether they should be forwarded to the base class.
        super().__init__(name, priority, yield_eof=True)

    def init_from_concepts(self, context, concepts, **kwargs):
        """
        Initialize the parser with a list of concepts.

        For unit tests convenience. Splits the concepts by their first item
        (keyword token vs regex), compiles the regex-keyed ones into the
        SheerkaConceptManager service, and stores both maps in the object
        manager (context.sheerka.om).

        :param context: holds the sheerka services and object manager
        :param concepts: concepts to register
        :return: self
        """
        # local import to avoid a module-level import cycle — TODO confirm
        from core.sheerka.services.SheerkaConceptManager import SheerkaConceptManager
        service = context.sheerka.services[SheerkaConceptManager.NAME]
        by_token, by_regex = SheerkaConceptManager.compute_concepts_by_first_item(context, concepts).body

        # persist the regex-keyed map (keys serialized for storage)
        context.sheerka.om.put(SheerkaConceptManager.CONCEPTS_BY_REGEX_ENTRY,
                               False,
                               {k.serialize(): v for k, v in by_regex.items()})
        compiled = service.compile_concepts_by_first_regex(context, by_regex).body
        # replace the service's compiled list in place
        service.compiled_concepts_by_regex.clear()
        service.compiled_concepts_by_regex.extend(compiled)

        # resolve and persist the keyword-keyed map
        resolved = SheerkaConceptManager.resolve_concepts_by_first_keyword(context, by_token).body
        context.sheerka.om.put(SheerkaConceptManager.RESOLVED_CONCEPTS_BY_FIRST_KEYWORD_ENTRY,
                               False,
                               resolved)

        return self
|