836 lines
31 KiB
Python
836 lines
31 KiB
Python
from dataclasses import dataclass, field
|
|
|
|
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept
|
|
from core.concept import Concept, ConceptParts, PROPERTIES_FOR_DIGEST
|
|
from parsers.BaseParser import BaseParser
|
|
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
|
|
import core.utils
|
|
import core.builtin_helpers
|
|
|
|
from core.sheerka_logger import console_handler, get_logger
|
|
|
|
import logging
|
|
|
|
# Steps applied when evaluating an already-parsed concept (as opposed to raw
# user input, which goes through the full before-parsing/parsing pipeline).
concept_evaluation_steps = [BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION]

# Fully qualified name of the parser that handles concept definitions (BNF);
# used as a key into Sheerka.parsers.
CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser"

# Number of spaces added per nesting level in ExecutionContext debug logs.
DEBUG_TAB_SIZE = 4
|
|
|
|
|
|
class Sheerka(Concept):
    """
    Main controller for the project.

    Orchestrates the data provider (sdp), the parsers and the evaluators,
    and maintains the in-memory caches of concepts, definitions and grammars.
    """

    # sdp entry names
    CONCEPTS_ENTRY = "All_Concepts"  # to store all the concepts
    CONCEPTS_DEFINITIONS_ENTRY = "Concepts_Definitions"  # to store definitions (bnf) of concepts

    # sdp sequential-key names used to generate concept ids
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"  # sequential key for builtin concepts
    USER_CONCEPTS_KEYS = "User_Concepts"  # sequential key for user defined concepts
|
|
|
|
    def __init__(self, skip_builtins_in_db=False, debug=False, loggers=None):
        """
        Creates the Sheerka controller and its (empty) caches.

        Logging is configured first so everything after can log.

        :param skip_builtins_in_db: when True, builtin concepts are neither looked up nor stored in the db
        :param debug: enables debug-level logging
        :param loggers: names of the loggers to enable
        """
        self.init_logging(debug, loggers)

        super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
        # NOTE(review): self.log / self.init_log are presumably set up by
        # Concept.__init__ — confirm
        self.log.debug("Starting Sheerka.")

        # cache of the most used concepts
        # Note that these are only templates
        # They are used as a footprint for instantiation
        # Except of course when the concept is supposed to be unique
        # key is the key of the concept (not the name or the id)
        self.concepts_cache = {}

        # cache for concept definitions,
        # Primarily used for unit tests that do not have access to sdp
        self.concepts_definition_cache = {}

        # cache for concepts grammars
        # a grammar is a resolved BNF
        self.concepts_grammars = {}

        # a concept can be instantiated
        # ex: File is a concept, but File('foo.txt') is an instance
        # TODO: manage contexts
        self.instances = []

        # List of the known rules by the system
        # ex: hello => say('hello')
        self.rules = []

        self.sdp: SheerkaDataProvider = None  # SheerkaDataProvider, set in initialize()
        self.builtin_cache = {}  # cache for builtin concepts
        self.parsers = {}  # cache for builtin parsers
        self.evaluators = []  # cache for builtin evaluators

        # lazily resolved name prefixes (see get_evaluator_name / get_parser_name)
        self.evaluators_prefix: str = None
        self.parsers_prefix: str = None

        self.skip_builtins_in_db = skip_builtins_in_db
|
|
|
|
    def initialize(self, root_folder: str = None):
        """
        Starting Sheerka.

        Loads the current configuration: opens the data provider, then
        initializes builtin concepts, parsers, evaluators and the stored
        concept definitions.
        Note that when it's the first time, it also creates the needed
        working folders (and seeds the user-concept key sequence).

        :param root_folder: root configuration folder
        :return: ReturnValue(Success or Error)
        """
        try:
            self.sdp = SheerkaDataProvider(root_folder)
            if self.sdp.first_time:
                # user concept ids start at 1000; ids below are for builtins
                self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)

            evt_digest = self.sdp.save_event(Event("Initializing Sheerka."))
            exec_context = ExecutionContext(self.key, evt_digest, self)

            self.initialize_builtin_concepts()
            self.initialize_builtin_parsers()
            self.initialize_builtin_evaluators()
            self.initialize_concepts_definitions(exec_context)

        except IOError as e:
            return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e)

        return ReturnValueConcept(self, True, self)
