import ast import logging from dataclasses import dataclass import core.utils from core.builtin_concepts import BuiltinConcepts from core.sheerka.services.SheerkaExecute import ParserInput from core.tokenizer import TokenKind from parsers.BaseParser import BaseParser, Node, ParsingError log = logging.getLogger(__name__) def get_python_node(obj): if isinstance(obj, PythonNode): return obj if hasattr(obj, "python_node"): return obj.python_node return None @dataclass() class PythonErrorNode(ParsingError): source: str exception: Exception # def __post_init__(self): # self.log.debug("-> PythonErrorNode: " + str(self.exception)) @dataclass() class ConceptDetectedError(ParsingError): name: str class PythonNode(Node): def __init__(self, source, ast_=None, objects=None): self.source = source self.ast_ = ast_ # if ast_ else ast.parse(source, mode="eval") if source else None self.objects = objects or {} # when objects (mainly concepts or rules) are recognized in the expression self.compiled = None self.ast_str = self.get_dump(self.ast_) def init_ast(self): if self.ast_ is None and self.source: self.ast_ = ast.parse(self.source, mode="eval") self.ast_str = self.get_dump(self.ast_) return self def get_compiled(self): if self.compiled is None: self.compiled = compile(self.ast_, "", "eval") return self.compiled def __repr__(self): ast_type = "expr" if isinstance(self.ast_, ast.Expression) else "module" return "PythonNode(" + ast_type + "='" + self.source + "')" def __eq__(self, other): if not isinstance(other, PythonNode): return False if self.source != other.source: return False if self.ast_ and other.ast_: self_dump = self.get_dump(self.ast_) other_dump = self.get_dump(other.ast_) return self_dump == other_dump return True def __hash__(self): return hash((self.source, self.ast_.hash)) @staticmethod def get_dump(ast_): if not ast_: return None dump = ast.dump(ast_) for to_remove in [", ctx=Load()", ", kind=None", ", type_ignores=[]"]: dump = dump.replace(to_remove, "") return dump class PythonGetNamesVisitor(ast.NodeVisitor): """ This visitor will find all the name declared in the ast """ def __init__(self): self.names = set() def visit_Name(self, node): self.names.add(node.id) class PythonParser(BaseParser): """ Parse Python scripts """ NAME = "Python" def __init__(self, **kwargs): BaseParser.__init__(self, PythonParser.NAME, 50) self.source = kwargs.get("source", "") def parse(self, context, parser_input: ParserInput): if not isinstance(parser_input, ParserInput): return None sheerka = context.sheerka tree = None tracker = {} # to keep track of concept tokens (c:xxx:) python_switcher = { TokenKind.CONCEPT: lambda t: core.utils.encode_concept(t.value), TokenKind.RULE: lambda t: core.utils.encode_concept(t.value, "R") } if self.reset_parser(context, parser_input): source_code = parser_input.as_text(python_switcher, tracker) source_code = source_code.strip() # first, try to parse an expression res, tree, error = self.try_parse_expression(source_code) if not res: # then try to parse a statement res, tree, error = self.try_parse_statement(source_code) if not res: error_node = PythonErrorNode(parser_input.as_text(), error) self.error_sink.append(error_node) # Python parser will refuse input that directly refers to a concept if isinstance(tree, ast.Expression) and isinstance(tree.body, ast.Name): if tree.body.id in tracker or context.sheerka.fast_resolve(tree.body.id, return_new=False) is not None: context.log("It's a simple concept. Not for me.", self.name) self.error_sink.append(ConceptDetectedError(tree.body.id)) if self.has_error: ret = sheerka.ret( self.name, False, sheerka.new( BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text(), reason=self.error_sink)) else: ret = sheerka.ret( self.name, True, sheerka.new( BuiltinConcepts.PARSER_RESULT, parser=self, source=parser_input.as_text(), body=PythonNode(source_code, tree, tracker), try_parsed=None)) self.log_result(context, parser_input.as_text(), ret) return ret def try_parse_expression(self, text): try: return True, ast.parse(text, f"<{self.source}>", 'eval'), None except Exception as error: return False, None, error def try_parse_statement(self, text): try: return True, ast.parse(text, f"<{self.source}>", 'exec'), None except Exception as error: return False, None, error