# Source file: Sheerka-Old/core/sheerka.py (606 lines, 22 KiB, Python)

from dataclasses import dataclass
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept
from core.concept import Concept, ConceptParts
from evaluators.BaseEvaluator import OneReturnValueEvaluator
from parsers.BaseParser import BaseParser
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
import core.utils
import core.builtin_helpers
import logging
# Module-level logger, named after this module
log = logging.getLogger(__name__)
# Ordered steps given to Sheerka.chain_process() by Sheerka.eval_concept()
# when the codes of a concept are evaluated
concept_evaluation_steps = [BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION]
class Sheerka(Concept):
    """
    Main controller for the project

    Sheerka is itself a Concept (it is built as BuiltinConcepts.SHEERKA).
    It caches the concept templates, holds the parser and evaluator classes,
    and drives the parse / evaluate cycle of an input (see eval()).
    """
    # Entry under which the concepts are stored in the data provider
    CONCEPTS_ENTRY = "All_Concepts"
    # Name of the key counter used to generate the ids of the builtin concepts
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"
    # Name of the key counter used to generate the ids of the user concepts
    USER_CONCEPTS_KEYS = "User_Concepts"
def __init__(self, debug=False):
    """
    Builds an empty Sheerka. Nothing is loaded here, see initialize()
    :param debug: when True, init_logging() selects the verbose DEBUG configuration
    """
    log.debug("Starting Sheerka.")
    super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
    self.debug = debug

    # Data provider, created by initialize()
    self.sdp = None

    # Templates of the most used concepts, by concept key.
    # They are only footprints used to instantiate other concepts
    self.concepts_cache = {}

    # Classes of the builtin concepts, so that a builtin class can be instantiated
    self.builtin_cache = {}

    # Instances of concepts
    # ex: File is a concept, but File('foo.txt') is an instance
    # TODO: manage contexts
    self.instances = []

    # Rules known by the system
    # ex: hello => say('hello')
    self.rules = []

    # Parser and evaluator classes, filled by initialize()
    self.parsers, self.evaluators = [], []

    # Name prefixes, lazily resolved by get_parser_name() / get_evaluator_name()
    self.parsers_prefix = None
    self.evaluators_prefix = None
def initialize(self, root_folder=None):
    """
    Starting Sheerka
    Sets up the logging, opens the data provider, then loads the builtin concepts,
    the parsers and the evaluators.
    When the data provider reports a first start, the counter of the user concept ids
    is set to 1000.
    :param root_folder: root configuration folder, given as is to SheerkaDataProvider
    :return: ReturnValueConcept. On success its value is Sheerka itself, on IOError
             its value is an error concept and the exception is given as last argument
    """
    try:
        self.init_logging()
        self.sdp = SheerkaDataProvider(root_folder)
        if self.sdp.first_time:
            self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)
        self.initialize_builtin_concepts()
        self.initialize_builtin_parsers()
        self.initialize_builtin_evaluators()
    except IOError as e:
        log.debug(f"Failed to initialize Sheerka: {e}")
        # self.get() cannot be used here: when the failure happens early, self.sdp is
        # still None and the builtin templates are not in cache yet, so get() would
        # raise and hide the IOError. Use the cached ERROR template when there is one,
        # otherwise build an error concept directly.
        error_concept = self.concepts_cache.get(str(BuiltinConcepts.ERROR))
        if error_concept is None:
            error_concept = ErrorConcept(e)
        return ReturnValueConcept(self, False, error_concept, e)
    return ReturnValueConcept(self, True, self)
def set_id_if_needed(self, obj, is_builtin):
    """
    Gives an id to the concept, unless it already has one
    :param obj: concept to update (its id attribute is set in place)
    :param is_builtin: True to take the id from the builtin counter, False from the user one
    :return: None
    """
    if obj.id is None:
        counter = self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS
        obj.id = self.sdp.get_next_key(counter)
        log.debug(f"Setting id '{obj.id}' to concept '{obj.name}'.")
