321 lines
11 KiB
Python
321 lines
11 KiB
Python
from dataclasses import dataclass
|
|
|
|
from core.concept import Concept, ErrorConcept, Property, TooManySuccessConcept, ReturnValueConcept
|
|
from parsers.PythonParser import PythonGetNamesVisitor, PythonNode
|
|
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
|
|
from parsers.DefaultParser import DefConceptNode, DefaultParser
|
|
import core.utils
|
|
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Singleton(type):
|
|
_instances = {}
|
|
|
|
def __call__(cls, *args, **kwargs):
|
|
if cls not in cls._instances:
|
|
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
|
|
return cls._instances[cls]
|
|
|
|
|
|
@dataclass
|
|
class ReturnValue:
|
|
"""
|
|
Class that handle the return of a concept
|
|
To avoid using the try/except pattern for each and every call
|
|
To give context (ie return message) even when the call is successful
|
|
"""
|
|
who: object
|
|
status: bool
|
|
value: Concept
|
|
message: str = None
|
|
|
|
|
|
@dataclass
|
|
class ExecutionContext:
|
|
"""
|
|
To keep track of the execution of a request
|
|
"""
|
|
sheerka: object
|
|
event_digest: str
|
|
|
|
|
|
class Sheerka(Concept):
|
|
"""
|
|
Main controller for the project
|
|
"""
|
|
|
|
NAME = "Sheerka"
|
|
UNKNOWN_CONCEPT_NAME = "Unknown Concept"
|
|
SUCCESS_CONCEPT_NAME = "Success"
|
|
CONCEPT_TOO_LONG_CONCEPT_NAME = "Concept too long"
|
|
|
|
CONCEPTS_ENTRY = "All_Concepts"
|
|
BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"
|
|
USER_CONCEPTS_KEYS = "User_Concepts"
|
|
|
|
def __init__(self, debug=False):
|
|
log.debug("Starting Sheerka.")
|
|
super().__init__(Sheerka.NAME)
|
|
|
|
# cache of the most used concepts
|
|
# Note that these are only templates
|
|
# They are used as a footprint for instantiation
|
|
self.concepts_cache = {}
|
|
|
|
# a concept can be instantiated
|
|
# ex: File is a concept, but File('foo.txt') is an instance
|
|
# TODO: manage contexts
|
|
self.instances = []
|
|
|
|
# List of the known rules by the system
|
|
# ex: hello => say('hello')
|
|
self.rules = []
|
|
|
|
self.sdp = None
|
|
self.parsers = []
|
|
self.evaluators = []
|
|
|
|
self.key = self.NAME
|
|
self.debug = debug
|
|
|
|
def initialize(self, root_folder=None):
|
|
"""
|
|
Starting Sheerka
|
|
Loads the current configuration
|
|
Notes that when it's the first time, it also create the needed working folders
|
|
:param debug:
|
|
:param root_folder: root configuration folder
|
|
:return: ReturnValue(Success or Error)
|
|
"""
|
|
|
|
try:
|
|
self.init_logging()
|
|
self.sdp = SheerkaDataProvider(root_folder)
|
|
self.parsers.append(core.utils.get_class("parsers.DefaultParser.DefaultParser"))
|
|
self.parsers.append(core.utils.get_class("parsers.PythonParser.PythonParser"))
|
|
#self.parsers.append(core.utils.get_class("parsers.ExactConceptParser.ExactConceptParser"))
|
|
|
|
self.evaluators.append(core.utils.get_object("evaluators.DefaultEvaluator.DefaultEvaluator"))
|
|
self.evaluators.append(core.utils.get_object("evaluators.AddConceptEvaluator.AddConceptEvaluator"))
|
|
self.evaluators.append(core.utils.get_object("evaluators.PythonEvaluator.PythonEvaluator"))
|
|
|
|
if self.sdp.first_time:
|
|
self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)
|
|
|
|
self.create_builtin_concepts()
|
|
except IOError as e:
|
|
return ReturnValue(self, False, self.get(ErrorConcept.NAME), e)
|
|
|
|
return ReturnValue(self, True, self.get(Sheerka.SUCCESS_CONCEPT_NAME))
|
|
|
|
def set_id_if_needed(self, obj, is_builtin):
|
|
"""
|
|
Set the key for the concept if needed
|
|
:param obj:
|
|
:param is_builtin:
|
|
:return:
|
|
"""
|
|
if obj.id is not None:
|
|
return
|
|
obj.id = self.sdp.get_next_key(self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS)
|
|
log.debug(f"Setting id '{obj.id}' to concept '{obj.name}'.")
|
|
|
|
def create_builtin_concepts(self):
|
|
"""
|
|
Initializes the builtin concepts
|
|
:return: None
|
|
"""
|
|
log.debug("Initializing builtin concepts")
|
|
builtins = [
|
|
self,
|
|
Concept(Sheerka.UNKNOWN_CONCEPT_NAME, key=Sheerka.UNKNOWN_CONCEPT_NAME),
|
|
Concept(Sheerka.SUCCESS_CONCEPT_NAME, key=Sheerka.SUCCESS_CONCEPT_NAME),
|
|
Concept(Sheerka.CONCEPT_TOO_LONG_CONCEPT_NAME, key=Sheerka.CONCEPT_TOO_LONG_CONCEPT_NAME),
|
|
ErrorConcept(),
|
|
TooManySuccessConcept(),
|
|
ReturnValueConcept(),
|
|
]
|
|
|
|
for concept in builtins:
|
|
self.add_in_cache(concept)
|
|
|
|
from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
|
|
if from_db is None:
|
|
log.debug(f"'{concept.name}' concept is not found. Adding.")
|
|
self.set_id_if_needed(concept, True)
|
|
self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
|
|
else:
|
|
log.debug(f"Found concept '{from_db}'. Updating.")
|
|
concept.update_from(from_db)
|
|
|
|
def init_logging(self):
|
|
if self.debug:
|
|
log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
|
|
log_level = logging.DEBUG
|
|
else:
|
|
log_format = "%(message)s"
|
|
log_level = logging.INFO
|
|
|
|
logging.basicConfig(format=log_format, level=log_level)
|
|
|
|
def eval(self, text):
|
|
evt_digest = self.sdp.save_event(Event(text))
|
|
exec_context = ExecutionContext(self, evt_digest)
|
|
return_values = self.try_parse(exec_context, text)
|
|
return_values = self.try_eval(exec_context, return_values)
|
|
|
|
# return_values = []
|
|
# for parser_name, status, node in result:
|
|
# if not status:
|
|
# return_values.append(ReturnValue(False, ErrorConcept(body=node)))
|
|
# elif status and isinstance(node, DefConceptNode):
|
|
# return_values.append(self.add_concept(exec_context, node))
|
|
# else:
|
|
# return_values.append(ReturnValue(True, node))
|
|
|
|
return return_values
|
|
|
|
def try_parse(self, context, text):
|
|
result = []
|
|
log.debug(f"Parsing '{text}'")
|
|
for parser in self.parsers:
|
|
p = parser()
|
|
# try:
|
|
# tree = p.parse()
|
|
# result.append((p.name, tree))
|
|
# except Exception as e:
|
|
# result.append((p.name, e))
|
|
tree = p.parse(context, text)
|
|
result.append(ReturnValue(p.name, not p.has_error, p.error_sink if p.has_error else tree))
|
|
return result
|
|
|
|
def try_eval(self, context, items):
|
|
log.debug("Evaluating parsing result.")
|
|
# group the evaluators by priority and sort them
|
|
# The first one to be applied will be the one with the highest priority
|
|
grouped_evaluators = {}
|
|
for item in self.evaluators:
|
|
grouped_evaluators.setdefault(item.priority, []).append(item)
|
|
sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True)
|
|
|
|
for priority in sorted_priorities:
|
|
log.debug("Processing priority " + str(priority))
|
|
for item in items:
|
|
log.debug(item)
|
|
original_items = items[:]
|
|
evaluated_items = []
|
|
for evaluator in grouped_evaluators[priority]:
|
|
if evaluator.matches(context, original_items):
|
|
result = evaluator.eval(context, original_items)
|
|
if isinstance(result, list):
|
|
evaluated_items.extend(result)
|
|
else:
|
|
evaluated_items.append(result)
|
|
|
|
# what was computed by this group will be the input of the following group
|
|
items = evaluated_items if len(evaluated_items) > 0 else original_items
|
|
|
|
return items
|
|
|
|
def add_concept(self, exec_context, def_concept_node: DefConceptNode):
|
|
"""
|
|
Adds a new concept to the system
|
|
:param exec_context:
|
|
:param def_concept_node: DefConceptNode
|
|
:return: digest of the new concept
|
|
"""
|
|
|
|
# validate the node
|
|
get_names_visitor = PythonGetNamesVisitor()
|
|
|
|
concept = Concept(def_concept_node.name)
|
|
for prop in ("where", "pre", "post", "body"):
|
|
# put back the sources
|
|
concept_part_node = getattr(def_concept_node, prop)
|
|
if isinstance(concept_part_node, PythonNode):
|
|
get_names_visitor.visit(concept_part_node.ast)
|
|
source = concept_part_node.source if hasattr(concept_part_node, "source") else ""
|
|
setattr(concept, prop, source)
|
|
|
|
# try to find variables (eg props)
|
|
# Note that with this method, the variables will be created in the order of appearance
|
|
for token in def_concept_node.tokens["name"]:
|
|
if token.value in get_names_visitor.names:
|
|
concept.set_prop(token.value, None)
|
|
|
|
concept.init_key(def_concept_node.tokens["name"])
|
|
concept.add_codes(def_concept_node.get_codes())
|
|
self.set_id_if_needed(concept, False)
|
|
|
|
try:
|
|
self.sdp.add(exec_context.event_digest, self.CONCEPTS_ENTRY, concept, use_ref=True)
|
|
except SheerkaDataProviderDuplicateKeyError as error:
|
|
return ReturnValue(self.add_concept.__name__, False, ErrorConcept(body=error), error.args[0])
|
|
return ReturnValue(self.add_concept.__name__, True, concept)
|
|
|
|
def add_in_cache(self, concept):
|
|
"""
|
|
Adds a concept template in cache.
|
|
The cache is used as a proxy before looking at sdp
|
|
:param concept:
|
|
:return:
|
|
"""
|
|
self.concepts_cache[concept.key] = concept
|
|
|
|
def get(self, concept_key):
|
|
"""
|
|
Tries to find a concept
|
|
TODO: how to manage single vs multiple instances
|
|
:param concept_key:
|
|
:return:
|
|
"""
|
|
|
|
# first search in cache
|
|
if concept_key in self.concepts_cache:
|
|
return self.concepts_cache[concept_key]
|
|
|
|
return self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key) or \
|
|
self.new(self.UNKNOWN_CONCEPT_NAME, body=concept_key)
|
|
|
|
def new(self, concept, **kwargs):
|
|
"""
|
|
Returns an instance of a new concept
|
|
TODO: Checks if the concept is supposed to be unique (ex Sheerka, or the number 'one' for example)
|
|
:param concept:
|
|
:param kwargs:
|
|
:return:
|
|
"""
|
|
|
|
if isinstance(concept, str):
|
|
concept = self.get(concept)
|
|
|
|
for k, v in kwargs.items():
|
|
if hasattr(concept, k):
|
|
setattr(concept, k, v)
|
|
|
|
return concept
|
|
|
|
def isinstance(self, a, b):
|
|
"""
|
|
return true if the concept a is an instance of the concept b
|
|
:param a:
|
|
:param b:
|
|
:return:
|
|
"""
|
|
|
|
if not isinstance(a, Concept):
|
|
raise SyntaxError("The first parameter of isinstance MUST be a concept")
|
|
|
|
b_key = b if isinstance(b, str) else b.key
|
|
|
|
# TODO : manage when a is the list of all possible b
|
|
# for example, if a is a color, it will be found the entry 'All_Colors'
|
|
return a.key == b_key
|
|
|
|
@staticmethod
|
|
def test():
|
|
return "I have access to Sheerka !"
|