# Sheerka-Old/core/sheerka.py
# (994 lines, 38 KiB, Python)
# NOTE: the lines above are file-listing residue from the original paste;
# they are kept here as a comment so the module parses.
from dataclasses import dataclass, field
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept, BuiltinErrors
from core.concept import Concept, ConceptParts, PROPERTIES_FOR_NEW
from parsers.BaseParser import BaseParser
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
import core.utils
import core.builtin_helpers
from core.sheerka_logger import console_handler
import logging
# Ordered pipeline steps used when evaluating a single concept
# (see Sheerka.evaluate_concept and evaluate_user_input).
CONCEPT_EVALUATION_STEPS = [
    BuiltinConcepts.BEFORE_EVALUATION,
    BuiltinConcepts.EVALUATION,
    BuiltinConcepts.AFTER_EVALUATION]
# Fully qualified name of the parser that compiles concept BNF definitions.
CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser"
# Indent width (in spaces) used by ExecutionContext debug logging.
DEBUG_TAB_SIZE = 4
class Sheerka(Concept):
    """
    Main controller for the project.

    Sheerka is itself a Concept (the SHEERKA builtin singleton) and owns
    every runtime cache: concepts, definitions, grammars, parsers and
    evaluators.
    """
    CONCEPTS_ENTRY = "All_Concepts"  # sdp entry that stores all the concepts
    CONCEPTS_DEFINITIONS_ENTRY = "Concepts_Definitions"  # sdp entry for concept definitions (bnf)
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"  # sequential key for builtin concepts
    USER_CONCEPTS_KEYS = "User_Concepts"  # sequential key for user defined concepts

    def __init__(self, skip_builtins_in_db=False, debug=False, loggers=None):
        # Logging is configured before Concept.__init__ so the base class
        # construction can already log.
        self.init_logging(debug, loggers)
        super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
        self.log.debug("Starting Sheerka.")
        # cache of the most used concepts
        # Note that these are only templates
        # They are used as a footprint for instantiation
        # Except of source when the concept is supposed to be unique
        # key is the key of the concept (not the name or the id)
        self.concepts_cache = {}
        # cache for concept definitions,
        # Primarily used for unit test that does not have access to sdp
        self.concepts_definition_cache = {}
        # cache for concepts grammars; a grammar is a resolved BNF
        self.concepts_grammars = {}
        # a concept can be instantiated
        # ex: File is a concept, but File('foo.txt') is an instance
        # TODO: manage contexts
        self.instances = []
        # List of the known rules by the system
        # ex: hello => say('hello')
        self.rules = []
        self.sdp: SheerkaDataProvider = None  # data provider, attached in initialize()
        self.builtin_cache = {}  # cache for builtin concept classes (BuiltinConcepts -> class)
        self.parsers = {}  # cache for builtin parsers (qualified name -> class)
        self.evaluators = []  # cache for builtin evaluator classes
        self.evaluators_prefix: str = None  # lazily resolved in get_evaluator_name()
        self.parsers_prefix: str = None  # lazily resolved in get_parser_name()
        self.skip_builtins_in_db = skip_builtins_in_db  # skip db sync of builtins when True