def initialize_builtin_concepts(self):
    """
    Creates one concept per member of BuiltinConcepts, synchronizes it with the
    data provider and puts it in cache
    :return: None
    """
    log.debug("Initializing builtin concepts")
    builtins_classes = self.get_builtins_classes_as_dict()
    # this all initialization of the builtins seems to be little bit complicated
    # why do we need to update it from DB ?
    for key in BuiltinConcepts:
        builtin_class = builtins_classes.get(str(key))

        # Sheerka is its own concept, then comes the dedicated class if any,
        # and finally a plain Concept
        if key == BuiltinConcepts.SHEERKA:
            concept = self
        elif builtin_class is not None:
            concept = builtin_class()
        else:
            concept = Concept(key, True, False, key)

        # remember the class of the non unique builtins, to instantiate them later
        if not concept.is_unique and builtin_class is not None:
            self.builtin_cache[key] = builtin_class

        stored = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
        if stored is not None:
            log.debug(f"Found concept '{stored}' in db. Updating.")
            concept.update_from(stored)
        else:
            log.debug(f"'{concept.name}' concept is not found in db. Adding.")
            self.set_id_if_needed(concept, True)
            self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
        self.add_in_cache(concept)
def initialize_builtin_parsers(self):
    """
    Registers the parser classes found under "parsers"
    (the sub classes of parsers.BaseParser.BaseParser)
    :return: None
    """
    parser_classes = core.utils.get_sub_classes("parsers", "parsers.BaseParser.BaseParser")
    for parser_class in parser_classes:
        log.debug(f"Adding builtin parser '{parser_class.__name__}'")
        self.parsers.append(parser_class)
def initialize_builtin_evaluators(self):
    """
    Registers the evaluator classes found under "evaluators":
    first the sub classes of OneReturnValueEvaluator, then the ones of AllReturnValuesEvaluator
    :return: None
    """
    base_classes = (
        "evaluators.BaseEvaluator.OneReturnValueEvaluator",
        "evaluators.BaseEvaluator.AllReturnValuesEvaluator",
    )
    for base_class in base_classes:
        for evaluator_class in core.utils.get_sub_classes("evaluators", base_class):
            log.debug(f"Adding builtin evaluator '{evaluator_class.__name__}'")
            self.evaluators.append(evaluator_class)
def init_logging(self):
    """
    Configures the logging module: timestamped DEBUG output when self.debug is set,
    otherwise the bare message at INFO level
    :return: None
    """
    verbose = ("%(asctime)s %(name)s [%(levelname)s] %(message)s", logging.DEBUG)
    terse = ("%(message)s", logging.INFO)
    log_format, log_level = verbose if self.debug else terse
    logging.basicConfig(format=log_format, level=log_level)
def eval(self, text):
    """
    Evaluates an input: saves it as an event, then runs the BEFORE_PARSING step,
    the parsers, the EVALUATION step and the AFTER_EVALUATION step.
    Note to KSI: If you try to add execution context to this function,
    You may end in an infinite loop
    :param text: input to evaluate
    :return: list of return values
    """
    evt_digest = self.sdp.save_event(Event(text))
    exec_context = ExecutionContext(self.key, evt_digest, self)

    def run_step(step_key, values):
        # process the values with the step concept as context, then drop the step marker
        step = self.new(step_key)
        values = self.process(exec_context, values, [step])
        return core.utils.remove_from_list(values, lambda x: x.value == step)

    # Before parsing
    return_values = run_step(BuiltinConcepts.BEFORE_PARSING, [])

    # parse
    return_values.extend(self.parse(exec_context, text))

    # evaluate
    return_values = run_step(BuiltinConcepts.EVALUATION, return_values)

    # post evaluation
    return run_step(BuiltinConcepts.AFTER_EVALUATION, return_values)