|
|
|
|
    def initialize_builtin_concepts(self):
        """
        Initializes the builtin concepts.

        For every BuiltinConcepts member, builds a concept instance (Sheerka
        itself, a dedicated builtin class when one exists, or a generic
        Concept), optionally syncs it with the db, and adds it to the cache.

        :return: None
        """
        self.init_log.debug("Initializing builtin concepts")
        builtins_classes = self.get_builtins_classes_as_dict()

        # this all initialization of the builtins seems to be a little bit complicated
        # why do we need to update it from DB ?
        for key in BuiltinConcepts:
            # pick, in order: Sheerka itself, the dedicated builtin class,
            # or a plain unique Concept carrying the key
            concept = self if key == BuiltinConcepts.SHEERKA \
                else builtins_classes[str(key)]() if str(key) in builtins_classes \
                else Concept(key, True, False, key)

            # non-unique builtins keep their class around so new() can
            # instantiate fresh copies later
            if not concept.metadata.is_unique and str(key) in builtins_classes:
                self.builtin_cache[key] = builtins_classes[str(key)]

            if not self.skip_builtins_in_db:
                from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.metadata.key)
                if from_db is None:
                    self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.")
                    self.set_id_if_needed(concept, True)
                    self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
                else:
                    self.init_log.debug(f"Found concept '{from_db}' in db. Updating.")
                    concept.update_from(from_db)

            self.add_in_cache(concept)
|
|
|
|
def initialize_builtin_parsers(self):
|
|
"""
|
|
Init the parsers
|
|
:return:
|
|
"""
|
|
core.utils.init_package_import("parsers")
|
|
base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
|
|
for parser in core.utils.get_sub_classes("parsers", base_class):
|
|
if parser.__module__ == base_class.__module__:
|
|
continue
|
|
|
|
self.init_log.debug(f"Adding builtin parser '{parser.__name__}'")
|
|
self.parsers[core.utils.get_full_qualified_name(parser)] = parser
|
|
|
|
def initialize_builtin_evaluators(self):
|
|
"""
|
|
Init the evaluators
|
|
:return:
|
|
"""
|
|
core.utils.init_package_import("evaluators")
|
|
for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.OneReturnValueEvaluator"):
|
|
self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
|
|
self.evaluators.append(evaluator)
|
|
|
|
for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.AllReturnValuesEvaluator"):
|
|
self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
|
|
self.evaluators.append(evaluator)
|
|
|
|
def initialize_concepts_definitions(self, execution_context):
|
|
self.init_log.debug("Initializing concepts definitions")
|
|
definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False)
|
|
|
|
if definitions is None:
|
|
self.init_log.debug("No BNF defined")
|
|
return
|
|
|
|
lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
|
|
ret_val = lexer_parser.initialize(execution_context, definitions)
|
|
if not ret_val.status:
|
|
self.init_log.error("Failed to initialize concepts definitions " + str(ret_val.body))
|
|
return
|
|
|
|
self.concepts_grammars = lexer_parser.concepts_grammars
|
|
|
|
    def evaluate_user_input(self, text: str, user_name="kodjo"):
        """
        Processes a raw user input through the full pipeline:
        before-parsing, parsing, evaluation, after-evaluation.

        Note to KSI: If you try to add execution context to this function,
        You may end in an infinite loop

        :param text: raw user input
        :param user_name: author of the input (stored with the event)
        :return: the return values produced by the last step
        """
        self.log.debug(f"Processing user input '{text}', {user_name=}.")
        evt_digest = self.sdp.save_event(Event(text, user_name))
        self.log.debug(f"{evt_digest=}")
        execution_context = ExecutionContext(self.key, evt_digest, self)

        # wrap the raw text in a USER_INPUT concept inside a successful ReturnValue
        user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name))
        steps = [
            BuiltinConcepts.BEFORE_PARSING,
            BuiltinConcepts.PARSING,
            BuiltinConcepts.EVALUATION,
            BuiltinConcepts.AFTER_EVALUATION
        ]

        return self.execute(execution_context, user_input, steps)