def initialize(self, root_folder: str = None):
"""
Starting Sheerka
Loads the current configuration
Notes that when it's the first time, it also create the needed working folders
:param root_folder: root configuration folder
:return: ReturnValue(Success or Error)
"""
try:
self.sdp = SheerkaDataProvider(root_folder)
if self.sdp.first_time:
self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)
event = Event("Initializing Sheerka.")
self.sdp.save_event(event)
exec_context = ExecutionContext(self.key, event, self)
self.initialize_builtin_concepts()
self.initialize_builtin_parsers()
self.initialize_builtin_evaluators()
self.initialize_concepts_definitions(exec_context)
except IOError as e:
return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e)
return ReturnValueConcept(self, True, self)
def initialize_builtin_concepts(self):
"""
Initializes the builtin concepts
:return: None
"""
self.init_log.debug("Initializing builtin concepts")
builtins_classes = self.get_builtins_classes_as_dict()
# this all initialization of the builtins seems to be little bit complicated
# why do we need to update it from DB ?
for key in BuiltinConcepts:
concept = self if key == BuiltinConcepts.SHEERKA \
else builtins_classes[str(key)]() if str(key) in builtins_classes \
else Concept(key, True, False, key)
if not concept.metadata.is_unique and str(key) in builtins_classes:
self.builtin_cache[key] = builtins_classes[str(key)]
if not self.skip_builtins_in_db:
from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.metadata.key)
if from_db is None:
self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.")
self.set_id_if_needed(concept, True)
self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
else:
self.init_log.debug(f"Found concept '{from_db}' in db. Updating.")
concept.update_from(from_db)
self.add_in_cache(concept)
def initialize_builtin_parsers(self):
"""
Init the parsers
:return:
"""
core.utils.init_package_import("parsers")
base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
for parser in core.utils.get_sub_classes("parsers", base_class):
if parser.__module__ == base_class.__module__:
continue
self.init_log.debug(f"Adding builtin parser '{parser.__name__}'")
self.parsers[core.utils.get_full_qualified_name(parser)] = parser
def initialize_builtin_evaluators(self):
"""
Init the evaluators
:return:
"""
core.utils.init_package_import("evaluators")
for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.OneReturnValueEvaluator"):
self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
self.evaluators.append(evaluator)
for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.AllReturnValuesEvaluator"):
self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
self.evaluators.append(evaluator)
def initialize_concepts_definitions(self, execution_context):
self.init_log.debug("Initializing concepts definitions")
definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False)
if definitions is None:
self.init_log.debug("No BNF defined")
return
lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
ret_val = lexer_parser.initialize(execution_context, definitions)
if not ret_val.status:
self.init_log.error("Failed to initialize concepts definitions " + str(ret_val.body))
return
self.concepts_grammars = lexer_parser.concepts_grammars
def evaluate_user_input(self, text: str, user_name="kodjo"):
"""
Note to KSI: If you try to add execution context to this function,
You may end in an infinite loop
:param text:
:param user_name:
:return:
"""
self.log.debug(f"Processing user input '{text}', {user_name=}.")
event = Event(text, user_name)
evt_digest = self.sdp.save_event(event)
self.log.debug(f"{evt_digest=}")
execution_context = ExecutionContext(self.key, event, self)
user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name))
reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED))
steps = [
BuiltinConcepts.BEFORE_PARSING,
BuiltinConcepts.PARSING,
BuiltinConcepts.AFTER_PARSING,
BuiltinConcepts.BEFORE_EVALUATION,
BuiltinConcepts.EVALUATION,
BuiltinConcepts.AFTER_EVALUATION
]
return self.execute(execution_context, [user_input, reduce_requested], steps)
def _call_parsers(self, execution_context, return_values, logger=None):
result = []
# return_values must be a list
if not isinstance(return_values, list):
return_values = [return_values]
for return_value in return_values:
# make sure we only parse user input
if not return_value.status or not self.isinstance(return_value.body, BuiltinConcepts.USER_INPUT):
result.append(return_value)
continue
to_parse = self.value(return_value)
if self.log.isEnabledFor(logging.DEBUG):
debug_text = "'" + to_parse + "'" if isinstance(to_parse, str) \
else "'" + BaseParser.get_text_from_tokens(to_parse) + "' as tokens"
execution_context.log(logger or self.log, f"Parsing {debug_text}")
for parser in self.parsers.values():
p = parser(sheerka=self)
if logger:
p.log = logger
res = p.parse(execution_context, to_parse)
if hasattr(res, "__iter__"):
for r in res:
r.parents = [return_value]
result.append(r)
else:
res.parents = [return_value]
result.append(res)
return result
    def _call_evaluators(self, execution_context, return_values, process_step, evaluation_context=None, logger=None):
        """
        Run all enabled evaluators registered for `process_step` over the
        return values, repeating until a fixed point is reached (no
        evaluator changes anything anymore).
        :param execution_context:
        :param return_values: one return value or a list of them
        :param process_step: current pipeline step (BuiltinConcepts member)
        :param evaluation_context: optional concepts that tune the evaluation
        :param logger: logger to use (if not directly called by sheerka)
        :return: list of return values (evaluation-context values removed)
        """
        def _preprocess_evaluators(context, evaluators):
            # Apply the EVALUATOR_PRE_PROCESS directives attached to the
            # context: each directive targets an evaluator by name and
            # overrides matching attributes on it.
            if not context.preprocess:
                return evaluators
            if not hasattr(evaluators, "__iter__"):
                single_one = True
                evaluators = [evaluators]
            else:
                single_one = False
            for preprocess in context.preprocess:
                for e in evaluators:
                    if preprocess.props["name"].value == e.name:
                        for prop, value in preprocess.props.items():
                            if prop == "name":
                                continue
                            if hasattr(e, prop):
                                setattr(e, prop, value.value)
            return evaluators[0] if single_one else evaluators
        # return_values must be a list
        if not isinstance(return_values, list):
            return_values = [return_values]
        # Evaluation context are contexts that may modify the behaviour of the execution
        # For example, a concept to indicate that the value is not wanted
        # Or a concept to indicate that we want the letter form of the response
        # But first, they need to be transformed into return values
        if evaluation_context is None:
            evaluation_return_values = []
        else:
            evaluation_return_values = [self.ret(execution_context.who, True, c) for c in evaluation_context]
        # add the current step as part as the evaluation context
        evaluation_return_values.append(self.ret(execution_context.who, True, self.new(process_step)))
        # the pool of return values are the mix
        return_values.extend(evaluation_return_values)
        # group the evaluators by priority and sort them
        # The first one to be applied will be the one with the highest priority
        grouped_evaluators = {}
        instantiated_evaluators = [e_class() for e_class in self.evaluators]
        # pre-process evaluators if needed
        instantiated_evaluators = _preprocess_evaluators(execution_context, instantiated_evaluators)
        for evaluator in [e for e in instantiated_evaluators if e.enabled and process_step in e.steps]:
            if logger:
                evaluator.log = logger
            grouped_evaluators.setdefault(evaluator.priority, []).append(evaluator)
        # order the groups by priority, the higher first
        sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True)
        # process until nothing changes anymore (fixed point)
        while True:
            simple_digest = return_values[:]
            for priority in sorted_priorities:
                original_items = return_values[:]
                evaluated_items = []
                to_delete = []
                for evaluator in grouped_evaluators[priority]:
                    # fresh copy each round so evaluator state cannot leak
                    evaluator = _preprocess_evaluators(execution_context, evaluator.__class__())
                    # process evaluators that work on one return value
                    from evaluators.BaseEvaluator import OneReturnValueEvaluator
                    if isinstance(evaluator, OneReturnValueEvaluator):
                        for item in original_items:
                            if evaluator.matches(execution_context, item):
                                result = evaluator.eval(execution_context, item)
                                if result is None:
                                    # None means "nothing to do" for this item
                                    continue
                                elif isinstance(result, list):
                                    evaluated_items.extend(result)
                                    to_delete.append(item)
                                elif isinstance(result, ReturnValueConcept):
                                    evaluated_items.append(result)
                                    to_delete.append(item)
                                else:
                                    # evaluators must return ReturnValue(s)
                                    error = self.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result,
                                                     evaluator=evaluator)
                                    evaluated_items.append(self.ret("sheerka.process", False, error, parents=[item]))
                                    to_delete.append(item)
                    # process evaluators that work on all return values
                    else:
                        if evaluator.matches(execution_context, original_items):
                            results = evaluator.eval(execution_context, original_items)
                            if results is None:
                                continue
                            if not isinstance(results, list):
                                results = [results]
                            for result in results:
                                evaluated_items.append(result)
                                to_delete.extend(result.parents)
                return_values = evaluated_items
                return_values.extend([item for item in original_items if item not in to_delete])
            # have we done something ?
            to_compare = return_values[:]
            if simple_digest == to_compare:
                break
            # inc the iteration and continue
            execution_context = execution_context.push(iteration=execution_context.iteration + 1)
        # remove all evaluation context that are not reduced
        return_values = core.utils.remove_list_from_list(return_values, evaluation_return_values)
        return return_values