def parse(self, context, text):
    """
    Runs every registered parser on the input
    :param context: execution context, given as is to the parsers
    :param text: a string, or tokens (only distinguished here for the debug trace)
    :return: flat list of all the parsers results
    """
    if log.isEnabledFor(logging.DEBUG):
        if isinstance(text, str):
            debug_text = "'" + text + "'"
        else:
            debug_text = "'" + BaseParser.get_text_from_tokens(text) + "' as tokens"
        log.debug(f"Parsing {debug_text}")

    collected = []
    for parser_class in self.parsers:
        outcome = parser_class().parse(context, text)
        # a parser may return one result or a list of results
        if isinstance(outcome, list):
            collected.extend(outcome)
        else:
            collected.append(outcome)
    return collected
def process(self, context, return_values, initial_concepts=None):
    """
    Applies the evaluators to the return values until a full pass changes nothing.

    The evaluators are instantiated on each call and grouped by priority. The groups are
    applied from the highest priority to the lowest. All the evaluators of a group see the
    same snapshot of the return values; the items they consumed are replaced by what
    they produced, the other items are kept.
    :param context: execution context (context.who signs the contextual concepts)
    :param return_values: one return value or a list of return values.
                          The given list is never modified
    :param initial_concepts: optional concepts; each one is wrapped in a successful
                             return value and added to the return values before processing
    :return: the resulting list of return values
    """
    log.debug(f"Processing parsing result. context concept={initial_concepts}")

    # return_values must be a list.
    # Work on a private copy: the contextual concepts are appended below, and the caller's
    # list must not be polluted (eval_concept() passes the codes stored on the concept,
    # which are shared with the cached template)
    return_values = return_values[:] if isinstance(return_values, list) else [return_values]

    # adds contextual concepts
    for concept in initial_concepts or []:
        return_values.append(self.ret(context.who, True, concept))

    # group the evaluators by priority and sort them
    # The first one to be applied will be the one with the highest priority
    grouped_evaluators = {}
    for item in [e() for e in self.evaluators]:
        grouped_evaluators.setdefault(item.priority, []).append(item)
    sorted_priorities = sorted(grouped_evaluators, reverse=True)

    # process
    while True:
        before_pass = return_values[:]

        for priority in sorted_priorities:
            original_items = return_values[:]
            evaluated_items = []
            to_delete = []

            for evaluator in grouped_evaluators[priority]:
                if isinstance(evaluator, OneReturnValueEvaluator):
                    # evaluators that work on one return value at a time
                    for item in original_items:
                        if not evaluator.matches(context, item):
                            continue
                        result = evaluator.eval(context, item)
                        if result is None:
                            # the evaluator did nothing, the item is kept
                            continue
                        if isinstance(result, list):
                            evaluated_items.extend(result)
                        elif isinstance(result, ReturnValueConcept):
                            evaluated_items.append(result)
                        else:
                            # anything else is reported as an invalid return value
                            error = self.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result,
                                             evaluator=evaluator)
                            evaluated_items.append(self.ret("sheerka.process", False, error, parents=[item]))
                        to_delete.append(item)
                elif evaluator.matches(context, original_items):
                    # evaluators that work on all the return values at once
                    results = evaluator.eval(context, original_items)
                    if results is None:
                        continue
                    if not isinstance(results, list):
                        results = [results]
                    for result in results:
                        evaluated_items.append(result)
                        # the parents of a result are the items it replaces
                        to_delete.extend(result.parents)

            # what was produced, followed by what was not consumed
            return_values = evaluated_items
            return_values.extend(item for item in original_items if item not in to_delete)

        # have we done something ?
        if before_pass == return_values:
            break

    return return_values
def chain_process(self, context, return_values, initial_concepts):
    """
    Executes process() once per initial concept, in the given order.
    The return values produced by a step are the input of the next one.
    It follows the same pattern as eval(): the step concept is given to process() as
    contextual concept, then the return value that carries it is removed from the result.
    :param context: execution context
    :param return_values: return values to process
    :param initial_concepts: steps to run, as concepts or as BuiltinConcepts members
    :return: list of return values after the last step
    """
    for concept in initial_concepts:
        if isinstance(concept, BuiltinConcepts):
            # instantiate the concept designated by the member (and not the enum class itself)
            concept = self.new(concept)

        # process() wraps the step in a return value by itself, so the bare concept is given
        return_values = self.process(context, return_values, [concept])
        return_values = core.utils.remove_from_list(return_values, lambda x, step=concept: x.value == step)
    return return_values
