from core.builtin_concepts import BuiltinConcepts
from core.tokenizer import Tokenizer, LexerError, TokenKind
from parsers.BaseParser import BaseParser, Node, ErrorNode
from dataclasses import dataclass, field
import ast
import logging

from parsers.ConceptLexerParser import ConceptNode

log = logging.getLogger(__name__)


@dataclass()
class PythonErrorNode(ErrorNode):
    """Error produced when a piece of text cannot be parsed as Python."""

    source: str  # the text that failed to parse
    exception: Exception  # the exception raised by ast.parse


class PythonNode(Node):
    """Result of a successful Python parse: the source text and its AST."""

    def __init__(self, source, ast_, concepts=None):
        """
        :param source: the text the AST was built from
        :param ast_: the tree returned by ast.parse
        :param concepts: optional mapping of Python identifier -> concept
        """
        self.source = source
        self.ast_ = ast_
        self.concepts = concepts or {}  # when concepts are recognized in the expression

    def __repr__(self):
        ast_type = "expr" if isinstance(self.ast_, ast.Expression) else "module"
        return "PythonNode(" + ast_type + "='" + self.source + "')"

    def __eq__(self, other):
        """Two nodes are equal when both source and normalized AST dump match."""
        if not isinstance(other, PythonNode):
            return False

        if self.source != other.source:
            return False

        return self.get_dump(self.ast_) == self.get_dump(other.ast_)

    def __hash__(self):
        # ast nodes have no 'hash' attribute, so the previous
        # hash((self.source, self.ast_.hash)) raised AttributeError.
        # Hash the same normalized dump __eq__ compares, so that equal
        # nodes always hash equal.
        return hash((self.source, self.get_dump(self.ast_)))

    @staticmethod
    def get_dump(ast_):
        """
        Return ast.dump(ast_) with a few noisy fragments stripped out
        (load contexts, 'kind=None', empty 'type_ignores').
        """
        dump = ast.dump(ast_)
        for to_remove in [", ctx=Load()", ", kind=None", ", type_ignores=[]"]:
            dump = dump.replace(to_remove, "")
        return dump


class PythonParser(BaseParser):
    """
    Parse Python scripts
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: 'source' (optional) is used as the filename label
                       handed to ast.parse, wrapped in angle brackets
        """
        BaseParser.__init__(self, "Python", 50)
        self.source = kwargs.get("source", "")

    def parse(self, context, text):
        """
        Try to parse text as a Python expression, then as a statement.

        :param context: execution context; context.sheerka builds the result
        :param text: either a string or a sequence of tokens
        :return: the value of sheerka.ret(...) wrapping a PARSER_RESULT whose
                 body is a PythonNode on success, or the error sink on failure
        """
        sheerka = context.sheerka
        tree = None

        # concept tokens are replaced by a placeholder identifier
        python_switcher = {
            TokenKind.CONCEPT: lambda t: f"__C__USE_CONCEPT__{t.value}__C__"
        }

        try:
            if isinstance(text, str) and "c:" in text:
                # the string may contain concept references: tokenize it first
                source = self.get_text_from_tokens(list(Tokenizer(text)), python_switcher)
            elif isinstance(text, str):
                source = text
            else:
                source = self.get_text_from_tokens(text, python_switcher)
            source = source.strip()

            # from here on, 'text' is always a string
            text = text if isinstance(text, str) else source

            # first, try to parse an expression
            res, tree, error = self.try_parse_expression(source)
            if not res:
                # then try to parse a statement
                res, tree, error = self.try_parse_statement(source)
                if not res:
                    self.has_error = True
                    error_node = PythonErrorNode(text, error)
                    self.error_sink.append(error_node)

        except LexerError as e:
            self.has_error = True
            self.error_sink.append(e)

        ret = sheerka.ret(
            self.name,
            not self.has_error,
            sheerka.new(
                BuiltinConcepts.PARSER_RESULT,
                parser=self,
                source=text,
                body=self.error_sink if self.has_error else PythonNode(text, tree),
                try_parsed=None))

        self.log_result(context, text, ret)
        return ret

    def _try_parse(self, text, mode):
        """
        Run ast.parse on text in the given mode ('eval' or 'exec').

        :return: (True, tree, None) on success, (False, None, error) on failure
        """
        try:
            return True, ast.parse(text, f"<{self.source}>", mode), None
        except Exception as error:
            # Deliberately broad: any failure is reported back to the caller
            # as a value instead of being raised.
            return False, None, error

    def try_parse_expression(self, text):
        """Try to parse text as a single expression ('eval' mode)."""
        return self._try_parse(text, 'eval')

    def try_parse_statement(self, text):
        """Try to parse text as a sequence of statements ('exec' mode)."""
        return self._try_parse(text, 'exec')