def execute(self, execution_context, return_values, execution_steps, logger=None):
"""
Executes process for all initial contexts
:param execution_context:
:param return_values:
:param execution_steps:
:param logger: logger to use (if not directly called by sheerka)
:return:
"""
for step in execution_steps:
sub_context = execution_context.push(step=step)
sub_context.log(logger or self.log, f"{step=}, context='{sub_context}'")
copy = return_values[:] if hasattr(return_values, "__iter__") else [return_values]
if step == BuiltinConcepts.PARSING:
return_values = self._call_parsers(sub_context, return_values, logger)
else:
return_values = self._call_evaluators(sub_context, return_values, step, None, logger)
if copy != return_values:
sub_context.log_result(logger or self.log, return_values)
return return_values
def set_id_if_needed(self, obj: Concept, is_builtin: bool):
"""
Set the key for the concept if needed
:param obj:
:param is_builtin:
:return:
"""
if obj.metadata.id is not None:
return
obj.metadata.id = self.sdp.get_next_key(self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS)
self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.")
    def create_new_concept(self, context, concept: Concept, logger=None):
        """
        Adds a new concept to the system: validates its BNF (when present),
        persists it in sdp and refreshes the caches.
        :param context:
        :param concept: DefConceptNode
        :param logger: logger to use (if not directly called by sheerka)
        :return: ReturnValue carrying the new concept, or an error return value
        """
        logger = logger or self.log
        concept.init_key()
        concepts_definitions = None
        init_ret_value = None
        # checks for duplicate concepts
        # TODO checks if it exists in cache first
        if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
            error = SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept)
            return self.ret(
                self.create_new_concept.__name__,
                False,
                self.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept),
                error.args[0])
        # set id before saving in db
        self.set_id_if_needed(concept, False)
        # add the BNF if known
        if concept.bnf:
            concepts_definitions = self.get_concept_definition()
            concepts_definitions[concept] = concept.bnf
            # check if it's a valid BNF or whether it breaks the known rules
            concept_lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
            sub_context = context.push(self.name, desc=f"Initializing concept definition for {concept}")
            sub_context.concepts[concept.key] = concept  # the concept is not in the real cache yet
            sub_context.log_new(logger)
            init_ret_value = concept_lexer_parser.initialize(sub_context, concepts_definitions)
            if not init_ret_value.status:
                return self.ret(self.create_new_concept.__name__, False, ErrorConcept(init_ret_value.value))
        # save the new concept in sdp
        try:
            self.sdp.add(context.event.get_digest(), self.CONCEPTS_ENTRY, concept, use_ref=True)
            if concepts_definitions is not None:
                self.sdp.set(context.event.get_digest(),
                             self.CONCEPTS_DEFINITIONS_ENTRY,
                             concepts_definitions, use_ref=True)
        except SheerkaDataProviderDuplicateKeyError as error:
            context.log_error(logger, "Failed to create a new concept.", who=self.create_new_concept.__name__)
            return self.ret(
                self.create_new_concept.__name__,
                False,
                self.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept),
                error.args[0])
        # Updates the caches (re-read from sdp so the cached entry matches the db)
        self.concepts_cache[concept.key] = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
        if init_ret_value is not None and init_ret_value.status:
            self.concepts_grammars = init_ret_value.body
        # process the return if needed
        ret = self.ret(self.create_new_concept.__name__, True, self.new(BuiltinConcepts.NEW_CONCEPT, body=concept))
        return ret