def create_new_concept(self, context, concept):
    """
    Adds a new concept to the system: checks for duplicates, gives it an id,
    saves it in the data provider and puts it in cache
    :param context: execution context (its event_digest is recorded with the concept)
    :param concept: concept to add
    :return: return value. On success it carries a NEW_CONCEPT concept whose body is
             the concept, on duplicate it carries an ErrorConcept
    """
    who = self.create_new_concept.__name__

    def duplicate(err):
        return self.ret(who, False, ErrorConcept(err), err.args[0])

    concept.init_key()

    # checks for duplicate concepts
    if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
        return duplicate(
            SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept))

    # set id before saving in db
    self.set_id_if_needed(concept, False)

    # save the new concept in sdp
    try:
        self.sdp.add(context.event_digest, self.CONCEPTS_ENTRY, concept, use_ref=True)
    except SheerkaDataProviderDuplicateKeyError as error:
        return duplicate(error)

    # add in cache for quick further reference
    self.concepts_cache[concept.key] = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)

    new_concept = self.new(BuiltinConcepts.NEW_CONCEPT, body=concept)
    return self.ret(who, True, new_concept)
def add_codes_to_concept(self, context, concept):
    """
    Fills concept.codes by running the parsers on every part and every property
    of the concept. The codes are also set on the cached template when there is one
    :param context: execution context, given to parse()
    :param concept: concept to update
    :return: None
    """
    for part in ConceptParts:
        source = getattr(concept, part.value)
        # the only sources that I am sure to parse are strings
        # I refuse empty strings for performance, I don't want to handle useless NOPConcepts
        if isinstance(source, str) and source != "":
            concept.codes[part] = self.parse(context, source)

    for prop_name in concept.props:
        concept.codes[prop_name] = self.parse(context, concept.props[prop_name].value)

    # updates the code of the reference when possible
    if concept.key not in self.concepts_cache:
        return
    template = self.concepts_cache[concept.key]
    # TODO : manage when there are multiple entries (the template is a list)
    if not isinstance(template, list):
        template.codes = concept.codes
def eval_concept(self, context, concept, properties_to_eval=None):
    """
    Evaluates the parts of a concept and stores each result back on the concept
    :param context: execution context
    :param concept: concept to evaluate. Its codes are computed first when it has none
    :param properties_to_eval: names of the parts to evaluate,
                               defaults to where, pre, post, body and props
    :return: None
    """
    if len(concept.codes) == 0:
        self.add_codes_to_concept(context, concept)

    if properties_to_eval is None:
        properties_to_eval = ["where", "pre", "post", "body", "props"]

    for prop in properties_to_eval:
        if prop == "props":
            # the properties are not evaluated for now
            continue

        # NOTE(review): this lookup assumes that concept.codes has an entry for every part,
        # while add_codes_to_concept() skips the empty ones - confirm how codes is initialized
        codes = concept.codes[ConceptParts(prop)]
        if codes is None:
            continue

        res = self.chain_process(context, codes, concept_evaluation_steps)
        res = core.builtin_helpers.expect_one(context, res)
        setattr(concept, prop, res.value)
def add_in_cache(self, concept):
    """
    Registers a concept template in cache, under its key.
    The cache is looked at before the data provider (see get())
    :param concept: template to register. Its key is initialized when missing
    :return: the concept
    :raises KeyError: when the concept still has no key after init_key()
    """
    # sanity check
    if concept.key is None:
        concept.init_key()
        if concept.key is None:
            raise KeyError()

    self.concepts_cache[concept.key] = concept
    return concept