class PythonGetNamesVisitor(ast.NodeVisitor):
    """
    This visitor collects the id of every ast.Name node it visits
    """

    def __init__(self):
        self.names = set()

    def visit_Name(self, node):
        self.names.add(node.id)


class LexerNodeParserHelperForPython:
    """Helper class to parse mix of concepts and Python"""

    def __init__(self):
        self.identifiers = {}  # cache for already created identifier (the key is id(concept))
        self.identifiers_key = {}  # number of identifiers with the same root (prefix)

    def _get_identifier(self, concept):
        """
        Get a Python identifier for a concept.

        The same concept object always gets the same identifier.
        A different concept object that yields the same root gets a
        numeric suffix, so the two identifiers differ.

        :param concept: object exposing 'key', 'name' and 'id'
        :return: the identifier, of the form __C__<root>[_<n>]__C__
        """
        # NOTE(review): the cache is keyed by id(concept), which is only
        # unique while the concept object is alive - confirm concepts
        # outlive this helper.
        if id(concept) in self.identifiers:
            return self.identifiers[id(concept)]

        identifier = "__C__" + self._sanitize(concept.key or concept.name)
        if concept.id:
            # f-string formatting keeps the output identical for string ids
            # and no longer raises TypeError when the id is not a string
            identifier += f"__{concept.id}"

        if identifier in self.identifiers_key:
            self.identifiers_key[identifier] += 1
            identifier += f"_{self.identifiers_key[identifier]}"
        else:
            self.identifiers_key[identifier] = 0

        identifier += "__C__"

        self.identifiers[id(concept)] = identifier
        return identifier

    @staticmethod
    def _sanitize(identifier):
        """Replace every non alphanumeric character of identifier by '0'."""
        return "".join(c if c.isalnum() else "0" for c in identifier)

    def parse(self, context, nodes):
        """
        Parse a sequence of nodes where concept nodes are mixed with Python.

        Each ConceptNode is replaced by a generated identifier before the
        resulting text is handed to a PythonParser.

        :param context: execution context; a sub context is pushed for the parse
        :param nodes: nodes exposing 'source' (ConceptNode also exposes 'concept')
        :return: the PythonNode on success (its source is the original text and
                 its concepts map identifier -> concept), else result.body
        """
        source_parts = []
        to_parse_parts = []
        concepts = {}  # the key is the Python identifier

        for node in nodes:
            source_parts.append(node.source)
            if isinstance(node, ConceptNode):
                # separate the identifier from whatever precedes it
                # (to_parse_parts never holds an empty string while non-empty
                # text has been added, so any() mirrors "if to_parse")
                if any(to_parse_parts):
                    to_parse_parts.append(" ")
                concept = node.concept
                python_id = self._get_identifier(concept)
                to_parse_parts.append(python_id)
                concepts[python_id] = concept
            else:
                to_parse_parts.append(node.source)

        source = "".join(source_parts)
        to_parse = "".join(to_parse_parts)

        with context.push(self, desc="Trying Python for '" + to_parse + "'") as sub_context:
            sub_context.add_inputs(to_parse=to_parse)
            python_parser = PythonParser()
            result = python_parser.parse(sub_context, to_parse)
            sub_context.add_values(return_values=result)

            if result.status:
                python_node = result.body.body
                python_node.source = source
                python_node.concepts = concepts
                return python_node

            return result.body  # the error