|
|
|
|
    def _call_parsers(self, execution_context, return_values, logger=None):
        """
        Runs every registered parser over the successful USER_INPUT return
        values and collects the parse results.

        :param execution_context:
        :param return_values: a return value or a list of them
        :param logger: optional logger overriding each parser's own
        :return: list of parse results, each linked to its source via .parents
        """
        result = []
        # return_values must be a list
        if not isinstance(return_values, list):
            return_values = [return_values]

        for return_value in return_values:
            # make sure we only parse user input
            if not return_value.status or not self.isinstance(return_value.body, BuiltinConcepts.USER_INPUT):
                continue

            to_parse = self.value(return_value)

            if self.log.isEnabledFor(logging.DEBUG):
                # to_parse is either raw text or an already-tokenized input
                debug_text = "'" + to_parse + "'" if isinstance(to_parse, str) \
                    else "'" + BaseParser.get_text_from_tokens(to_parse) + "' as tokens"
                execution_context.log(logger or self.log, f"Parsing {debug_text}")

            for parser in self.parsers.values():
                p = parser(sheerka=self)
                if logger:
                    p.log = logger
                res = p.parse(execution_context, to_parse)

                # a parser may return one result or an iterable of results;
                # either way, the source return value becomes the parent
                if hasattr(res, "__iter__"):
                    for r in res:
                        r.parents = [return_value]
                        result.append(r)
                else:
                    res.parents = [return_value]
                    result.append(res)

        return result
|
|
|
|
    def _call_evaluators(self, execution_context, return_values, process_step, evaluation_context=None, logger=None):
        """
        Repeatedly applies the enabled evaluators to the return values until
        a full pass changes nothing (a fixed point is reached).

        Evaluators are grouped by priority and applied highest-priority group
        first. Two families exist: OneReturnValueEvaluator instances process
        items one by one; all other evaluators receive the whole list.

        :param execution_context:
        :param return_values: a return value or a list of them
        :param process_step: current step, injected into the pool as evaluation context
        :param evaluation_context: optional concepts modifying the evaluation behaviour
        :param logger: optional logger overriding each evaluator's own
        :return: the evaluated return values (evaluation context removed)
        """
        # return_values must be a list
        # NOTE(review): when a list is passed in, it is extended in place
        # below — confirm callers do not rely on their list staying intact
        if not isinstance(return_values, list):
            return_values = [return_values]

        # Evaluation context are contexts that may modify the behaviour of the execution
        # For example, a concept to indicate that the value is not wanted
        # Or a concept to indicate that we want the letter form of the response
        # But first, they need to be transformed into return values
        if evaluation_context is None:
            evaluation_return_values = []
        else:
            evaluation_return_values = [self.ret(execution_context.who, True, c) for c in evaluation_context]

        # add the current step as part of the evaluation context
        evaluation_return_values.append(self.ret(execution_context.who, True, self.new(process_step)))

        # the pool of return values is the mix of both
        return_values.extend(evaluation_return_values)

        # group the evaluators by priority and sort them
        # The first one to be applied will be the one with the highest priority
        grouped_evaluators = {}
        for evaluator in [e() for e in self.evaluators if e.enabled]:
            if logger:
                evaluator.log = logger
            grouped_evaluators.setdefault(evaluator.priority, []).append(evaluator)

        # order the groups by priority, the higher first
        sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True)

        # process until a pass leaves the pool unchanged
        while True:
            # snapshot used to detect whether this pass changed anything
            simple_digest = return_values[:]

            for priority in sorted_priorities:

                original_items = return_values[:]
                evaluated_items = []
                to_delete = []
                for evaluator in grouped_evaluators[priority]:

                    # process evaluators that work on one return value at a time
                    from evaluators.BaseEvaluator import OneReturnValueEvaluator
                    if isinstance(evaluator, OneReturnValueEvaluator):
                        for item in original_items:
                            if evaluator.matches(execution_context, item):
                                result = evaluator.eval(execution_context, item)
                                if result is None:
                                    # None means "nothing to do" — keep the item
                                    continue
                                elif isinstance(result, list):
                                    evaluated_items.extend(result)
                                    to_delete.append(item)
                                elif isinstance(result, ReturnValueConcept):
                                    evaluated_items.append(result)
                                    to_delete.append(item)
                                else:
                                    # evaluator returned an unexpected type:
                                    # replace the item with an error return value
                                    error = self.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result,
                                                     evaluator=evaluator)
                                    evaluated_items.append(self.ret("sheerka.process", False, error, parents=[item]))
                                    to_delete.append(item)

                    # process evaluators that work on all return values at once
                    else:
                        if evaluator.matches(execution_context, original_items):
                            results = evaluator.eval(execution_context, original_items)
                            if results is None:
                                continue
                            if not isinstance(results, list):
                                results = [results]
                            for result in results:
                                evaluated_items.append(result)
                                # consumed parents are removed from the pool
                                to_delete.extend(result.parents)

                # new pool = produced items + untouched originals
                return_values = evaluated_items
                return_values.extend([item for item in original_items if item not in to_delete])

            # have we done something ?
            to_compare = return_values[:]
            if simple_digest == to_compare:
                break

            # inc the iteration and continue
            execution_context = execution_context.push(iteration=execution_context.iteration + 1)

        # remove all evaluation context concepts that were not reduced
        return_values = core.utils.remove_list_from_list(return_values, evaluation_return_values)
        return return_values
