# Sheerka-Old/src/parsers/BaseParser.py
import logging
from dataclasses import dataclass
from typing import Optional, Union
from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
from core.concept import Concept
from core.error import ErrorObj
from core.sheerka.ExecutionContext import ExecutionContext
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka_logger import get_logger
from core.tokenizer import TokenKind, Token, Tokenizer, LexerError
# # keep a cache for the parser input
# pi_cache = Cache(default=lambda key: ParserInput(key), max_size=20)
#
#
# def get_parser_input(text, tokens=None, length=None):
# """
# Returns new or existing parser input
# :param text:
# :param tokens:
# :param length:
# :return:
# """
# if tokens is None or pi_cache.has(text):
# return pi_cache.get(text)
# pi = ParserInput(text, tokens, length)
# pi_cache.put(text, pi)
# return pi


@dataclass()
class Node:
    pass


@dataclass()
class NopNode(Node):
    def __repr__(self):
        return "nop"


class NotInitializedNode(Node):
    def __repr__(self):
        return "**N/A**"


@dataclass()
class ErrorNode(Node, ErrorObj):
    pass


@dataclass()
class UnexpectedTokenErrorNode(ErrorNode):
    message: str
    token: Union[Token, str]
    expected_tokens: list

    def __eq__(self, other):
        if id(other) == id(self):
            return True
        if not isinstance(other, UnexpectedTokenErrorNode):
            return False
        if self.message != other.message:
            return False
        # when the other side holds a plain string, compare our token's repr_value to it
        to_compare = self.token.repr_value if isinstance(other.token, str) else self.token
        if to_compare != other.token:
            return False
        return self.expected_tokens == other.expected_tokens

    def __hash__(self):
        # expected_tokens is a list (unhashable), so hash a tuple copy of it
        return hash((self.message, self.token, tuple(self.expected_tokens)))
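
# Hedged equality example (illustrative, not part of the original file); it
# relies on Token exposing repr_value, as __eq__ above does:
#
#   tok = Token(TokenKind.WORD, "foo", 0, 1, 1)
#   a = UnexpectedTokenErrorNode("unexpected token", tok, [])
#   b = UnexpectedTokenErrorNode("unexpected token", tok.repr_value, [])
#   assert a == b  # a plain-string token is matched through repr_value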


@dataclass()
class UnexpectedEofNode(ErrorNode):
    message: str


class BaseParser:
    PREFIX = "parsers."
def __init__(self, name, priority: int, enabled=True, yield_eof=False):
# self.log = get_logger("parsers." + self.__class__.__name__)
# self.init_log = get_logger("init." + self.PREFIX + self.__class__.__name__)
# self.verbose_log = get_logger("verbose." + self.PREFIX + self.__class__.__name__)
self.name = BaseParser.get_name(name)
self.short_name = name
self.priority = priority
self.enabled = enabled
self.error_sink = []
        self.context: Optional[ExecutionContext] = None
self.sheerka = None
        self.parser_input: Optional[ParserInput] = None
self.yield_eof = yield_eof
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.name == other.name
def __hash__(self):
return hash(self.name)
def __repr__(self):
return self.name
def reset_parser(self, context, parser_input: ParserInput):
self.context = context
self.sheerka = context.sheerka
self.parser_input = parser_input
self.error_sink.clear()
try:
self.parser_input.reset(self.yield_eof)
except LexerError as e:
self.add_error(e, False)
return False
return True
    def parse(self, context, parser_input):
        """Overridden by concrete parsers; the base implementation does nothing."""
        pass
def add_error(self, error, next_token=True):
self.error_sink.append(error)
if next_token:
self.parser_input.next_token()
return error
@property
def has_error(self):
return len(self.error_sink) > 0
def log_result(self, context, source, ret):
pass
# if not self.log.isEnabledFor(logging.DEBUG):
# return
#
# if ret.status:
# value = context.return_value_to_str(ret)
# context.log(f"Recognized '{source}' as {value}", self.name)
# else:
# context.log(f"Failed to recognize '{source}'", self.name)
def log_multiple_results(self, context, source, list_of_ret):
pass
# if not self.log.isEnabledFor(logging.DEBUG):
# return
#
# context.log(f"Recognized '{source}' as multiple concepts", self.name)
# for r in list_of_ret:
# value = context.return_value_to_str(r)
# context.log(f" Recognized '{value}'", self.name)
def get_return_value_body(self, sheerka, source, parsed, try_parse):
"""
All parsers must return their result in a standard way
:param sheerka:
:param source:
:param parsed:
:param try_parse:
:return:
"""
if len(self.error_sink) == 1 and isinstance(self.error_sink[0], Concept):
return self.error_sink[0]
if self.has_error:
if parsed is None:
return sheerka.new(BuiltinConcepts.NOT_FOR_ME,
body=source,
reason=self.error_sink)
else:
return sheerka.new(BuiltinConcepts.ERROR,
body=self.error_sink)
return sheerka.new(BuiltinConcepts.PARSER_RESULT,
parser=self,
source=source,
body=parsed,
try_parsed=try_parse)
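
    # Hedged usage sketch (hypothetical subclass, not in the original file):
    #
    #   class WordParser(BaseParser):
    #       def parse(self, context, parser_input):
    #           if not self.reset_parser(context, parser_input):
    #               return self.get_return_value_body(self.sheerka, parser_input, None, False)
    #           parsed = ...  # build nodes from parser_input
    #           return self.get_return_value_body(self.sheerka, parser_input, parsed, False)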
@staticmethod
def get_input_as_lexer_nodes(parser_input, expected_parser=None):
"""
Extract the lexer node from the parser_input
:param parser_input:
:param expected_parser: returns the nodes if the parent parser is the expected one
:return:
"""
if not isinstance(parser_input, ParserResultConcept):
return None
if expected_parser and parser_input.parser != expected_parser:
return None
if len(parser_input.value) == 0:
return None
for node in parser_input.value:
from parsers.BaseNodeParser import LexerNode
if not isinstance(node, LexerNode):
return None
return parser_input.value
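
    # Hedged note (illustrative, not in the original): anything that is not a
    # ParserResultConcept made purely of LexerNode items yields None, e.g.
    #   BaseParser.get_input_as_lexer_nodes("plain text")  # -> None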
    @staticmethod
    def manage_eof(lst, strip_eof):
        """Strip the trailing EOF token, or append a synthetic one when missing."""
        if strip_eof:
            if lst and lst[-1].type == TokenKind.EOF:
                lst.pop()
            return lst
        if len(lst) == 0 or lst[-1].type != TokenKind.EOF:
            lst.append(Token(TokenKind.EOF, "", -1, -1, -1))
        return lst
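
    # Hedged example (hypothetical tokens, not in the original file):
    #   toks = [Token(TokenKind.WORD, "a", 0, 1, 1)]
    #   BaseParser.manage_eof(toks, strip_eof=False)  # appends a synthetic EOF token
    #   BaseParser.manage_eof(toks, strip_eof=True)   # pops that trailing EOF again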
# @staticmethod
# def get_text_from_tokens(tokens, custom_switcher=None, tracker=None):
# """
# Create the source code, from the list of token
# :param tokens: list of tokens
# :param custom_switcher: to override the behaviour (the return value) of some token
# :param tracker: keep track of the original token value when custom switched
# :return:
# """
# if tokens is None:
# return ""
# res = ""
#
# if not hasattr(tokens, "__iter__"):
# tokens = [tokens]
#
# switcher = {
# # TokenKind.CONCEPT: lambda t: core.utils.str_concept(t.value),
# }
#
# if custom_switcher:
# switcher.update(custom_switcher)
#
# for token in tokens:
# value = switcher.get(token.type, lambda t: t.str_value)(token)
# res += value
# if tracker is not None and token.type in custom_switcher:
# tracker[value] = token.value
# return res
@staticmethod
def get_tokens_boundaries(tokens):
"""
Returns the first and the last valid index of the tokens
a valid index is a token that is not a whitespace nor and EOF
:param tokens:
:return:
"""
if tokens is None:
return None
if len(tokens) == 0:
return 0, 0
if tokens[0].type == TokenKind.EOF:
return 0, 0
start = 1 if tokens[0].type == TokenKind.WHITESPACE else 0
end = len(tokens) - 1
while tokens[end].type in (TokenKind.WHITESPACE, TokenKind.EOF):
end -= 1
return start, end
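
    # Hedged example (hypothetical tokens, not in the original file): for
    # [WHITESPACE, WORD, WORD, WHITESPACE, EOF] this returns (1, 2), the
    # boundaries of the tokens that carry actual content.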
    @staticmethod
    def merge_concepts(list_a, b):
        if not b:
            return list_a
        list_b = b if isinstance(b, list) else [b]
        if not list_a:
            return list_b
        # index the ids already present in list_a, so items of list_b are only
        # appended when their id is not there yet
        by_ids = {c.id for c in list_a}
        for c in list_b:
            if c.id in by_ids:  # and c.metadata.is_evaluated == by_ids[c.id].metadata.is_evaluated:
                continue
            list_a.append(c)
            by_ids.add(c.id)
        return list_a
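
    # Hedged example (hypothetical concepts c1/c2 with ids 1 and 2, not in the
    # original file):
    #   a = [c1]
    #   BaseParser.merge_concepts(a, [c1, c2])  # -> a is now [c1, c2]; duplicate c1 skipped by id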
@staticmethod
def get_name(name):
return BaseParser.PREFIX + name


class BaseTokenizerIterParser(BaseParser):
    def __init__(self, name, priority, parse_word=False, none_on_eof=True):
        super().__init__(name, priority)
        self.lexer_iter = None
        self._current = None
        self.context: Optional[ExecutionContext] = None
        self.text = None
        self.sheerka = None
        self.parse_word = parse_word
        self.none_on_eof = none_on_eof
def reset_parser(self, context, text):
self.context = context
self.sheerka = context.sheerka
self.text = text
self.lexer_iter = iter(Tokenizer(text, self.parse_word))
self._current = None
self.next_token()
def add_error(self, error, next_token=True):
self.error_sink.append(error)
if next_token:
self.next_token()
return error
def get_token(self) -> Token:
return self._current
    def next_token(self, skip_whitespace=True):
        try:
            self._current = next(self.lexer_iter)
            if skip_whitespace:
                while self._current.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
                    self._current = next(self.lexer_iter)
            # check EOF only after skipping, so whitespace followed by EOF is
            # also reported as end-of-input when none_on_eof is set
            if self.none_on_eof and self._current.type == TokenKind.EOF:
                self._current = None
                return False
        except StopIteration:
            self._current = None
            return False
        return True
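
# Hedged usage sketch (hypothetical ExecutionContext, not in the original file):
#
#   p = BaseTokenizerIterParser("demo", priority=0)
#   p.reset_parser(context, "a b")   # primes _current with the first token
#   while p.get_token() is not None:
#       print(p.get_token())         # whitespace and newlines are skipped
#       p.next_token()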


class BaseSplitIterParser(BaseParser):
    def __init__(self, name, priority, none_on_eof=False):
        super().__init__(name, priority)
        self._current = None
        self.context: Optional[ExecutionContext] = None
        self.text = None
        self.sheerka = None
        self.iter_split = None
        self.split_and_eat_tokens = (" ", "\n", "\t")
        self.split_and_keep_tokens = ("=", ")", "(", ",")
        self.split_tokens = self.split_and_eat_tokens + self.split_and_keep_tokens
        self.none_on_eof = none_on_eof  # current token is set to None when EOF is hit
def parse_word(self, c, index, line, column):
end = self.split_tokens
escaped = False
buffer = ""
while escaped or c not in end:
if not escaped and c == "\\":
escaped = True
elif not escaped and c in ("'", '"'):
end = [c]
else:
buffer += c
escaped = False
index, column = index + 1, column + 1
if index == len(self.text):
break
c = self.text[index]
if c == "\n":
line += 1
column = 0
        if c not in self.split_and_keep_tokens:  # eat the terminator (whitespace or a closing quote); keep-tokens are left for split()
index, column = index + 1, column + 1
return buffer, index, line, column
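
    # Hedged example (hypothetical input, not in the original file): with
    # self.text = '"a b",x' a call starting at index 0 returns ("a b", 5, ...);
    # the opening quote switches the terminator to the matching quote, so the
    # space is kept, and the closing quote itself is consumed.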
def split(self):
index = 0
line = 1
column = 1
while index < len(self.text):
c = self.text[index]
if c == "=":
if index + 1 < len(self.text) and self.text[index + 1] == "=":
yield Token(TokenKind.EQUALSEQUALS, "==", index, line, column)
index, column = index + 2, column + 2
else:
yield Token(TokenKind.EQUALS, "=", index, line, column)
index, column = index + 1, column + 1
elif c == ")":
yield Token(TokenKind.RPAR, ")", index, line, column)
index, column = index + 1, column + 1
elif c == "(":
yield Token(TokenKind.LPAR, "(", index, line, column)
index, column = index + 1, column + 1
elif c == ",":
yield Token(TokenKind.COMMA, ",", index, line, column)
index, column = index + 1, column + 1
else:
buffer, end_index, end_line, end_column = self.parse_word(c, index, line, column)
if buffer:
yield Token(TokenKind.WORD, buffer, index, line, column)
index, line, column = end_index, end_line, end_column
yield Token(TokenKind.EOF, "<eof>", index, line, column)
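
    # Hedged example of the resulting stream (hypothetical input, not in the
    # original file): for self.text = "x = f(a, b)" split() yields
    #   WORD "x", EQUALS "=", WORD "f", LPAR "(", WORD "a", COMMA ",",
    #   WORD "b", RPAR ")", EOF "<eof>"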
def reset_parser(self, context, text):
self.context = context
self.sheerka = context.sheerka if context else None
self.text = text
self._current = None
self.iter_split = iter(self.split())
def add_error(self, error, next_token=True):
self.error_sink.append(error)
if next_token:
self.next_token()
return error
def get_token(self) -> Token:
return self._current
def next_token(self):
try:
self._current = next(self.iter_split)
if self._current.type == TokenKind.EOF:
if self.none_on_eof:
self._current = None
return False
except StopIteration:
self._current = None
return False
return True
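
# Hedged end-to-end sketch (hypothetical input, not in the original file):
#
#   p = BaseSplitIterParser("demo", priority=0, none_on_eof=True)
#   p.reset_parser(None, "x = f(a, b)")  # context may be None for this class
#   while p.next_token():
#       print(p.get_token())             # WORD "x", EQUALS "=", ... (EOF excluded)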