def add_concept_to_set(self, context, concept, concept_set, logger=None):
"""
Add an entry in sdp to tell that concept isa concept_set
:param context:
:param concept:
:param concept_set:
:param logger:
:return:
"""
logger = logger or self.log
context.log(logger, f"Adding concept {concept} to set {concept_set}", who=self.add_concept_to_set.__name__)
assert concept.id
assert concept_set.id
try:
ret = self.sdp.add_unique(context.event.get_digest(), "All_" + str(concept_set.id), concept.id)
if ret == (None, None): # concept already in set
return self.ret(
self.add_concept_to_set.__name__,
False,
self.new(BuiltinConcepts.CONCEPT_ALREADY_IN_SET, body=concept, concept_set=concept_set))
else:
return self.ret(self.add_concept_to_set.__name__, True, self.new(BuiltinConcepts.SUCCESS))
except Exception as error:
context.log_error(logger, "Failed to add to set.", who=self.add_concept_to_set.__name__)
return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])
def initialize_concept_asts(self, context, concept: Concept, logger=None):
"""
Updates the codes of the newly created concept
Basically, it runs the parsers on all parts
:param concept:
:param context:
:param logger:
:return:
"""
steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
for part_key in ConceptParts:
source = getattr(concept.metadata, part_key.value)
if source is None or not isinstance(source, str) or source == "":
# the only sources that I am sure to parse are strings
# I refuse empty strings for performance matters, I don't want to handle useless NOPConcepts
continue
else:
sub_context = context.push(desc=f"Initializing AST for {part_key}")
sub_context.log_new(logger)
to_parse = self.ret(context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=source))
concept.cached_asts[part_key] = self.execute(sub_context, to_parse, steps, logger)
for prop in concept.props:
if concept.props[prop].value:
to_parse = self.ret(
context.who,
True,
self.new(BuiltinConcepts.USER_INPUT, body=concept.props[prop].value))
sub_context = context.push(desc=f"Initializing AST for property {prop}")
sub_context.log_new(logger)
concept.cached_asts[prop] = self.execute(context, to_parse, steps)
# Updates the cache of concepts when possible
if concept.key in self.concepts_cache:
entry = self.concepts_cache[concept.key]
if isinstance(entry, list):
# TODO : manage when there are multiple entries
pass
else:
self.concepts_cache[concept.key].cached_asts = concept.cached_asts
    def evaluate_concept(self, context, concept: Concept, logger=None):
        """
        Evaluation a concept.
        It means that if the where clause is True, will evaluate the body.
        Properties are evaluated first, then where/pre/post/body in order.
        :param context:
        :param concept:
        :param logger:
        :return: value of the evaluation or error (CONCEPT_EVAL_ERROR concept)
        """
        logger = logger or self.log
        # memoized: an already evaluated concept is returned as-is
        if concept.metadata.is_evaluated:
            return concept

        def _resolve(return_value, desc, obj):
            # Evaluate one cached AST through the evaluation pipeline and
            # reduce the outcome to a single return value.
            context.log(logger, desc, self.evaluate_concept.__name__)
            sub_context = context.push(desc=desc, obj=obj)
            sub_context.add_preprocess(self.get_evaluator_name("Concept"), return_body=True)
            sub_context.log_new(logger)
            r = self.execute(sub_context, return_value, CONCEPT_EVALUATION_STEPS, logger)
            return core.builtin_helpers.expect_one(context, r)
        # WHERE condition should already be validated by the parser.
        # It's a mandatory condition for the concept before it can be recognized
        #
        # TODO : Validate the PRE condition
        #
        if len(concept.cached_asts) == 0:
            context.log(logger, "concept asts are not initialized. Initializing.", self.evaluate_concept.__name__)
            self.initialize_concept_asts(context, concept, logger)
        # to make sure of the order, it don't use ConceptParts.get_parts()
        # props must be evaluated first
        all_metadata_to_eval = ["props", "where", "pre", "post", "body"]
        for metadata_to_eval in all_metadata_to_eval:
            if metadata_to_eval == "props":
                # only evaluate properties that actually have a cached AST
                for prop_name in (p for p in concept.props if p in concept.cached_asts):
                    res = _resolve(concept.cached_asts[prop_name], f"Evaluating property '{prop_name}'", None)
                    if res.status:
                        concept.set_prop(prop_name, res.value)
                    else:
                        return self.new(BuiltinConcepts.CONCEPT_EVAL_ERROR,
                                        body=res.value,
                                        concept=concept,
                                        property_name=prop_name)
            else:
                part_key = ConceptParts(metadata_to_eval)
                if part_key in concept.cached_asts and concept.cached_asts[part_key] is not None:
                    res = _resolve(concept.cached_asts[part_key], f"Evaluating '{part_key}'", concept)
                    if res.status:
                        setattr(concept.metadata, metadata_to_eval, res.value)
                    else:
                        return self.new(BuiltinConcepts.CONCEPT_EVAL_ERROR,
                                        body=res.value,
                                        concept=concept,
                                        property_name=part_key)
        #
        # TODO : Validate the POST condition
        #
        concept.init_key()  # only does it if needed
        concept.metadata.is_evaluated = True
        return concept