|
|
|
|
def execute(self, execution_context, return_values, execution_steps, logger=None):
|
|
"""
|
|
Executes process for all initial contexts
|
|
:param execution_context:
|
|
:param return_values:
|
|
:param execution_steps:
|
|
:param logger: logger to use (if not directly called by sheerka)
|
|
:return:
|
|
"""
|
|
|
|
for step in execution_steps:
|
|
sub_context = execution_context.push(step=step)
|
|
sub_context.log(logger or self.log, f"{step=}, context='{sub_context}'")
|
|
if step == BuiltinConcepts.PARSING:
|
|
return_values = self._call_parsers(sub_context, return_values, logger)
|
|
else:
|
|
return_values = self._call_evaluators(sub_context, return_values, step, None, logger)
|
|
|
|
sub_context.log_result(logger or self.log, return_values)
|
|
|
|
return return_values
|
|
|
|
def set_id_if_needed(self, obj: Concept, is_builtin: bool):
|
|
"""
|
|
Set the key for the concept if needed
|
|
:param obj:
|
|
:param is_builtin:
|
|
:return:
|
|
"""
|
|
if obj.metadata.id is not None:
|
|
return
|
|
obj.metadata.id = self.sdp.get_next_key(self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS)
|
|
self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.")
|
|
|
|
    def create_new_concept(self, context, concept: Concept):
        """
        Adds a new concept to the system.

        Rejects duplicates, assigns an id, validates and stores the BNF when
        present, persists the concept in sdp and updates the caches.

        :param context:
        :param concept: DefConceptNode
        :return: a ReturnValue wrapping a NEW_CONCEPT on success, an ErrorConcept on failure
        """
        concept.init_key()
        concepts_definitions = None
        init_ret_value = None

        # checks for duplicate concepts
        # TODO checks if it exists in cache first
        if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
            error = SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept)
            return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])

        # set id before saving in db
        self.set_id_if_needed(concept, False)

        # add the BNF if known
        if concept.bnf:
            concepts_definitions = self.get_concept_definition()
            concepts_definitions[concept] = concept.bnf

            # check if it's a valid BNF or whether it breaks the known rules
            concept_lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
            sub_context = context.push(self.name, desc=f"Initializing concept definition for {concept}")
            sub_context.concepts[concept.key] = concept  # the concept is not in the real cache yet
            init_ret_value = concept_lexer_parser.initialize(sub_context, concepts_definitions)
            if not init_ret_value.status:
                return self.ret(self.create_new_concept.__name__, False, ErrorConcept(init_ret_value.value))

        # save the new concept in sdp
        try:
            self.sdp.add(context.event_digest, self.CONCEPTS_ENTRY, concept, use_ref=True)
            if concepts_definitions is not None:
                self.sdp.set(context.event_digest, self.CONCEPTS_DEFINITIONS_ENTRY, concepts_definitions, use_ref=True)
        except SheerkaDataProviderDuplicateKeyError as error:
            return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])

        # Updates the caches
        self.concepts_cache[concept.key] = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
        if init_ret_value is not None and init_ret_value.status:
            self.concepts_grammars = init_ret_value.body

        # process the return if needed
        ret = self.ret(self.create_new_concept.__name__, True, self.new(BuiltinConcepts.NEW_CONCEPT, body=concept))
        return ret