def get(self, concept_key):
    """
    Looks for a concept template: first in cache, then in the data provider.
    What is returned must be used as a template for another concept,
    you must not modify it.
    :param concept_key: key of the concept, or a BuiltinConcepts member
    :return: the template when found, otherwise a new unknown concept whose body is the key
    """
    if isinstance(concept_key, BuiltinConcepts):
        concept_key = str(concept_key)

    # first search in cache
    try:
        return self.concepts_cache[concept_key]
    except KeyError:
        pass

    # else look in sdp
    stored = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)
    if stored is not None:
        return stored

    # else return new Unknown concept
    # Note that I don't call the new() method, as it use get() -> cyclic call
    unknown = Concept()
    unknown.update_from(self.concepts_cache[str(BuiltinConcepts.UNKNOWN_CONCEPT)])
    unknown.body = concept_key
    return unknown
def new(self, concept_key, **kwargs):
    """
    Returns an instance of a concept, built from its template.
    When the concept is supposed to be unique, the template itself is returned
    :param concept_key: key of the concept, or a BuiltinConcepts member
    :param kwargs: properties or attributes to set on the new instance
    :return: the instance. An unknown concept when the key is not found,
             an UNKNOWN_PROPERTY concept when a kwarg matches nothing,
             an ENUMERATION concept when several templates share the key
    """
    template = self.get(concept_key)

    def instantiate(tpl):
        # manage singleton
        if tpl.is_unique:
            return tpl

        # otherwise, create another instance
        factory = self.builtin_cache[concept_key] if concept_key in self.builtin_cache else Concept
        instance = factory()
        instance.update_from(tpl)

        # update the properties
        for name, val in kwargs.items():
            if name in instance.props:
                instance.set_prop(name, val)
            elif hasattr(instance, name):
                setattr(instance, name, val)
            else:
                return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=name, concept=instance)

        # TODO : add the concept to the list of known concepts (self.instances)
        return instance

    # manage concept not found
    not_found = self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
        concept_key != BuiltinConcepts.UNKNOWN_CONCEPT
    if not_found:
        return template

    # if template is a list, it means that there a multiple concepts under the same key
    if isinstance(template, list):
        return self.new(BuiltinConcepts.ENUMERATION, body=[instantiate(t) for t in template])

    return instantiate(template)
def ret(self, who, status, value, message=None, parents=None):
    """
    Shortcut to build a RETURN_VALUE concept with new()
    :param who: who produced the value
    :param status: status of the return value
    :param value: value carried
    :param message: optional message
    :param parents: optional list of the return values this one comes from
    :return: the concept returned by new()
    """
    fields = {
        "who": who,
        "status": status,
        "value": value,
        "message": message,
        "parents": parents,
    }
    return self.new(BuiltinConcepts.RETURN_VALUE, **fields)
def value(self, obj, allow_none_body=False):
    """
    Resolves the value of an object
    :param obj: anything. What is not a Concept (None included) is returned as is
    :param allow_none_body: what to do with a concept that has no body:
                            True returns the concept itself, False returns a
                            CANNOT_RESOLVE_VALUE_ERROR concept
    :return: obj.get_value() when the concept defines it, else its body when not None
    """
    if obj is None or not isinstance(obj, Concept):
        return obj

    if hasattr(obj, "get_value"):
        return obj.get_value()

    if obj.body is not None:
        return obj.body

    if allow_none_body:
        return obj
    return self.new(BuiltinConcepts.CANNOT_RESOLVE_VALUE_ERROR, body=obj)
def values(self, objs):
    """
    Resolves the values of several objects, see value()
    :param objs: a list, a LIST concept or an ENUMERATION concept.
                 Anything else is considered as a single object
    :return: generator of the values
    """
    is_collection = (isinstance(objs, list)
                     or self.isinstance(objs, BuiltinConcepts.LIST)
                     or self.isinstance(objs, BuiltinConcepts.ENUMERATION))
    items = objs if is_collection else [objs]
    return (self.value(item) for item in items)
