217 lines
7.0 KiB
Python
217 lines
7.0 KiB
Python
from dataclasses import dataclass
|
|
|
|
from core.concept import Concept, ErrorConcept, Property
|
|
from parsers.PythonParser import PythonParser, PythonGetNamesVisitor, PythonNode
|
|
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event
|
|
from parsers.DefaultParser import DefaultParser, DefConceptNode
|
|
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Singleton(type):
|
|
_instances = {}
|
|
|
|
def __call__(cls, *args, **kwargs):
|
|
if cls not in cls._instances:
|
|
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
|
|
return cls._instances[cls]
|
|
|
|
|
|
@dataclass
|
|
class ReturnValue:
|
|
"""
|
|
Class that handle the return of a concept
|
|
To avoid using the try/except pattern for each and every call
|
|
To give context (ie return message) even when the call is successful
|
|
"""
|
|
status: bool
|
|
value: Concept
|
|
message: str = None
|
|
|
|
|
|
@dataclass
|
|
class ExecutionContext:
|
|
"""
|
|
To keep track of the execution of a request
|
|
"""
|
|
event_digest: str
|
|
|
|
|
|
class Sheerka(Concept, metaclass=Singleton):
|
|
"""
|
|
Main controller for the project
|
|
"""
|
|
|
|
NAME = "Sheerka"
|
|
UNKNOWN_CONCEPT_NAME = "Unknown Concept"
|
|
ERROR_CONCEPT_NAME = "Error"
|
|
SUCCESS_CONCEPT_NAME = "Success"
|
|
|
|
CONCEPTS_ENTRY = "Concepts"
|
|
BUILTIN_CONCEPTS_KEYS = "Builtins"
|
|
USER_CONCEPTS_KEYS = "Concepts"
|
|
|
|
def __init__(self):
|
|
log.debug("Starting Sheerka.")
|
|
super().__init__(Sheerka.NAME)
|
|
|
|
# cache of the most used concepts
|
|
self.concepts_cache = []
|
|
|
|
# a concept can be instantiated
|
|
# ex: File is a concept, but File('foo.txt') is an instance
|
|
# TODO: manage contexts
|
|
self.instances = []
|
|
|
|
# List of the known rules by the system
|
|
# ex: hello => say('hello')
|
|
self.rules = []
|
|
|
|
self.sdp = None
|
|
self.parsers = []
|
|
|
|
self.key = self.NAME
|
|
|
|
def initialize(self, root_folder=None):
|
|
"""
|
|
Starting Sheerka
|
|
Loads the current configuration
|
|
Notes that when it's the first time, it also create the needed working folders
|
|
:param root_folder: root configuration folder
|
|
:return: ReturnValue(Success or Error)
|
|
"""
|
|
|
|
try:
|
|
self.sdp = SheerkaDataProvider(root_folder)
|
|
self.parsers.append(lambda text: DefaultParser(text, PythonParser))
|
|
|
|
if self.sdp.first_time:
|
|
self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)
|
|
|
|
self.create_builtin_concepts()
|
|
except IOError as e:
|
|
return ReturnValue(False, self.get_concept(Sheerka.ERROR_CONCEPT_NAME), e)
|
|
|
|
return ReturnValue(True, self.get_concept(Sheerka.SUCCESS_CONCEPT_NAME))
|
|
|
|
def set_id_if_needed(self, obj, is_builtin):
|
|
"""
|
|
Set the key for the concept if needed
|
|
:param obj:
|
|
:param is_builtin:
|
|
:return:
|
|
"""
|
|
if obj.id is not None:
|
|
return
|
|
obj.id = self.sdp.get_next_key(self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS)
|
|
log.debug(f"Setting id '{obj.id}' to concept '{obj.name}'.")
|
|
|
|
def create_builtin_concepts(self):
|
|
"""
|
|
Initializes the builtin concepts
|
|
:return: None
|
|
"""
|
|
log.debug("Initializing builtin concepts")
|
|
builtins = [
|
|
self,
|
|
Concept(Sheerka.UNKNOWN_CONCEPT_NAME, key=Sheerka.UNKNOWN_CONCEPT_NAME),
|
|
Concept(Sheerka.SUCCESS_CONCEPT_NAME, key=Sheerka.SUCCESS_CONCEPT_NAME),
|
|
Concept(Sheerka.ERROR_CONCEPT_NAME, key=Sheerka.ERROR_CONCEPT_NAME),
|
|
]
|
|
|
|
for concept in builtins:
|
|
from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.name)
|
|
if from_db is None:
|
|
log.debug(f"'{concept.name}' concept is not found. Adding.")
|
|
self.set_id_if_needed(concept, True)
|
|
self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
|
|
else:
|
|
log.debug(f"Found concept '{from_db}'. Updating.")
|
|
concept.update_from(from_db)
|
|
|
|
def eval(self, text):
|
|
evt_digest = self.sdp.save_event(Event(text))
|
|
exec_context = ExecutionContext(evt_digest)
|
|
result = self.try_parse(text)
|
|
|
|
return_values = []
|
|
for parser_name, status, node in result:
|
|
if not status:
|
|
return_values.append(ReturnValue(False, ErrorConcept(body=node)))
|
|
elif status and isinstance(node, DefConceptNode):
|
|
return_values.append(self.add_concept(exec_context, node))
|
|
|
|
return return_values
|
|
|
|
def try_parse(self, text):
|
|
result = []
|
|
log.debug(f"Parsing '{text}'")
|
|
for parser in self.parsers:
|
|
p = parser(text)
|
|
# try:
|
|
# tree = p.parse()
|
|
# result.append((p.name, tree))
|
|
# except Exception as e:
|
|
# result.append((p.name, e))
|
|
tree = p.parse()
|
|
result.append((p.name, not p.has_error, p.error_sink if p.has_error else tree))
|
|
return result
|
|
|
|
def get_concept(self, name):
|
|
"""
|
|
Given a concept name, tries to find it
|
|
:param name: name of the concept to look for
|
|
:param is_builtin: is it a builtin concept ?
|
|
:return: concept if found, UNKNOWN_CONCEPT otherwise
|
|
"""
|
|
for concept in self.concepts_cache:
|
|
if concept.name == name:
|
|
return concept
|
|
return ErrorConcept()
|
|
|
|
def add_concept(self, exec_context, def_concept_node: DefConceptNode):
|
|
"""
|
|
Adds a new concept to the system
|
|
:param exec_context:
|
|
:param def_concept_node: DefConceptNode
|
|
:return: digest of the new concept
|
|
"""
|
|
|
|
# validate the node
|
|
get_names_visitor = PythonGetNamesVisitor()
|
|
|
|
concept = Concept(def_concept_node.name)
|
|
for prop in ("where", "pre", "post", "body"):
|
|
# put back the sources
|
|
concept_part_node = getattr(def_concept_node, prop)
|
|
if isinstance(concept_part_node, PythonNode):
|
|
get_names_visitor.visit(concept_part_node.ast)
|
|
source = concept_part_node.source if hasattr(concept_part_node, "source") else ""
|
|
setattr(concept, prop, source)
|
|
|
|
# try to find variables (eg props)
|
|
for token in def_concept_node.tokens["name"]:
|
|
if token.value in get_names_visitor.names:
|
|
concept.props.append(Property(token.value, None))
|
|
|
|
concept.key = DefaultParser.get_concept_name(def_concept_node.tokens["name"], [p.name for p in concept.props])
|
|
concept.add_codes(def_concept_node.get_codes())
|
|
self.set_id_if_needed(concept, False)
|
|
|
|
self.sdp.add(exec_context.event_digest, self.CONCEPTS_ENTRY, concept, use_ref=True)
|
|
|
|
return ReturnValue(True, concept)
|
|
|
|
@staticmethod
|
|
def concept_equals(concept1, concept2):
|
|
"""True if the two concepts refer to the same concept"""
|
|
if concept1 is None and concept2 is None:
|
|
return True
|
|
|
|
if concept1 is None or concept2 is None:
|
|
return False
|
|
|
|
return concept1.key == concept2.key
|