|
|
|
|
def initialize_concept_asts(self, context, concept: Concept, logger=None):
|
|
"""
|
|
Updates the codes of the newly created concept
|
|
Basically, it runs the parsers on all parts
|
|
:param concept:
|
|
:param context:
|
|
:param logger:
|
|
:return:
|
|
"""
|
|
# steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
|
|
steps = [BuiltinConcepts.PARSING]
|
|
for part_key in ConceptParts:
|
|
source = getattr(concept.metadata, part_key.value)
|
|
if source is None or not isinstance(source, str) or source == "":
|
|
# the only sources that I am sure to parse are strings
|
|
# I refuse empty strings for performance, I don't want to handle useless NOPConcepts
|
|
continue
|
|
else:
|
|
to_parse = self.ret(context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=source))
|
|
concept.cached_asts[part_key] = self.execute(context, to_parse, steps, logger)
|
|
|
|
for prop in concept.props:
|
|
to_parse = self.ret(context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=concept.props[prop].value))
|
|
concept.cached_asts[prop] = self.execute(context, to_parse, steps)
|
|
|
|
# updates the code of the reference when possible
|
|
if concept.key in self.concepts_cache:
|
|
entry = self.concepts_cache[concept.key]
|
|
if isinstance(entry, list):
|
|
# TODO : manage when there are multiple entries
|
|
pass
|
|
else:
|
|
self.concepts_cache[concept.key].cached_asts = concept.cached_asts
|
|
|
|
def eval_concept(self, context, concept: Concept, properties_to_eval=None, logger=None):
|
|
"""
|
|
Evaluation a concept
|
|
It means that if the where clause is True, will evaluate the body
|
|
:param context:
|
|
:param concept:
|
|
:param properties_to_eval:
|
|
:param logger:
|
|
:return:
|
|
"""
|
|
if len(concept.cached_asts) == 0:
|
|
self.initialize_concept_asts(context, concept, logger)
|
|
|
|
if properties_to_eval is None:
|
|
properties_to_eval = ["where", "pre", "post", "body", "props"]
|
|
|
|
for prop in properties_to_eval:
|
|
if prop == "props":
|
|
pass
|
|
else:
|
|
part_key = ConceptParts(prop)
|
|
if concept.cached_asts[part_key] is None:
|
|
continue
|
|
res = self.execute(context, concept.cached_asts[part_key], concept_evaluation_steps, logger)
|
|
res = core.builtin_helpers.expect_one(context, res)
|
|
setattr(concept.metadata, prop, res.value)
|
|
|
|
def add_in_cache(self, concept: Concept):
|
|
"""
|
|
Adds a concept template in cache.
|
|
The cache is used as a proxy before looking at sdp
|
|
:param concept:
|
|
:return:
|
|
"""
|
|
|
|
# sanity check
|
|
if concept.key is None:
|
|
concept.init_key()
|
|
|
|
if concept.key is None:
|
|
raise KeyError()
|
|
|
|
self.concepts_cache[concept.key] = concept
|
|
return concept
|
|
|
|
def get(self, concept_key):
|
|
"""
|
|
Tries to find a concept
|
|
What is return must be used a template for another concept.
|
|
You must not modify the returned concept
|
|
:param concept_key:
|
|
:return:
|
|
"""
|
|
|
|
if isinstance(concept_key, BuiltinConcepts):
|
|
concept_key = str(concept_key)
|
|
|
|
# first search in cache
|
|
if concept_key in self.concepts_cache:
|
|
return self.concepts_cache[concept_key]
|
|
|
|
# else look in sdp
|
|
from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)
|
|
if from_db is not None:
|
|
return from_db
|
|
|
|
# else return new Unknown concept
|
|
# Note that I don't call the new() method, as it use get() -> cyclic call
|
|
unknown_concept = Concept()
|
|
template = self.concepts_cache[str(BuiltinConcepts.UNKNOWN_CONCEPT)]
|
|
unknown_concept.update_from(template)
|
|
unknown_concept.metadata.body = concept_key
|
|
return unknown_concept
|
|
|
|
def new(self, concept_key, **kwargs):
|
|
"""
|
|
Returns an instance of a new concept
|
|
When the concept is supposed to be unique, returns the same instance
|
|
:param concept_key:
|
|
:param kwargs:
|
|
:return:
|
|
"""
|
|
template = self.get(concept_key)
|
|
|
|
def new_from_template(t, k, **kwargs_):
|
|
# manage singleton
|
|
if t.metadata.is_unique:
|
|
return t
|
|
|
|
# otherwise, create another instance
|
|
concept = self.builtin_cache[k]() if k in self.builtin_cache else Concept()
|
|
concept.update_from(t)
|
|
|
|
# update the properties
|
|
for k, v in kwargs_.items():
|
|
if k in concept.props:
|
|
concept.set_prop(k, v)
|
|
elif k in PROPERTIES_FOR_DIGEST:
|
|
setattr(concept.metadata, k, v)
|
|
elif hasattr(concept, k):
|
|
setattr(concept, k, v)
|
|
else:
|
|
return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)
|
|
|
|
# TODO : add the concept to the list of known concepts (self.instances)
|
|
return concept
|
|
|
|
# manage concept not found
|
|
if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
|
|
concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
|
|
return template
|
|
|
|
if not isinstance(template, list):
|
|
return new_from_template(template, concept_key, **kwargs)
|
|
|
|
# if template is a list, it means that there a multiple concepts under the same key
|
|
concepts = [new_from_template(t, concept_key, **kwargs) for t in template]
|
|
return self.new(BuiltinConcepts.ENUMERATION, body=concepts)
|
|
|
|
def ret(self, who: str, status: bool, value, message=None, parents=None):
|
|
"""
|
|
Creates and returns a ReturnValue concept
|
|
:param who:
|
|
:param status:
|
|
:param value:
|
|
:param message:
|
|
:param parents:
|
|
:return:
|
|
"""
|
|
return self.new(
|
|
BuiltinConcepts.RETURN_VALUE,
|
|
who=who,
|
|
status=status,
|
|
value=value,
|
|
message=message,
|
|
parents=parents)
|
|
|
|
def value(self, obj, allow_none_body=False):
|
|
if obj is None:
|
|
return None
|
|
|
|
if self.isinstance(obj, BuiltinConcepts.RETURN_VALUE) and \
|
|
obj.status and \
|
|
self.isinstance(obj.value, BuiltinConcepts.USER_INPUT):
|
|
return obj.value.text
|
|
|
|
if not isinstance(obj, Concept):
|
|
return obj
|
|
|
|
if hasattr(obj, "get_value"):
|
|
return obj.get_value()
|
|
|
|
if obj.body is not None:
|
|
return obj.body
|
|
|
|
return obj if allow_none_body else self.new(BuiltinConcepts.CANNOT_RESOLVE_VALUE_ERROR, body=obj)
|
|
|
|
def values(self, objs):
|
|
if not (isinstance(objs, list) or
|
|
self.isinstance(objs, BuiltinConcepts.LIST) or
|
|
self.isinstance(objs, BuiltinConcepts.ENUMERATION)):
|
|
objs = [objs]
|
|
|
|
return (self.value(obj) for obj in objs)
|
|
|
|
def is_success(self, obj):
|
|
if isinstance(obj, bool):
|
|
return obj
|
|
|
|
if self.isinstance(obj, BuiltinConcepts.RETURN_VALUE):
|
|
return obj.status
|
|
|
|
if self.isinstance(obj, BuiltinConcepts.ERROR):
|
|
return False
|
|
|
|
return False
|
|
|
|
def isinstance(self, a, b):
|
|
"""
|
|
return true if the concept a is an instance of the concept b
|
|
:param a:
|
|
:param b:
|
|
:return:
|
|
"""
|
|
|
|
if isinstance(a, BuiltinConcepts): # common KSI error ;-)
|
|
raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")
|
|
|
|
if not isinstance(a, Concept):
|
|
return False
|
|
|
|
b_key = b.key if isinstance(b, Concept) else str(b)
|
|
|
|
# TODO : manage when a is the list of all possible b
|
|
# for example, if a is a color, it will be found the entry 'All_Colors'
|
|
return a.key == b_key
|
|
|
|
def isa(self, a, b):
|
|
"""
|
|
return true if the concept a is a b
|
|
Will handle when the keyword isa will be implemented
|
|
:param a:
|
|
:param b:
|
|
:return:
|
|
"""
|
|
|
|
if isinstance(a, BuiltinConcepts): # common KSI error ;-)
|
|
raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")
|
|
|
|
if not isinstance(a, Concept):
|
|
return False
|
|
|
|
b_key = b.key if isinstance(b, Concept) else str(b)
|
|
|
|
# TODO : manage when a is the list of all possible b
|
|
# for example, if a is a color, it will be found the entry 'All_Colors'
|
|
return a.key == b_key
|
|
|
|
def get_evaluator_name(self, name):
|
|
if self.evaluators_prefix is None:
|
|
base_evaluator_class = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator")
|
|
self.evaluators_prefix = base_evaluator_class.PREFIX
|
|
|
|
return self.evaluators_prefix + name
|
|
|
|
def get_parser_name(self, name):
|
|
if self.parsers_prefix is None:
|
|
base_parser_class = core.utils.get_class("parsers.BaseParser.BaseParser")
|
|
self.parsers_prefix = base_parser_class.PREFIX
|
|
|
|
return self.parsers_prefix + name
|
|
|
|
def get_concept_definition(self):
|
|
if self.concepts_definition_cache:
|
|
return self.concepts_definition_cache
|
|
|
|
self.concepts_definition_cache = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False) or {}
|
|
return self.concepts_definition_cache
|
|
|
|
def concepts(self):
|
|
res = []
|
|
lst = self.sdp.list(self.CONCEPTS_ENTRY)
|
|
for item in lst:
|
|
if isinstance(item, list):
|
|
res.extend(item)
|
|
else:
|
|
res.append(item)
|
|
|
|
return sorted(res, key=lambda i: int(i.id))
|
|
|
|
def test(self):
|
|
return f"I have access to Sheerka !"
|
|
|
|
    def test_error(self):
        """Smoke-test helper: always raises, to exercise error-handling paths."""
        raise Exception("I can raise an error")