def isinstance(self, a, b):
    """
    Tells whether the concept a is an instance of the concept b, by comparing their keys
    :param a: concept to test. What is not a Concept is never an instance
    :param b: concept, or anything whose str() is a concept key
    :return: True or False
    :raises SyntaxError: when a is a BuiltinConcepts member
    """
    if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
        raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")

    if isinstance(a, Concept):
        expected_key = b.key if isinstance(b, Concept) else str(b)
        # TODO : manage when a is the list of all possible b
        # for example, if a is a color, it will be found the entry 'All_Colors'
        return a.key == expected_key

    return False
def isa(self, a, b):
    """
    return true if the concept a is a b
    For now it only compares the keys of the two concepts.
    Will handle when the keyword isa will be implemented
    :param a: concept to test. What is not a Concept is never a b
    :param b: concept, or anything whose str() is a concept key
    :return: True or False
    :raises SyntaxError: when a is a BuiltinConcepts member
    """
    if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
        # the message names this method (it used to mention isinstance, by copy / paste)
        raise SyntaxError("Remember that the first parameter of isa MUST be a concept")

    if not isinstance(a, Concept):
        return False

    b_key = b.key if isinstance(b, Concept) else str(b)

    # TODO : manage when a is the list of all possible b
    # for example, if a is a color, it will be found the entry 'All_Colors'
    return a.key == b_key
def get_evaluator_name(self, name):
    """
    Prefixes a name with the PREFIX of the base evaluator class.
    The prefix is resolved on first use, then kept in self.evaluators_prefix
    :param name: name to prefix
    :return: prefix + name
    """
    prefix = self.evaluators_prefix
    if prefix is None:
        prefix = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator").PREFIX
        self.evaluators_prefix = prefix
    return prefix + name
def get_parser_name(self, name):
    """
    Prefixes a name with the PREFIX of the base parser class.
    The prefix is resolved on first use, then kept in self.parsers_prefix
    :param name: name to prefix
    :return: prefix + name
    """
    prefix = self.parsers_prefix
    if prefix is None:
        prefix = core.utils.get_class("parsers.BaseParser.BaseParser").PREFIX
        self.parsers_prefix = prefix
    return prefix + name
def concepts(self):
    """
    Lists the concepts stored in the data provider
    :return: flat list of the concepts, sorted by numerical id
    """
    flattened = []
    for entry in self.sdp.list(self.CONCEPTS_ENTRY):
        # an entry is either one concept or the list of the concepts that share a key
        if isinstance(entry, list):
            flattened.extend(entry)
        else:
            flattened.append(entry)
    flattened.sort(key=lambda concept: int(concept.id))
    return flattened
@staticmethod
def get_builtins_classes_as_dict():
    """
    Collects the classes of core.builtin_concepts that derive from Concept
    :return: dict of the classes, by the key of a default instance of the class
    """
    return {
        cls().key: cls
        for cls in core.utils.get_classes("core.builtin_concepts")
        if issubclass(cls, Concept) and cls != Concept
    }
@staticmethod
def get_builtin_parsers():
    """
    Lists the classes found recursively under "parsers"
    :return: list of classes
    """
    # The base parser class is resolved but not used.
    # NOTE(review): no filter is applied, every class found under "parsers" is returned,
    # whether it derives from BaseParser or not - confirm that this is intended
    core.utils.get_class("parsers.BaseParser.BaseParser")
    return list(core.utils.get_classes_recursive("parsers"))
@staticmethod
def test():
return "I have access to Sheerka !"
@dataclass
class ExecutionContext:
    """
    To keep track of the execution of a request
    """
    # who is asking
    who: object
    # what was the (original) trigger
    event_digest: str
    # sheerka
    sheerka: Sheerka
    # human description of what is going on
    desc: str = None
    # what is the subject of the execution context (if known)
    obj: Concept = None

    def push(self, who, desc=None, obj=None):
        """
        Creates a context for another requester, on the same event and the same sheerka
        :param who: who is asking
        :param desc: human description of what is going on
        :param obj: subject of the new context (if known)
        :return: a new ExecutionContext, this one is left untouched
        """
        return ExecutionContext(
            who=who,
            event_digest=self.event_digest,
            sheerka=self.sheerka,
            desc=desc,
            obj=obj)