import logging
from dataclasses import dataclass

import core.utils
from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
from core.concept import Concept
from core.sheerka.ExecutionContext import ExecutionContext
from core.sheerka_logger import get_logger
from core.tokenizer import TokenKind, Keywords, Token, Tokenizer


@dataclass()
class Node:
    pass


@dataclass()
class NopNode(Node):
    def __repr__(self):
        return "nop"


class NotInitializedNode(Node):
    def __repr__(self):
        return "**N/A**"


@dataclass()
class ErrorNode(Node):
    pass


@dataclass()
class UnexpectedTokenErrorNode(ErrorNode):
    message: str
    token: Token
    expected_tokens: list

    def __eq__(self, other):
        if id(other) == id(self):
            return True

        if not isinstance(other, UnexpectedTokenErrorNode):
            return False

        if self.message != other.message:
            return False

        if self.token.type != other.token.type or self.token.value != other.token.value:
            return False

        if len(self.expected_tokens) != len(other.expected_tokens):
            return False

        for i, t in enumerate(self.expected_tokens):
            if t != other.expected_tokens[i]:
                return False

        return True

    def __hash__(self):
        # expected_tokens is a list, which is unhashable; hash a tuple of it instead
        return hash((self.message, self.token, tuple(self.expected_tokens)))


@dataclass()
class UnexpectedEof(ErrorNode):
    message: str


class BaseParser:
    PREFIX = "parsers."

    def __init__(self, name, priority: int, enabled=True):
        # use self.PREFIX so all three logger names share the same prefix
        self.log = get_logger(self.PREFIX + self.__class__.__name__)
        self.init_log = get_logger("init." + self.PREFIX + self.__class__.__name__)
        self.verbose_log = get_logger("verbose." + self.PREFIX + self.__class__.__name__)

        self.name = self.PREFIX + name
        self.priority = priority
        self.enabled = enabled

        self.error_sink = []

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.name == other.name

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        return self.name

    def parse(self, context, parser_input):
        """Entry point implemented by concrete parsers; the base class is a no-op."""
        pass

    @property
    def has_error(self):
        return len(self.error_sink) > 0

    def log_result(self, context, source, ret):
        if not self.log.isEnabledFor(logging.DEBUG):
            return

        if ret.status:
            value = context.return_value_to_str(ret)
            context.log(f"Recognized '{source}' as {value}", self.name)
        else:
            context.log(f"Failed to recognize '{source}'", self.name)

    def log_multiple_results(self, context, source, list_of_ret):
        if not self.log.isEnabledFor(logging.DEBUG):
            return

        context.log(f"Recognized '{source}' as multiple concepts", self.name)
        for r in list_of_ret:
            value = context.return_value_to_str(r)
            context.log(f" Recognized '{value}'", self.name)

    def get_return_value_body(self, sheerka, source, tree, try_parse):
        """
        All parsers must return their result in a standard way: a
        PARSER_RESULT concept wrapping either the parse tree or the
        collected errors.

        :param sheerka: concept factory used to build the result
        :param source: the original input that was parsed
        :param tree: the parse tree produced on success
        :param try_parse: whether this was a speculative (try) parse
        :return: a PARSER_RESULT concept, or the single error Concept
        """
        if len(self.error_sink) == 1 and isinstance(self.error_sink[0], Concept):
            return self.error_sink[0]

        return sheerka.new(
            BuiltinConcepts.PARSER_RESULT,
            parser=self,
            source=source,
            body=self.error_sink if self.has_error else tree,
            try_parsed=try_parse)

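    # Illustrative sketch (not from the original source; MyParser is a
    # hypothetical subclass): a concrete parser would typically funnel its
    # outcome through get_return_value_body, e.g.
    #
    #   class MyParser(BaseParser):
    #       def parse(self, context, parser_input):
    #           tree = NopNode()  # pretend parse result
    #           return self.get_return_value_body(
    #               context.sheerka, parser_input, tree, try_parse=False)
    #
    # On failure the parser appends to self.error_sink first, and the same
    # call then wraps the errors instead of the tree.
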
    def get_input_as_text(self, parser_input, custom_switcher=None, tracker=None):
        """
        Recreate the source code from parser_input.

        :param parser_input: a list of Tokens, a ParserResultConcept, or raw text
        :param custom_switcher: map of {TokenKind: callable} overriding some token values
        :param tracker: keeps track of the values overridden by custom_switcher
        :return: the reconstructed source text
        """
        if isinstance(parser_input, list):
            return self.get_text_from_tokens(parser_input, custom_switcher, tracker)

        if isinstance(parser_input, ParserResultConcept):
            parser_input = parser_input.source

        # re-tokenize when the text still contains "c:" markers so the
        # switcher can rewrite them
        if "c:" in parser_input:
            return self.get_text_from_tokens(list(Tokenizer(parser_input)), custom_switcher, tracker)

        return parser_input

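    # Illustrative sketch (assumed token values): the accepted input shapes
    # all collapse to text, e.g.
    #
    #   p.get_input_as_text("x = 1")                                # -> "x = 1"
    #   p.get_input_as_text([Token(TokenKind.WORD, "x", 0, 1, 1)])  # -> "x"
    #
    # A ParserResultConcept is reduced to its .source before the same rules apply.
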
    def get_input_as_tokens(self, parser_input, strip_eof=False):
        if isinstance(parser_input, list):
            return self.manage_eof(parser_input, strip_eof)

        if isinstance(parser_input, ParserResultConcept):
            if parser_input.tokens:
                return self.manage_eof(parser_input.tokens, strip_eof)
            else:
                return Tokenizer(parser_input.source)

        return Tokenizer(parser_input, yield_eof=not strip_eof)

    def get_input_as_lexer_nodes(self, parser_input, expected_parser=None):
        # imported locally so parsers.BaseNodeParser is only loaded when needed;
        # hoisted out of the loop below, where it was needlessly re-run per node
        from parsers.BaseNodeParser import LexerNode

        if not isinstance(parser_input, ParserResultConcept):
            return None

        if expected_parser and parser_input.parser != expected_parser:
            return None

        if len(parser_input.value) == 0:
            return None

        for node in parser_input.value:
            if not isinstance(node, LexerNode):
                return None

        return parser_input.value

    @staticmethod
    def manage_eof(lst, strip_eof):
        if strip_eof:
            if len(lst) and lst[-1].type == TokenKind.EOF:
                lst.pop()
            return lst

        if len(lst) == 0 or lst[-1].type != TokenKind.EOF:
            lst.append(Token(TokenKind.EOF, "", -1, -1, -1))
        return lst

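    # Illustrative sketch: manage_eof normalizes a token list in both directions.
    #
    #   toks = [Token(TokenKind.WORD, "x", 0, 1, 1)]
    #   BaseParser.manage_eof(toks, strip_eof=False)  # appends a trailing EOF token
    #   BaseParser.manage_eof(toks, strip_eof=True)   # pops it again
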
    @staticmethod
    def get_text_from_tokens(tokens, custom_switcher=None, tracker=None):
        """
        Create the source code from the list of tokens.

        :param tokens: list of tokens (a single token is also accepted)
        :param custom_switcher: overrides the rendered value of some token kinds
        :param tracker: keeps track of the original token value when custom switched
        :return: the reconstructed source text
        """
        if tokens is None:
            return ""
        res = ""

        if not hasattr(tokens, "__iter__"):
            tokens = [tokens]

        switcher = {
            TokenKind.KEYWORD: lambda t: Keywords(t.value).value,
            TokenKind.CONCEPT: lambda t: core.utils.str_concept(t.value),
        }

        if custom_switcher:
            switcher.update(custom_switcher)

        for token in tokens:
            value = switcher.get(token.type, lambda t: t.value)(token)
            res += value
            # guard on custom_switcher: it may be None even when a tracker is given
            if tracker is not None and custom_switcher and token.type in custom_switcher:
                tracker[value] = token.value
        return res

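    # Illustrative sketch (assumed token values): a custom switcher can mask
    # some token kinds while a tracker remembers what was replaced.
    #
    #   toks = [Token(TokenKind.WORD, "a", 0, 1, 1),
    #           Token(TokenKind.EQUALS, "=", 1, 1, 2),
    #           Token(TokenKind.WORD, "b", 2, 1, 3)]
    #   seen = {}
    #   BaseParser.get_text_from_tokens(
    #       toks, custom_switcher={TokenKind.WORD: lambda t: "_"}, tracker=seen)
    #   # -> "_=_", with seen == {"_": "b"} (later words overwrite earlier ones)
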
    @staticmethod
    def get_tokens_boundaries(tokens):
        """
        Returns the first and the last valid index of the tokens.
        A valid index points at a token that is neither a whitespace nor an EOF.

        :param tokens: list of tokens
        :return: (start, end) indices, or None when tokens is None
        """
        if tokens is None:
            return None

        if len(tokens) == 0:
            return 0, 0

        if tokens[0].type == TokenKind.EOF:
            return 0, 0

        start = 1 if tokens[0].type == TokenKind.WHITESPACE else 0
        end = len(tokens) - 1
        # stop at 'start' so a padding-only list cannot walk past the front
        while end > start and tokens[end].type in (TokenKind.WHITESPACE, TokenKind.EOF):
            end -= 1

        return start, end

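    # Illustrative sketch (assumed token values):
    #
    #   toks = [Token(TokenKind.WHITESPACE, " ", 0, 1, 1),
    #           Token(TokenKind.WORD, "x", 1, 1, 2),
    #           Token(TokenKind.EOF, "", 2, 1, 3)]
    #   BaseParser.get_tokens_boundaries(toks)  # -> (1, 1): skips the padding

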
class BaseTokenizerIterParser(BaseParser):

    def __init__(self, name, priority, parse_word=False, none_on_eof=True):
        super().__init__(name, priority)
        self.lexer_iter = None
        self._current = None
        self.context: ExecutionContext = None
        self.text = None
        self.sheerka = None

        self.parse_word = parse_word
        self.none_on_eof = none_on_eof  # current token is set to None when EOF is hit

    def reset_parser(self, context, text):
        self.context = context
        self.sheerka = context.sheerka

        self.text = text
        self.lexer_iter = iter(Tokenizer(text, self.parse_word))
        self._current = None

        # prime the stream so get_token() immediately returns the first token
        self.next_token()

    def add_error(self, error, next_token=True):
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def get_token(self) -> Token:
        return self._current

    def next_token(self, skip_whitespace=True):
        try:
            self._current = next(self.lexer_iter)

            if self.none_on_eof and self._current.type == TokenKind.EOF:
                self._current = None
                return False

            if skip_whitespace:
                while self._current.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
                    self._current = next(self.lexer_iter)
        except StopIteration:
            self._current = None
            return False

        return True

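    # Illustrative sketch (hypothetical 'handle' consumer): the intended
    # driving loop, with the default none_on_eof=True, is
    #
    #   parser.reset_parser(context, "some input")  # primes the first token
    #   while parser.get_token() is not None:       # None once EOF is reached
    #       handle(parser.get_token())
    #       parser.next_token()

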
class BaseSplitIterParser(BaseParser):

    def __init__(self, name, priority, none_on_eof=False):
        super().__init__(name, priority)
        self._current = None
        self.context: ExecutionContext = None
        self.text = None
        self.sheerka = None
        self.iter_split = None
        # separators that are discarded vs separators that are emitted as tokens
        self.split_and_eat_tokens = (" ", "\n", "\t")
        self.split_and_keep_tokens = ("=", ")", "(", ",")
        self.split_tokens = self.split_and_eat_tokens + self.split_and_keep_tokens

        self.none_on_eof = none_on_eof  # current token is set to None when EOF is hit

    def parse_word(self, c, index, line, column):
        end = self.split_tokens
        escaped = False
        buffer = ""

        while escaped or c not in end:
            if not escaped and c == "\\":
                escaped = True
            elif not escaped and c in ("'", '"'):
                # an opening quote: from here on only the matching quote ends the word
                end = [c]
            else:
                buffer += c
                escaped = False

            index, column = index + 1, column + 1
            if index == len(self.text):
                break
            c = self.text[index]

            if c == "\n":
                line += 1
                column = 0

        # consume the terminator unless it must be kept as its own token;
        # 'not in' (rather than 'in') also skips past a closing quote
        if c not in self.split_and_keep_tokens:
            index, column = index + 1, column + 1

        return buffer, index, line, column

    def split(self):
        index = 0
        line = 1
        column = 1

        while index < len(self.text):
            c = self.text[index]

            if c == "=":
                # lookahead distinguishes '==' from '='
                if index + 1 < len(self.text) and self.text[index + 1] == "=":
                    yield Token(TokenKind.EQUALSEQUALS, "==", index, line, column)
                    index, column = index + 2, column + 2
                else:
                    yield Token(TokenKind.EQUALS, "=", index, line, column)
                    index, column = index + 1, column + 1
            elif c == ")":
                yield Token(TokenKind.RPAR, ")", index, line, column)
                index, column = index + 1, column + 1
            elif c == "(":
                yield Token(TokenKind.LPAR, "(", index, line, column)
                index, column = index + 1, column + 1
            elif c == ",":
                yield Token(TokenKind.COMMA, ",", index, line, column)
                index, column = index + 1, column + 1
            else:
                buffer, end_index, end_line, end_column = self.parse_word(c, index, line, column)
                if buffer:
                    yield Token(TokenKind.WORD, buffer, index, line, column)
                index, line, column = end_index, end_line, end_column

        yield Token(TokenKind.EOF, "<eof>", index, line, column)

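    # Illustrative sketch: for the text 'f(a, b)=c' the generator yields
    #
    #   WORD 'f', LPAR '(', WORD 'a', COMMA ',', WORD 'b', RPAR ')',
    #   EQUALS '=', WORD 'c', EOF '<eof>'
    #
    # with whitespace eaten and the keep-tokens emitted on their own.
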
    def reset_parser(self, context, text):
        self.context = context
        self.sheerka = context.sheerka if context else None

        self.text = text
        self._current = None
        self.iter_split = iter(self.split())

    def add_error(self, error, next_token=True):
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def get_token(self) -> Token:
        return self._current

    def next_token(self):
        try:
            self._current = next(self.iter_split)
            if self._current.type == TokenKind.EOF:
                if self.none_on_eof:
                    self._current = None
                return False
        except StopIteration:
            self._current = None
            return False

        return True