|
|
|
|
def dump_concepts(self):
|
|
lst = self.sdp.list(self.CONCEPTS_ENTRY)
|
|
for item in lst:
|
|
if hasattr(item, "__iter__"):
|
|
for i in item:
|
|
self.log.info(i)
|
|
else:
|
|
self.log.info(item)
|
|
|
|
def dump_definitions(self):
|
|
defs = self.sdp.get(self.CONCEPTS_DEFINITIONS_ENTRY)
|
|
self.log.info(defs)
|
|
|
|
@staticmethod
|
|
def get_builtins_classes_as_dict():
|
|
res = {}
|
|
for c in core.utils.get_classes("core.builtin_concepts"):
|
|
if issubclass(c, Concept) and c != Concept:
|
|
res[c().metadata.key] = c
|
|
|
|
return res
|
|
|
|
    @staticmethod
    def init_logging(debug, loggers):
        """
        Configures the logging for Sheerka.

        :param debug: when True, debug level with timestamped format; otherwise plain info
        :param loggers: names of the loggers to enable, forwarded to the sheerka logger
        :return: None
        """
        # NOTE(review): 'core.sheerka_logger' is accessed as an attribute of
        # the 'core' package, but this file only does
        # 'from core.sheerka_logger import ...' — this relies on the submodule
        # being bound on the package; confirm it does not raise AttributeError
        core.sheerka_logger.set_enabled(loggers)
        if debug:
            # log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
            log_format = "%(asctime)s [%(levelname)s] %(message)s"
            log_level = logging.DEBUG
        else:
            log_format = "%(message)s"
            log_level = logging.INFO

        # logging.root.setLevel(log_level)
        # fmt = logging.Formatter(log_format, None, "%")
        # console_handler.setFormatter(fmt)

        logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
