Files
Sheerka-Old/core/sheerka.py
T

447 lines
17 KiB
Python

from dataclasses import dataclass
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept
from core.concept import Concept, ConceptParts
from evaluators.BaseEvaluator import OneReturnValueEvaluator
from parsers.BaseParser import BaseParser
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
import core.utils
import logging
# Module-level logger, named after this module; used by Sheerka for its debug traces.
log = logging.getLogger(__name__)
class Sheerka(Concept):
    """
    Main controller for the project.

    Sheerka is itself a concept. It owns the concept caches, the data provider
    (``sdp``), the parsers and the evaluators, and drives the
    parse / evaluate pipeline for every piece of input (see ``eval``).
    """

    # sdp entry under which every concept is stored
    CONCEPTS_ENTRY = "All_Concepts"
    # sdp keys used to generate concept ids (see set_id_if_needed)
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"
    USER_CONCEPTS_KEYS = "User_Concepts"

    def __init__(self, debug=False):
        """
        Creates an uninitialized Sheerka. Call ``initialize()`` before using it.

        :param debug: when True, ``init_logging()`` configures DEBUG level logging
        """
        log.debug("Starting Sheerka.")
        super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)

        # cache of the most used concepts
        # Note that these are only templates
        # They are used as a footprint for instantiation
        self.concepts_cache = {}

        # cache for builtin types.
        # It allows the instantiation of a builtin class
        self.builtin_cache = {}

        # a concept can be instantiated
        # ex: File is a concept, but File('foo.txt') is an instance
        # TODO: manage contexts
        self.instances = []

        # List of the rules known by the system
        # ex: hello => say('hello')
        self.rules = []

        # data provider, created by initialize(); None until then
        self.sdp = None

        # parser classes (instantiated on every parse) and evaluator instances
        self.parsers = []
        self.evaluators = []

        self.debug = debug

    def initialize(self, root_folder=None):
        """
        Starts Sheerka: sets up logging, creates the data provider, loads the
        builtin concepts and registers the parsers and the evaluators.

        When the data provider reports ``first_time``, the USER_CONCEPTS_KEYS
        key is set to 1000 (presumably the first user concept id).
        Any creation of working folders is left to SheerkaDataProvider (not visible here).

        :param root_folder: root configuration folder, handed to SheerkaDataProvider
        :return: ReturnValueConcept (status True on success, False with the IOError otherwise)
        """
        parser_paths = (
            "parsers.DefaultParser.DefaultParser",
            "parsers.PythonParser.PythonParser",
            "parsers.ExactConceptParser.ExactConceptParser",
        )
        evaluator_paths = (
            "evaluators.ParsersEvaluator.ParsersEvaluator",
            "evaluators.AddConceptEvaluator.AddConceptEvaluator",
            "evaluators.PythonEvaluator.PythonEvaluator",
            "evaluators.ConceptEvaluator.ConceptEvaluator",
            "evaluators.DuplicateConceptEvaluator.DuplicateConceptEvaluator",
        )

        try:
            self.init_logging()

            self.sdp = SheerkaDataProvider(root_folder)
            if self.sdp.first_time:
                self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)

            # runs on every start: it is what fills the concepts cache
            self.initialize_builtin_concepts()

            # parsers are kept as classes, evaluators as instances
            for path in parser_paths:
                self.parsers.append(core.utils.get_class(path))
            for path in evaluator_paths:
                self.evaluators.append(core.utils.new_object(path))

        except IOError as e:
            # get() copes with a missing sdp / empty cache, so this cannot hide the IOError
            return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e)

        return ReturnValueConcept(self, True, self)

    def set_id_if_needed(self, obj, is_builtin):
        """
        Sets the id of the concept, unless it already has one.

        :param obj: concept to update (its ``id`` attribute is written)
        :param is_builtin: True to take the id from BUILTIN_CONCEPTS_KEYS, False from USER_CONCEPTS_KEYS
        :return: None
        """
        if obj.id is not None:
            return

        sequence = self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS
        obj.id = self.sdp.get_next_key(sequence)
        log.debug(f"Setting id '{obj.id}' to concept '{obj.name}'.")

    def initialize_builtin_concepts(self):
        """
        Initializes the builtin concepts.

        For every member of BuiltinConcepts, a concept is built (Sheerka itself,
        a dedicated builtin class when one exists, a plain Concept otherwise),
        synchronized with the sdp and then added to the concepts cache.

        :return: None
        """
        log.debug("Initializing builtin concepts")
        builtins_classes = self.get_builtins_classes_as_dict()

        # NOTE: the whole initialization of the builtins seems a little bit complicated
        # why do we need to update it from DB ?
        for key in BuiltinConcepts:
            builtin_class = builtins_classes.get(str(key))

            if key == BuiltinConcepts.SHEERKA:
                concept = self
            elif builtin_class is not None:
                concept = builtin_class()
            else:
                concept = Concept(key, True, False, key)

            # non unique builtins with a dedicated class are remembered,
            # so that new() can create instances of the right type
            if not concept.is_unique and builtin_class is not None:
                self.builtin_cache[key] = builtin_class

            from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
            if from_db is None:
                log.debug(f"'{concept.name}' concept is not found in db. Adding.")
                self.set_id_if_needed(concept, True)
                self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
            else:
                log.debug(f"Found concept '{from_db}' in db. Updating.")
                concept.update_from(from_db)

            self.add_in_cache(concept)

    def init_logging(self):
        """
        Configures the root logger: timestamped DEBUG output when ``self.debug``
        is set, bare messages at INFO level otherwise.

        :return: None
        """
        if self.debug:
            log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
            log_level = logging.DEBUG
        else:
            log_format = "%(message)s"
            log_level = logging.INFO

        logging.basicConfig(format=log_format, level=log_level)

    def eval(self, text):
        """
        Evaluates a piece of input.

        The input is saved as an event, then three evaluation steps are run:
        BEFORE_PARSING, PARSING (on the results of the parsers) and AFTER_PARSING.

        :param text: input to evaluate
        :return: list of the return values left after the last step
        """
        evt_digest = self.sdp.save_event(Event(text))
        exec_context = ExecutionContext(self.key, evt_digest, self)

        return_values = self._run_evaluation_step(exec_context, [], BuiltinConcepts.BEFORE_PARSING)

        return_values.extend(self.parse(exec_context, text))
        return_values = self._run_evaluation_step(exec_context, return_values, BuiltinConcepts.PARSING)

        return_values = self._run_evaluation_step(exec_context, return_values, BuiltinConcepts.AFTER_PARSING)
        return return_values

    def _run_evaluation_step(self, context, return_values, step):
        """
        Runs process() with a marker return value for the given step,
        then removes the marker from the result.

        :param context: execution context
        :param return_values: return values to process
        :param step: BuiltinConcepts key of the step (BEFORE_PARSING, PARSING, AFTER_PARSING)
        :return: the processed return values, without the marker
        """
        marker = self.ret(self.eval.__name__, True, self.new(step))
        processed = self.process(context, return_values, [marker])
        return core.utils.remove_from_list(processed, [marker])

    def expect_one(self, context, items):
        """
        Reduces a list of return values to a single one.

        :param context: execution context (``context.who`` signs the returned errors)
        :param items: one return value, or a list of return values
        :return: the only successful item when there is exactly one, otherwise a failed
                 return value (IS_EMPTY, TOO_MANY_SUCCESS or TOO_MANY_ERRORS)
        """
        if not isinstance(items, list):
            items = [items]

        if not items:
            return self.ret(context.who, False, self.new(BuiltinConcepts.IS_EMPTY, obj=items))

        successful_results = [item for item in items if item.status]
        number_of_successful = len(successful_results)
        total_items = len(items)

        # a single winner: the errors are simply dropped
        if number_of_successful == 1:
            return successful_results[0]

        # too many winners, which one to choose ?
        if number_of_successful > 1:
            log.debug(f"{number_of_successful} / {total_items} good items. Too many success")
            return self.ret(context.who, False, self.new(BuiltinConcepts.TOO_MANY_SUCCESS, obj=successful_results))

        # only errors, I cannot help you
        log.debug(f"{total_items} items. Only errors")
        return self.ret(context.who, False, self.new(BuiltinConcepts.TOO_MANY_ERRORS, obj=items))

    def parse(self, context, text):
        """
        Runs every registered parser on the input.

        :param context: execution context
        :param text: input to parse, either a string or tokens
        :return: flat list of what the parsers returned
        """
        result = []

        # the debug text is only built when it will actually be logged
        if log.isEnabledFor(logging.DEBUG):
            debug_text = "'" + text + "'" if isinstance(text, str) \
                else "'" + BaseParser.get_text_from_tokens(text) + "' as tokens"
            log.debug(f"Parsing {debug_text}")

        for parser in self.parsers:
            # self.parsers holds classes: a fresh parser is created for every call
            res = parser().parse(context, text)
            if isinstance(res, list):
                result.extend(res)
            else:
                result.append(res)

        return result

    def process(self, context, return_values, contextual_concepts=None):
        """
        Applies the evaluators on the return values until nothing changes anymore.

        The evaluators are grouped by priority and the highest priority is applied first.
        An OneReturnValueEvaluator is called on every matching return value,
        the other evaluators are called once with the whole list.
        What an evaluator consumes is replaced by what it produces.

        :param context: execution context
        :param return_values: one return value or a list of return values (the list is not modified)
        :param contextual_concepts: optional return values added to the input before processing
        :return: list of the resulting return values
        """
        log.debug("Evaluating parsing result.")

        # init: always work on a private list, the caller's one must not be modified
        if isinstance(return_values, list):
            return_values = list(return_values)
        else:
            return_values = [return_values]

        if contextual_concepts:
            return_values.extend(contextual_concepts)

        # group the evaluators by priority and sort them
        # The first one to be applied will be the one with the highest priority
        grouped_evaluators = {}
        for item in self.evaluators:
            grouped_evaluators.setdefault(item.priority, []).append(item)
        sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True)

        # process
        while True:
            simple_digest = return_values[:]

            for priority in sorted_priorities:
                original_items = return_values[:]
                evaluated_items = []
                to_delete = []

                for evaluator in grouped_evaluators[priority]:

                    # evaluators that work on one return value at a time
                    if isinstance(evaluator, OneReturnValueEvaluator):
                        for item in original_items:
                            if not evaluator.matches(context, item):
                                continue

                            result = evaluator.eval(context, item)
                            if result is None:
                                # nothing produced: the item is kept as it is
                                continue

                            if isinstance(result, list):
                                evaluated_items.extend(result)
                            elif isinstance(result, ReturnValueConcept):
                                evaluated_items.append(result)
                            else:
                                error = self.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result,
                                                 evaluator=evaluator)
                                evaluated_items.append(self.ret("sheerka.process", False, error, parents=[item]))
                            to_delete.append(item)

                    # evaluators that work on all the return values at once
                    elif evaluator.matches(context, original_items):
                        results = evaluator.eval(context, original_items)
                        if results is None:
                            # same convention as above: None means nothing was produced
                            continue
                        if not isinstance(results, list):
                            results = [results]

                        for result in results:
                            evaluated_items.append(result)
                            # parents may be None (default of ret()): nothing to remove then
                            to_delete.extend(result.parents or [])

                # new items first, then what was not consumed by the evaluators
                return_values = evaluated_items
                return_values.extend([item for item in original_items if item not in to_delete])

            # have we done something ?
            if simple_digest == return_values:
                break

        return return_values

    def create_new_concept(self, context, concept):
        """
        Adds a new concept to the system.

        :param context: execution context (its ``event_digest`` is recorded with the concept)
        :param concept: DefConceptNode
        :return: return value holding a NEW_CONCEPT on success,
                 or an ErrorConcept when the concept already exists
        """
        concept.init_key()

        # checks for duplicate concepts
        if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
            error = SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept)
            return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])

        # set id before saving in db
        self.set_id_if_needed(concept, False)

        # save the new concept in sdp
        try:
            self.sdp.add(context.event_digest, self.CONCEPTS_ENTRY, concept, use_ref=True)
        except SheerkaDataProviderDuplicateKeyError as error:
            return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])

        # add in cache for quick further reference
        self.add_in_cache(concept)

        # process the return if needed
        return self.ret(self.create_new_concept.__name__, True, self.new(BuiltinConcepts.NEW_CONCEPT, body=concept))

    def add_codes_to_concept(self, context, concept):
        """
        Updates the codes of the newly created concept.
        Basically, it runs the parsers on every part that has a source
        and stores the outcome of expect_one() in ``concept.codes``.

        :param context: execution context
        :param concept: concept to update
        :return: None
        """
        for part_key in ConceptParts:
            source = getattr(concept, part_key.value)
            if source is None or source == "":
                continue

            concept.codes[part_key] = self.expect_one(context, self.parse(context, source))

    def add_in_cache(self, concept):
        """
        Adds a concept template in cache, under its key.
        The cache is used as a proxy before looking at sdp.

        :param concept: concept to cache
        :return: None
        """
        self.concepts_cache[concept.key] = concept

    def get(self, concept_key):
        """
        Tries to find a concept: first in the cache, then in sdp.
        What is returned must be used as a template for another concept.
        You must not modify the returned concept.

        It is safe to call before initialize() has completed: sdp is then skipped,
        and the unknown concept is returned bare if its template is not cached yet.

        :param concept_key: key of the concept (string or BuiltinConcepts member)
        :return: the concept found, otherwise an unknown concept whose body is the requested key
        """
        if isinstance(concept_key, BuiltinConcepts):
            concept_key = str(concept_key)

        # first search in cache
        if concept_key in self.concepts_cache:
            return self.concepts_cache[concept_key]

        # else look in sdp (which is None until initialize() has created it)
        if self.sdp is not None:
            from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)
            if from_db is not None:
                return from_db

        # else return new Unknown concept
        # Note that I don't call the new() method, as it uses get() -> cyclic call
        unknown_concept = Concept()
        template = self.concepts_cache.get(str(BuiltinConcepts.UNKNOWN_CONCEPT))
        if template is not None:
            unknown_concept.update_from(template)
        unknown_concept.body = concept_key
        return unknown_concept

    def new(self, concept_key, **kwargs):
        """
        Returns an instance of a new concept.
        When the concept is supposed to be unique, returns the same instance.

        :param concept_key: key of the concept to instantiate
        :param kwargs: properties or attributes to set on the new instance
        :return: the new concept, the unknown concept when the key is not found,
                 or an UNKNOWN_PROPERTY concept when a kwarg matches nothing
        """
        template = self.get(concept_key)

        # manage concept not found
        if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
                concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
            return template

        # manage singleton
        if template.is_unique:
            return template

        # otherwise, create another instance (of the builtin class when there is one)
        concept_class = self.builtin_cache.get(concept_key, Concept)
        concept = concept_class()
        concept.update_from(template)

        # update the properties
        for k, v in kwargs.items():
            if k in concept.props:
                concept.set_prop(k, v)
            elif hasattr(concept, k):
                setattr(concept, k, v)
            else:
                return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)

        # TODO : add the concept to the list of known concepts (self.instances)
        return concept

    def ret(self, who, status, value, message=None, parents=None):
        """
        Creates and returns a ReturnValue concept.

        :param who: who produced the value
        :param status: True for a success, False for a failure
        :param value: the value itself
        :param message: optional message
        :param parents: optional list of the return values this one is derived from
        :return: a RETURN_VALUE concept built by new()
        """
        return self.new(
            BuiltinConcepts.RETURN_VALUE,
            who=who,
            status=status,
            value=value,
            message=message,
            parents=parents)

    def isinstance(self, a, b):
        """
        Returns True if the concept a is an instance of the concept b,
        which currently means that both have the same key.

        :param a: concept to test (anything that is not a Concept gives False)
        :param b: concept, or key of a concept
        :return: bool
        :raises SyntaxError: when a is a BuiltinConcepts member (arguments were probably swapped)
        """
        if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
            raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")

        if not isinstance(a, Concept):
            return False

        b_key = b.key if isinstance(b, Concept) else str(b)

        # TODO : manage when a is the list of all possible b
        # for example, if a is a color, it will be found the entry 'All_Colors'
        return a.key == b_key

    @staticmethod
    def get_builtins_classes_as_dict():
        """
        Lists the builtin concept classes of core.builtin_concepts.

        :return: dict mapping the key of a default instance to its class,
                 for every strict subclass of Concept
        """
        return {
            c().key: c
            for c in core.utils.get_classes("core.builtin_concepts")
            if issubclass(c, Concept) and c != Concept
        }

    @staticmethod
    def test():
        """
        Returns a fixed string.
        Presumably a smoke check that evaluated code can reach Sheerka - TODO confirm.
        """
        return "I have access to Sheerka !"
@dataclass
class ExecutionContext:
    """
    Keeps track of the execution of a request.
    """
    # who is acting in this context (Sheerka.eval passes its own key)
    who: object
    # digest of the event being processed (Sheerka.eval passes what sdp.save_event returned)
    event_digest: str
    # the Sheerka instance that serves the request
    sheerka: Sheerka

    def push(self, who):
        """
        Creates a context for another actor, on the same event and the same Sheerka.

        :param who: the new actor
        :return: a new ExecutionContext; this one is left untouched
        """
        return ExecutionContext(who=who, event_digest=self.event_digest, sheerka=self.sheerka)