def add_in_cache(self, concept: Concept):
"""
Adds a concept template in cache.
The cache is used as a proxy before looking at sdp
:param concept:
:return:
"""
# sanity check
if concept.key is None:
concept.init_key()
if concept.key is None:
raise KeyError()
self.concepts_cache[concept.key] = concept
return concept
def get(self, concept_key):
"""
Tries to find a concept
What is return must be used a template for another concept.
You must not modify the returned concept
:param concept_key:
:return:
"""
if isinstance(concept_key, BuiltinConcepts):
concept_key = str(concept_key)
# first search in cache
if concept_key in self.concepts_cache:
return self.concepts_cache[concept_key]
# else look in sdp
from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)
if from_db is not None:
return from_db
# else return new Unknown concept
# Note that I don't call the new() method to prevent cyclic call
unknown_concept = Concept()
template = self.concepts_cache[str(BuiltinConcepts.UNKNOWN_CONCEPT)]
unknown_concept.update_from(template)
unknown_concept.metadata.body = concept_key
return unknown_concept
def new(self, concept_key, **kwargs):
"""
Returns an instance of a new concept
When the concept is supposed to be unique, returns the same instance
:param concept_key:
:param kwargs:
:return:
"""
template = self.get(concept_key)
def new_from_template(t, k, **kwargs_):
# manage singleton
if t.metadata.is_unique:
return t
# otherwise, create another instance
concept = self.builtin_cache[k]() if k in self.builtin_cache else Concept()
concept.update_from(t)
# update the properties
for k, v in kwargs_.items():
if k in concept.props:
concept.set_prop(k, v)
elif k in PROPERTIES_FOR_NEW:
setattr(concept.metadata, k, v)
elif hasattr(concept, k):
setattr(concept, k, v)
else:
return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)
# TODO : add the concept to the list of known concepts (self.instances)
return concept
# manage concept not found
if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
return template
if not isinstance(template, list):
return new_from_template(template, concept_key, **kwargs)
# if template is a list, it means that there a multiple concepts under the same key
concepts = [new_from_template(t, concept_key, **kwargs) for t in template]
return self.new(BuiltinConcepts.ENUMERATION, body=concepts)
def ret(self, who: str, status: bool, value, message=None, parents=None):
"""
Creates and returns a ReturnValue concept
:param who:
:param status:
:param value:
:param message:
:param parents:
:return:
"""
return self.new(
BuiltinConcepts.RETURN_VALUE,
who=who,
status=status,
value=value,
message=message,
parents=parents)
def value(self, obj, reduce_simple_list=False):
if obj is None:
return None
if hasattr(obj, "get_value"):
return obj.get_value()
if not isinstance(obj, Concept):
return obj
if obj.body is None:
return obj
if reduce_simple_list and (isinstance(obj.body, list) or isinstance(obj.body, set)) and len(obj.body) == 1:
body_to_use = obj.body[0]
else:
body_to_use = obj.body
return self.value(body_to_use)
def values(self, objs):
if not (isinstance(objs, list) or
self.isinstance(objs, BuiltinConcepts.LIST) or
self.isinstance(objs, BuiltinConcepts.ENUMERATION)):
objs = [objs]
return (self.value(obj) for obj in objs)
def is_success(self, obj):
if isinstance(obj, bool): # quick win
return obj
if isinstance(obj, ReturnValueConcept):
return obj.status
if isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors:
return False
return obj
def isinstance(self, a, b):
"""
return true if the concept a is an instance of the concept b
:param a:
:param b:
:return:
"""
if isinstance(a, BuiltinConcepts): # common KSI error ;-)
raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")
if not isinstance(a, Concept):
return False
b_key = b.key if isinstance(b, Concept) else str(b)
# TODO : manage when a is the list of all possible b
# for example, if a is a color, it will be found the entry 'All_Colors'
return a.key == b_key
def isa(self, a, b):
"""
return true if the concept a is a b
Will handle when the keyword isa will be implemented
:param a:
:param b:
:return:
"""
if isinstance(a, BuiltinConcepts): # common KSI error ;-)
raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")
if not isinstance(a, Concept):
return False
b_key = b.key if isinstance(b, Concept) else str(b)
# TODO : manage when a is the list of all possible b
# for example, if a is a color, it will be found the entry 'All_Colors'
return a.key == b_key
def get_evaluator_name(self, name):
if self.evaluators_prefix is None:
base_evaluator_class = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator")
self.evaluators_prefix = base_evaluator_class.PREFIX
return self.evaluators_prefix + name
def get_parser_name(self, name):
if self.parsers_prefix is None:
base_parser_class = core.utils.get_class("parsers.BaseParser.BaseParser")
self.parsers_prefix = base_parser_class.PREFIX
return self.parsers_prefix + name
def get_concept_definition(self):
if self.concepts_definition_cache:
return self.concepts_definition_cache
self.concepts_definition_cache = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False) or {}
return self.concepts_definition_cache
def concepts(self):
res = []
lst = self.sdp.list(self.CONCEPTS_ENTRY)
for item in lst:
if isinstance(item, list):
res.extend(item)
else:
res.append(item)
return sorted(res, key=lambda i: int(i.id))
def test(self):
return f"I have access to Sheerka !"
    def test_error(self):
        """Smoke-test helper: always raises a generic Exception."""
        raise Exception("I can raise an error")