|
|
|
|
|
|
class ExecutionContext:
    """
    To keep track of the execution of a request.

    Fix: the original decorated this class with @dataclass while defining
    __init__ and __repr__ by hand and declaring no fields. With zero fields,
    @dataclass silently generated an __eq__ that compares empty tuples (so
    ANY two contexts compared equal) and set __hash__ to None (contexts were
    unhashable). The decorator is removed so instances get default identity
    equality and hashing back.
    """

    def __init__(self,
                 who,
                 event_digest: str,
                 sheerka: Sheerka,
                 /,
                 desc: str = None,
                 obj: Concept = None,
                 step: BuiltinConcepts = None,
                 iteration: int = 0,
                 concepts: dict = None):

        self.who = who  # who is asking
        self.event_digest = event_digest  # what was the (original) trigger
        self.sheerka = sheerka  # sheerka

        self.step = step  # current processing step
        self.iteration = iteration  # evaluator fixed-point iteration counter

        self.desc = desc  # human description of what is going on
        self.obj = obj  # what is the subject of the execution context (if known)

        self.concepts = concepts or {}  # concepts known only to this context

        # sequential id (per event digest) and indentation for debug logging
        self._id = ExecutionContextIdManager.get_id(event_digest)
        self._tab = ""

    @property
    def id(self):
        return self._id

    def push(self, who=None, /, **kwargs):
        """
        Creates a child context inheriting every field not overridden in
        kwargs (desc resets to "" unless given), with one extra level of
        log indentation.
        """
        who = who or self.who
        desc = kwargs.get("desc", "")
        obj = kwargs.get("obj", self.obj)
        concepts = kwargs.get("concepts", self.concepts)
        step = kwargs.get("step", self.step)
        iteration = kwargs.get("iteration", self.iteration)
        new = ExecutionContext(
            who,
            self.event_digest,
            self.sheerka,
            desc=desc,
            obj=obj,
            concepts=concepts,
            step=step,
            iteration=iteration,
        )
        new._tab = self._tab + " " * DEBUG_TAB_SIZE
        return new

    def log_new(self, logger):
        """Logs the creation of this context at debug level."""
        logger.debug(f"[{self._id:2}]" + self._tab + str(self))

    def log(self, logger, message, who=None):
        """Logs a message at debug level, prefixed with id and indentation."""
        logger.debug(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))

    def log_error(self, logger, message, who=None):
        """Logs a message with the current exception's traceback."""
        logger.exception(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))

    def log_result(self, logger, return_values):
        """Logs each return value (one line each) at debug level."""
        if not logger.isEnabledFor(logging.DEBUG):
            return

        if len(return_values) == 0:
            logger.debug(self._tab + "No return value")

        for r in return_values:
            to_str = self.return_value_to_str(r)
            logger.debug(f"[{self._id:2}]" + self._tab + "-> " + to_str)

    @staticmethod
    def return_value_to_str(r):
        """Compact one-line rendering of a return value, truncated to ~50 chars."""
        value = str(r.value)
        if len(value) > 50:
            value = value[:47] + "..."
        to_str = f"ReturnValue(who={r.who}, status={r.status}, value={value})"
        return to_str

    def __repr__(self):
        msg = f"ExecutionContext(who={self.who}, id={self._id}"
        if self.desc:
            msg += f", desc='{self.desc}'"
        msg += ")"
        return msg
|
|
|
|
|
|
class ExecutionContextIdManager:
    """
    Hands out sequential ids, starting at 0, independently per event digest.
    """

    # class-level registry: event digest -> last id handed out
    ids = {}

    @staticmethod
    def get_id(event_digest):
        """
        Returns the next id for the given event digest (0 for a new digest).
        """
        counters = ExecutionContextIdManager.ids
        counters[event_digest] = counters.get(event_digest, -1) + 1
        return counters[event_digest]
|