def dump_concepts(self):
lst = self.sdp.list(self.CONCEPTS_ENTRY)
for item in lst:
if hasattr(item, "__iter__"):
for i in item:
self.log.info(i)
else:
self.log.info(item)
    def dump_definitions(self):
        """Log the raw concept definitions (BNF) stored in sdp."""
        defs = self.sdp.get(self.CONCEPTS_DEFINITIONS_ENTRY)
        self.log.info(defs)
def dump_desc(self, concept_name):
if isinstance(concept_name, Concept):
c = concept_name
else:
c = self.get(concept_name)
if self.isinstance(c, BuiltinConcepts.UNKNOWN_CONCEPT):
self.log.error("Concept unknown")
return False
self.log.info(f"name : {c.name}")
self.log.info(f"bnf : {c.metadata.definition}")
self.log.info(f"key : {c.key}")
self.log.info(f"body : {c.body}")
@staticmethod
def get_builtins_classes_as_dict():
res = {}
for c in core.utils.get_classes("core.builtin_concepts"):
if issubclass(c, Concept) and c != Concept:
res[c().metadata.key] = c
return res
@staticmethod
def init_logging(debug, loggers):
core.sheerka_logger.set_enabled(loggers)
if debug:
# log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
log_format = "%(asctime)s [%(levelname)s] %(message)s"
log_level = logging.DEBUG
else:
log_format = "%(message)s"
log_level = logging.INFO
logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
@dataclass
class ExecutionContext:
    """
    To keep track of the execution of a request.

    Contexts form a chain via push(): each child inherits the event,
    preprocess directives and log indentation level of its parent.

    NOTE(review): @dataclass declares no fields here (no class-level
    annotations) and __init__/__repr__ are hand written, so the decorator
    only contributes a generated __eq__ over an empty field tuple — which
    makes all instances compare equal. Confirm this is intended.
    """
    def __init__(self,
                 who,
                 event: Event,
                 sheerka: Sheerka,
                 /,
                 desc: str = None,
                 obj: Concept = None,
                 step: BuiltinConcepts = None,
                 iteration: int = 0,
                 concepts: dict = None):
        self.who = who  # who is asking
        self.event = event  # what was the (original) trigger
        self.sheerka = sheerka  # sheerka
        self.step = step  # current pipeline step (BuiltinConcepts member)
        self.iteration = iteration  # fixed-point iteration counter (see _call_evaluators)
        self.preprocess = None  # set of EVALUATOR_PRE_PROCESS directives, created lazily
        self.desc = desc  # human description of what is going on
        self.obj = obj  # what is the subject of the execution context (if known)
        self.concepts = concepts or {}  # cache for concepts that are specific to this execution
        self._id = ExecutionContextIdManager.get_id(event.get_digest())  # per-event sequential id
        self._tab = ""  # log indentation, grows by DEBUG_TAB_SIZE on each push()

    def add_preprocess(self, name, **kwargs):
        # Record a directive that _call_evaluators will use to override
        # attributes on the evaluator named `name`.
        preprocess = self.sheerka.new(BuiltinConcepts.EVALUATOR_PRE_PROCESS)
        preprocess.set_prop("name", name)
        for k, v in kwargs.items():
            preprocess.set_prop(k, v)
        if not self.preprocess:
            self.preprocess = set()
        self.preprocess.add(preprocess)
        return self

    @property
    def id(self):
        # read-only access to the per-event sequential id
        return self._id

    def push(self, who=None, /, **kwargs):
        # Create a child context; unspecified fields are inherited from
        # self (except desc, which defaults to empty).
        who = who or self.who
        desc = kwargs.get("desc", "")
        obj = kwargs.get("obj", self.obj)
        concepts = kwargs.get("concepts", self.concepts)
        step = kwargs.get("step", self.step)
        iteration = kwargs.get("iteration", self.iteration)
        new = ExecutionContext(
            who,
            self.event,
            self.sheerka,
            desc=desc,
            obj=obj,
            concepts=concepts,
            step=step,
            iteration=iteration,
        )
        new._tab = self._tab + " " * DEBUG_TAB_SIZE  # one level deeper in the logs
        new.preprocess = self.preprocess
        return new

    def log_new(self, logger):
        # announce the creation of this context in the debug log
        logger.debug(f"[{self._id:2}]" + self._tab + str(self))

    def log(self, logger, message, who=None):
        logger.debug(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))

    def log_error(self, logger, message, who=None):
        # logger.exception also records the current traceback
        logger.exception(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))

    def log_result(self, logger, return_values):
        # dump each return value (truncated) at debug level
        if not logger.isEnabledFor(logging.DEBUG):
            return
        if len(return_values) == 0:
            logger.debug(self._tab + "No return value")
        for r in return_values:
            to_str = self.return_value_to_str(r)
            logger.debug(f"[{self._id:2}]" + self._tab + "-> " + to_str)

    @staticmethod
    def return_value_to_str(r):
        # compact single-line rendering; value truncated to 50 characters
        value = str(r.value)
        if len(value) > 50:
            value = value[:47] + "..."
        to_str = f"ReturnValue(who={r.who}, status={r.status}, value={value})"
        return to_str

    def __repr__(self):
        msg = f"ExecutionContext(who={self.who}, id={self._id}"
        if self.desc:
            msg += f", desc='{self.desc}'"
        msg += ")"
        return msg
class ExecutionContextIdManager:
    """
    Hands out sequential ids per event digest, starting at 0.
    Used to number the ExecutionContext instances spawned by one event.
    """
    ids = {}  # event digest -> last id handed out

    @staticmethod
    def get_id(event_digest):
        """Return the next id for this event digest (0 on the first call)."""
        previous = ExecutionContextIdManager.ids.get(event_digest)
        next_id = 0 if previous is None else previous + 1
        ExecutionContextIdManager.ids[event_digest] = next_id
        return next_id