968 lines
35 KiB
Python
968 lines
35 KiB
Python
import inspect
|
|
import logging
|
|
from dataclasses import dataclass
|
|
|
|
import core.builtin_helpers
|
|
import core.utils
|
|
from cache.Cache import Cache
|
|
from cache.CacheManager import CacheManager
|
|
from cache.DictionaryCache import DictionaryCache
|
|
from cache.IncCache import IncCache
|
|
from cache.ListIfNeededCache import ListIfNeededCache
|
|
from cache.SetCache import SetCache
|
|
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept, BuiltinErrors, BuiltinUnique, \
|
|
UnknownConcept
|
|
from core.concept import Concept, ConceptParts, PROPERTIES_FOR_NEW
|
|
from core.sheerka.ExecutionContext import ExecutionContext
|
|
from core.sheerka_logger import console_handler
|
|
from core.tokenizer import Token, TokenKind
|
|
from printer.SheerkaPrinter import SheerkaPrinter
|
|
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event
|
|
|
|
# Fully qualified name used to pick the base node parser class out of the discovered parsers
BASE_NODE_PARSER_CLASS = "parsers.BaseNodeParser.BaseNodeParser"
# presumably the inputs that end an interactive session - TODO confirm against the callers
EXIT_COMMANDS = ("quit", "exit", "bye")
# Steps handed to Sheerka.execute() by Sheerka.evaluate_user_input(), in this order
EXECUTE_STEPS = [
    BuiltinConcepts.BEFORE_PARSING,
    BuiltinConcepts.PARSING,
    BuiltinConcepts.AFTER_PARSING,
    BuiltinConcepts.BEFORE_EVALUATION,
    BuiltinConcepts.EVALUATION,
    BuiltinConcepts.AFTER_EVALUATION
]
|
|
|
|
|
|
@dataclass
class SheerkaMethod:
    """
    Couples a callable exposed through Sheerka with a flag telling whether it is safe to call
    """
    # the wrapped callable (bound method or plain function)
    method: object
    # False when the callable is safe, True when calling it has side effects
    has_side_effect: bool
|
|
|
|
|
|
class Sheerka(Concept):
|
|
"""
|
|
Main controller for the project
|
|
"""
|
|
|
|
# Names of the concept caches registered by initialize_caching(); each one indexes the concepts differently
CONCEPTS_BY_ID_ENTRY = "Concepts_By_ID"  # to store all the concepts
CONCEPTS_BY_KEY_ENTRY = "Concepts_By_Key"
CONCEPTS_BY_NAME_ENTRY = "Concepts_By_Name"
CONCEPTS_BY_HASH_ENTRY = "Concepts_By_Hash"  # store hash of concepts definitions (not values)

CONCEPTS_REFERENCES_ENTRY = "Concepts_References"  # tracks references between concepts

# Caches used for parsing; the RESOLVED_* and the last two entries are registered with persist=False
CONCEPTS_BY_FIRST_KEYWORD_ENTRY = "Concepts_By_First_Keyword"
RESOLVED_CONCEPTS_BY_FIRST_KEYWORD_ENTRY = "Resolved_Concepts_By_First_Keyword"
CONCEPTS_SYA_DEFINITION_ENTRY = "Concepts_Sya_Definitions"
RESOLVED_CONCEPTS_SYA_DEFINITION_ENTRY = "Resolved_Concepts_Sya_Definitions"
CONCEPTS_GRAMMARS_ENTRY = "Concepts_Grammars"
CHICKEN_AND_EGG_CONCEPTS_ENTRY = "Chicken_And_Egg_Concepts"

# Cache holding the id counters; the two names below are the keys looked up inside it
CONCEPTS_KEYS_ENTRY = "Concepts_Keys"
BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"  # sequential key for builtin concepts
USER_CONCEPTS_KEYS = "User_Concepts"  # sequential key for user defined concepts

# Maximum number of entries kept in last_executions / last_return_values
MAX_EXECUTION_HISTORY = 100
MAX_RETURN_VALUES_HISTORY = 100
|
|
|
|
def __init__(self, cache_only=False, debug=False, loggers=None):
    """
    Creates the controller with empty registries.
    Nothing is loaded from storage here, see initialize()

    :param cache_only: forwarded to the CacheManager
    :param debug: forwarded to init_logging()
    :param loggers: forwarded to init_logging() and kept on the instance
    """
    self.init_logging(debug, loggers)
    self.loggers = loggers

    super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
    self.log.debug("Starting Sheerka.")

    self.bnp = None  # reference to the BaseNodeParser class (to compute first keyword token)
    self.return_value_concept_id = None

    # a concept can be instantiated
    # ex: File is a concept, but File('foo.txt') is an instance
    # TODO: manage contexts
    self.instances = []

    # List of the known rules by the system
    # ex: hello => say('hello')
    self.rules = []

    self.sdp: SheerkaDataProvider = None  # SheerkaDataProvider
    self.cache_manager = CacheManager(cache_only)

    self.services = {}  # sheerka plugins

    self.builtin_cache = {}  # cache for builtin concepts
    self.parsers = {}  # cache for builtin parsers
    self.evaluators = []  # cache for builtin evaluators

    self.evaluators_prefix: str = None
    self.parsers_prefix: str = None

    self.printer_handler = SheerkaPrinter(self)

    self.during_restore = False
    self._builtins_classes_cache = None

    self.save_execution_context = True

    self.methods_with_context = {"test_using_context"}  # only the names, the method is defined in sheerka_methods
    self.sheerka_methods = {
        "test": SheerkaMethod(self.test, False),
        "test_using_context": SheerkaMethod(self.test_using_context, False)
    }
    self.sheerka_pipeables = {}

    self.locals = {}

    self.last_executions = []
    self.last_return_values = []

    # context of the latest evaluate_user_input() call.
    # Initialised here so that get_last_execution() works before the first evaluation
    self._last_execution = None
|
|
|
|
@property
|
|
def resolved_concepts_by_first_keyword(self):
|
|
"""
|
|
We return the cache as we will be interested by statistics
|
|
:return:
|
|
"""
|
|
return self.cache_manager.caches[self.RESOLVED_CONCEPTS_BY_FIRST_KEYWORD_ENTRY].cache
|
|
|
|
@property
|
|
def resolved_sya_def(self):
|
|
"""
|
|
|
|
:return:
|
|
"""
|
|
return self.cache_manager.caches[self.RESOLVED_CONCEPTS_SYA_DEFINITION_ENTRY].cache
|
|
|
|
@property
|
|
def concepts_grammars(self):
|
|
return self.cache_manager.caches[self.CONCEPTS_GRAMMARS_ENTRY].cache
|
|
|
|
@property
|
|
def chicken_and_eggs(self):
|
|
return self.cache_manager.caches[self.CHICKEN_AND_EGG_CONCEPTS_ENTRY].cache
|
|
|
|
def bind_service_method(self, bound_method, has_side_effect, as_name=None, visible=True):
    """
    Exposes a service method as an attribute of this instance.
    When visible, the method is also recorded in sheerka_methods, and its name is added
    to methods_with_context if its first parameter is called 'context'
    :param bound_method: the callable to expose
    :param has_side_effect: False if the method is safe
    :param as_name: name to expose it under (defaults to the callable's __name__)
    :param visible: make the method visible to Sheerka
    :return: None
    """
    exposed_name = bound_method.__name__ if as_name is None else as_name

    if visible:
        parameter_names = inspect.signature(bound_method).parameters
        if next(iter(parameter_names), None) == "context":
            self.methods_with_context.add(exposed_name)
        self.sheerka_methods[exposed_name] = SheerkaMethod(bound_method, has_side_effect)

    setattr(self, exposed_name, bound_method)
|
|
|
|
def add_pipeable(self, func_name, function, has_side_effect):
    """
    Registers a function that can be used with the pipe '|'
    :param func_name: name the function is registered under
    :param function: the callable
    :param has_side_effect: False if the function is safe
    :return: None
    """
    pipeable = SheerkaMethod(function, has_side_effect)
    self.sheerka_pipeables[func_name] = pipeable
|
|
|
|
def initialize(self, root_folder: str = None, save_execution_context=True):
    """
    Starts Sheerka: creates the data provider, the caches and the services,
    then loads the builtin parsers, evaluators and concepts.
    On the very first start (sdp.first_time), first_time_initialisation() is also run
    :param root_folder: root configuration folder, given to the SheerkaDataProvider
    :param save_execution_context: kept on the instance; when True the initialisation context is saved
    :return: a ReturnValueConcept: status True with self as value on success,
             status False with the ERROR concept when an IOError is raised
    """

    self.save_execution_context = save_execution_context

    try:
        # imported here rather than at module level
        from sheerkapickle.sheerka_handlers import initialize_pickle_handlers
        initialize_pickle_handlers()

        self.sdp = SheerkaDataProvider(root_folder, self)
        self.initialize_caching()
        self.initialize_services()

        event = Event("Initializing Sheerka.", user_id=self.name)
        self.sdp.save_event(event)
        with ExecutionContext(self.key,
                              event,
                              self,
                              BuiltinConcepts.INIT_SHEERKA,
                              None,
                              desc="Initializing Sheerka.",
                              logger=self.init_log) as exec_context:
            if self.sdp.first_time:
                self.first_time_initialisation(exec_context)

            # the parsers must be known before initialize_concept_node_parsing(), which uses self.bnp
            self.initialize_builtin_parsers()
            self.initialize_builtin_evaluators()
            self.initialize_builtin_concepts()
            self.initialize_concept_node_parsing(exec_context)
            res = ReturnValueConcept(self, True, self)

            exec_context.add_values(return_values=res)

            if self.cache_manager.is_dirty:
                self.cache_manager.commit(exec_context)

        # NOTE(review): confirm that the result is meant to be saved once the context is closed
        if save_execution_context:
            self.sdp.save_result(exec_context, is_admin=True)
        self.init_log.debug(f"Sheerka successfully initialized")

    except IOError as e:
        # only IOError is turned into a failed return value, other exceptions propagate
        res = ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e)

    return res
|
|
|
|
def initialize_caching(self):
    """
    Registers all the caches used by Sheerka into the cache manager.
    The persisted ones read their missing entries from sdp,
    the last four are registered with persist=False
    :return: None
    """
    manager = self.cache_manager

    def from_sdp(entry):
        # loader used when a key is not in the cache yet
        return lambda k: self.sdp.get(entry, k)

    def in_sdp(entry):
        return lambda k: self.sdp.exists(entry, k)

    manager.register_cache(self.CONCEPTS_KEYS_ENTRY, IncCache(default=from_sdp(self.CONCEPTS_KEYS_ENTRY)))

    # the concept caches: (entry name, cache class, how to compute the index from a concept)
    concept_caches = (
        (self.CONCEPTS_BY_ID_ENTRY, Cache, lambda c: c.id),
        (self.CONCEPTS_BY_KEY_ENTRY, ListIfNeededCache, lambda c: c.key),
        (self.CONCEPTS_BY_NAME_ENTRY, ListIfNeededCache, lambda c: c.name),
        (self.CONCEPTS_BY_HASH_ENTRY, ListIfNeededCache, lambda c: c.get_definition_hash()),
    )
    register_concept_cache = manager.register_concept_cache
    for entry, cache_class, get_index in concept_caches:
        concept_cache = cache_class(default=from_sdp(entry), extend_exists=in_sdp(entry))
        register_concept_cache(entry, concept_cache, get_index, True)

    manager.register_cache(self.CONCEPTS_REFERENCES_ENTRY,
                           SetCache(default=from_sdp(self.CONCEPTS_REFERENCES_ENTRY)))

    for entry in (self.CONCEPTS_BY_FIRST_KEYWORD_ENTRY, self.CONCEPTS_SYA_DEFINITION_ENTRY):
        manager.register_cache(entry, DictionaryCache(default=from_sdp(entry)))
        manager.get(entry, None)  # to init from sdp

    # caches that are not persisted
    volatile_caches = (
        (self.RESOLVED_CONCEPTS_BY_FIRST_KEYWORD_ENTRY, DictionaryCache),
        (self.RESOLVED_CONCEPTS_SYA_DEFINITION_ENTRY, DictionaryCache),
        (self.CONCEPTS_GRAMMARS_ENTRY, Cache),
        (self.CHICKEN_AND_EGG_CONCEPTS_ENTRY, Cache),
    )
    for entry, cache_class in volatile_caches:
        manager.register_cache(entry, cache_class(), persist=False)
|
|
|
|
def initialize_services(self):
    """
    Finds the sub classes of BaseService in core.sheerka.services, instantiates each of them
    with this Sheerka, initializes it when it can be, and stores it in self.services under its NAME
    :return: None
    """
    self.init_log.debug("Initializing services")

    core.utils.import_module_and_sub_module('core.sheerka.services')
    service_classes = core.utils.get_sub_classes("core.sheerka.services",
                                                 "core.sheerka.services.sheerka_service.BaseService")
    for service_class in service_classes:
        service = service_class(self)
        if hasattr(service, "initialize"):
            service.initialize()
        self.services[service_class.NAME] = service
|
|
|
|
def first_time_initialisation(self, context):
    """
    Work done only on the very first start:
    puts 1000 under USER_CONCEPTS_KEYS in the keys cache and records that the execution contexts are saved
    :param context: execution context of the initialisation
    :return: None
    """
    first_user_value = 1000
    self.cache_manager.put(self.CONCEPTS_KEYS_ENTRY, self.USER_CONCEPTS_KEYS, first_user_value)
    self.record(context, self.name, "save_execution_context", True)
|
|
|
|
def initialize_builtin_concepts(self):
    """
    Initializes the builtin concepts.
    For every key of BuiltinConcepts, a concept is built (Sheerka itself, the dedicated class when
    there is one, a plain Concept otherwise), then either added to the caches or updated from
    what the caches already hold for its key
    :return: None
    """
    self.init_log.debug("Initializing builtin concepts")
    builtins_classes = self.get_builtins_classes_as_dict()

    # this all initialization of the builtins seems to be little bit complicated
    # why do we need to update it from DB ?
    for key in BuiltinConcepts:
        builtin_class = builtins_classes.get(str(key))

        if key == BuiltinConcepts.SHEERKA:
            concept = self
        elif builtin_class is not None:
            concept = builtin_class()
        else:
            concept = Concept(key, True, False, key)

        if key in BuiltinUnique:
            concept.metadata.is_unique = True
            concept.metadata.is_evaluated = True

        # only the non unique builtins need their class to create new instances later
        if not concept.metadata.is_unique and builtin_class is not None:
            self.builtin_cache[key] = builtin_class

        from_db = self.cache_manager.get(self.CONCEPTS_BY_KEY_ENTRY, concept.metadata.key)
        if from_db is None:
            self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.")
            self.set_id_if_needed(concept, True)
            self.cache_manager.add_concept(concept)
        else:
            self.init_log.debug(f"Found concept '{from_db}' in db. Updating.")
            concept.update_from(from_db)

        # Must be done in both cases: ret() uses this id for every return value it creates,
        # it was left to None when the concept was already known
        if key == BuiltinConcepts.RETURN_VALUE:
            self.return_value_concept_id = concept.id
|
|
|
|
def initialize_builtin_parsers(self):
    """
    Discovers the sub classes of BaseParser in the 'parsers' package and stores them in self.parsers,
    by full qualified name. Also keeps the base node parser class in self.bnp
    :return: None
    """
    core.utils.import_module_and_sub_module("parsers")
    base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
    skipped_modules = ["parsers.BaseNodeParser"]

    discovered = {}
    for parser_class in core.utils.get_sub_classes("parsers", base_class):
        if parser_class.__module__ != base_class.__module__:
            full_name = core.utils.get_full_qualified_name(parser_class)
            self.init_log.debug(f"Adding builtin parser '{full_name}'")
            discovered[full_name] = parser_class

    # keep a reference to base_node_parser
    self.bnp = discovered[BASE_NODE_PARSER_CLASS]

    # The parsers are registered sorted by name.
    # Not needed by the logic (they have their own priority), it only keeps the unit tests simple
    for full_name in sorted(discovered):
        parser_class = discovered[full_name]
        # base node parser module does not contains any valid parser
        if parser_class.__module__ not in skipped_modules:
            self.parsers[full_name] = parser_class
|
|
|
|
def initialize_builtin_evaluators(self):
    """
    Discovers the evaluators of the 'evaluators' package and appends them to self.evaluators:
    first the sub classes of OneReturnValueEvaluator, then the ones of AllReturnValuesEvaluator
    :return: None
    """
    core.utils.import_module_and_sub_module("evaluators")
    base_classes = ("evaluators.BaseEvaluator.OneReturnValueEvaluator",
                    "evaluators.BaseEvaluator.AllReturnValuesEvaluator")
    for base_class in base_classes:
        for evaluator_class in core.utils.get_sub_classes("evaluators", base_class):
            self.init_log.debug(f"Adding builtin evaluator '{evaluator_class.__name__}'")
            self.evaluators.append(evaluator_class)
|
|
|
|
def initialize_concept_node_parsing(self, context):
    """
    Resolves the concepts by first keyword with the base node parser (self.bnp)
    and puts the result in the RESOLVED_CONCEPTS_BY_FIRST_KEYWORD_ENTRY cache
    :param context: execution context given to the parser
    :return: None
    """
    self.init_log.debug("Initializing concepts by first keyword.")

    # work on a copy, the persisted definitions are left untouched
    concepts_by_first_keyword = self.cache_manager.copy(self.CONCEPTS_BY_FIRST_KEYWORD_ENTRY)
    res = self.bnp.resolve_concepts_by_first_keyword(context, concepts_by_first_keyword)
    self.cache_manager.put(self.RESOLVED_CONCEPTS_BY_FIRST_KEYWORD_ENTRY, False, res.body)
|
|
|
|
def reset(self, cache_only=False):
    """
    Puts Sheerka back to a clean state: caches, printer, data provider and locals
    :param cache_only: requested mode for the cache manager
    :return: None
    """
    if self.cache_manager.cache_only != cache_only:
        # the mode changes: the caches are registered again and the services initialized again
        self.cache_manager.reset(cache_only)
        self.initialize_caching()
        for service in self.services.values():
            if hasattr(service, "initialize"):
                service.initialize()
    else:
        # same mode: the registered caches are only cleared
        self.cache_manager.clear()
    # NOTE(review): confirm that the three resets below are meant to run in both cases
    self.printer_handler.reset()
    self.sdp.reset()
    self.locals = {}
|
|
|
|
# @profile()
|
|
def evaluate_user_input(self, text: str, user_name="kodjo"):
    """
    Saves the input as an event, runs EXECUTE_STEPS on it and keeps the outcome in the histories
    (last_executions / last_return_values, bounded by MAX_EXECUTION_HISTORY / MAX_RETURN_VALUES_HISTORY)

    Note to KSI: If you try to add execution context to this function,
    You may end in an infinite loop
    :param text: what the user typed
    :param user_name: name of the user, stored in the event
    :return: what self.execute() returned
    """
    self.log.debug(f"Processing user input '{text}', {user_name=}.")
    event = Event(text, user_name)
    evt_digest = self.sdp.save_event(event)
    self.log.debug(f"{evt_digest=}")

    with ExecutionContext(self.key,
                          event,
                          self,
                          BuiltinConcepts.PROCESS_INPUT,
                          text,
                          desc=f"Evaluating '{text}'",
                          logger=self.log) as execution_context:
        # the two return values the execution starts from
        user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name))
        reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED))

        ret = self.execute(execution_context, [user_input, reduce_requested], EXECUTE_STEPS)
        execution_context.add_values(return_values=ret)

        if self.cache_manager.is_dirty:
            self.cache_manager.commit(execution_context)

    # NOTE(review): confirm that the result is meant to be saved once the context is closed
    try:
        if self.save_execution_context and self.load(self.name, "save_execution_context"):
            self.sdp.save_result(execution_context)
    except Exception as ex:
        # best effort: a failure to save is logged and must not lose the result of the evaluation
        self.log.error(f"Failed to save execution context. Reason: {ex}")

    # # hack to save valid concept definition
    # if not self.during_restore:
    #     if len(ret) == 1 and ret[0].status and self.isinstance(ret[0].value, BuiltinConcepts.NEW_CONCEPT):
    #         with open(CONCEPTS_FILE, "a") as f:
    #             f.write(text + "\n")

    self._last_execution = execution_context
    # the oldest entry is dropped when the history is full
    if len(self.last_executions) == self.MAX_EXECUTION_HISTORY:
        del self.last_executions[0]
    self.last_executions.append(execution_context)

    if len(self.last_return_values) == self.MAX_RETURN_VALUES_HISTORY:
        del self.last_return_values[0]
    self.last_return_values.append(ret)

    return ret
|
|
|
|
def print(self, result, instructions=None):
|
|
"""
|
|
Print the result to output
|
|
:param result:
|
|
:param instructions:
|
|
:return:
|
|
"""
|
|
self.printer_handler.print(result, instructions)
|
|
|
|
def set_id_if_needed(self, obj: Concept, is_builtin: bool):
|
|
"""
|
|
Set the key for the concept if needed
|
|
For test purpose only !!!!!
|
|
:param obj:
|
|
:param is_builtin:
|
|
:return:
|
|
"""
|
|
if obj.metadata.id is not None:
|
|
return
|
|
|
|
key = self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS
|
|
obj.metadata.id = str(self.cache_manager.get(self.CONCEPTS_KEYS_ENTRY, key))
|
|
self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.")
|
|
|
|
def force_sya_def(self, context, list_of_def):
    """
    Set the precedence and/or the associativity of a concept
    FOR TESTS PURPOSE. TO REMOVE EVENTUALLY
    :param context: not used by this method
    :param list_of_def: list of tuple(concept_id, precedence (int), SyaAssociativity).
                        When both precedence and associativity are None, the definition is removed
    :return: a ReturnValue, SUCCESS or ERROR when one of the concepts is unknown
    """

    # validate the entries
    # If one entry is an invalid concept, nothing is modified
    for concept_id, _precedence, _associativity in list_of_def:
        if concept_id == BuiltinConcepts.UNKNOWN_CONCEPT:
            return self.ret(self.name,
                            False,
                            self.new(BuiltinConcepts.ERROR, body=f"Concept {concept_id} is not known"))

    sya_def = self.cache_manager.copy(self.RESOLVED_CONCEPTS_SYA_DEFINITION_ENTRY) or {}

    # update the definitions
    for concept_id, precedence, associativity in list_of_def:
        if precedence is None and associativity is None:
            # The removal is done on the working copy, as it is the copy which is put back in cache
            # (it used to target an attribute which is not the working copy).
            # Removing a definition that does not exist is not an error
            sya_def.pop(concept_id, None)
        else:
            sya_def[concept_id] = (precedence, associativity)

    # put in cache
    self.cache_manager.put(self.RESOLVED_CONCEPTS_SYA_DEFINITION_ENTRY, False, sya_def)

    return self.ret(self.name, True, self.new(BuiltinConcepts.SUCCESS))
|
|
|
|
def add_in_cache(self, concept: Concept):
|
|
"""
|
|
Adds a concept template in cache.
|
|
The cache is used as a proxy before looking at sdp
|
|
:param concept:
|
|
:return:
|
|
"""
|
|
|
|
# sanity check
|
|
if concept.key is None:
|
|
concept.init_key()
|
|
|
|
if concept.key is None:
|
|
raise KeyError()
|
|
|
|
self.cache_manager.add_concept(concept)
|
|
|
|
return concept
|
|
|
|
def get_by_key(self, concept_key, concept_id=None):
    """
    Looks for concept(s) by key. A BuiltinConcepts member is converted to its string form first
    :param concept_key: key of the concept
    :param concept_id: optional id, to choose among several concepts
    :return: see internal_get()
    """
    if isinstance(concept_key, BuiltinConcepts):
        concept_key = str(concept_key)
    return self.internal_get("key", concept_key, self.CONCEPTS_BY_KEY_ENTRY, concept_id)
|
|
|
|
def get_by_name(self, concept_name, concept_id=None):
    """
    Looks for concept(s) by name
    :param concept_name: name of the concept
    :param concept_id: optional id, to choose among several concepts
    :return: see internal_get()
    """
    cache_name = self.CONCEPTS_BY_NAME_ENTRY
    return self.internal_get("name", concept_name, cache_name, concept_id)
|
|
|
|
def get_by_hash(self, concept_hash, concept_id=None):
    """
    Looks for concept(s) by definition hash
    :param concept_hash: hash of the definition
    :param concept_id: optional id, to choose among several concepts
    :return: see internal_get()
    """
    cache_name = self.CONCEPTS_BY_HASH_ENTRY
    return self.internal_get("hash", concept_hash, cache_name, concept_id)
|
|
|
|
def get_by_id(self, concept_id):
    """
    Looks for a concept by id
    :param concept_id: id of the concept
    :return: see internal_get()
    """
    cache_name = self.CONCEPTS_BY_ID_ENTRY
    return self.internal_get("id", concept_id, cache_name, None)
|
|
|
|
def internal_get(self, index_name, key, cache_name, concept_id=None):
    """
    Tries to find an entry
    :param index_name: name of the index (ex by_id, by_key...)
    :param key: index value
    :param cache_name: name of the cache (ex Concepts_By_ID...)
    :param concept_id: id of the wanted concept, used when there are multiple results
    :return: what the cache holds, the concept with the requested id,
             an ErrorConcept when key is None, or the unknown concept when nothing matches
    """
    if key is None:
        return ErrorConcept(f"Concept '{key}' is undefined.")

    found = self.cache_manager.get(cache_name, key)
    if found:
        # nothing to choose from: no id requested, or a single entry which cannot be iterated
        if concept_id is None or not hasattr(found, "__iter__"):
            return found

        for candidate in found:
            if candidate.id == concept_id:
                return candidate

    # describe what was searched for, so the unknown concept can report it
    if concept_id:
        searched = [(index_name, key), ("id", concept_id)]
    else:
        searched = (index_name, key)
    return self._get_unknown(searched)
|
|
|
|
def resolve(self, concept):
    """
    Try to find a concept by its name, id, or c:: definition
    A new instance (using new_from_template()) is returned when it's possible
    :param concept: a concept Token, a (name, id) tuple, a 'c:...' string or a concept name
    :return: the new instance(s), or None when nothing is found
    """

    def new_instances(concepts):
        # one new instance per template when several templates are found
        if hasattr(concepts, "__iter__"):
            return [self.new_from_template(c, c.key) for c in concepts]
        return self.new_from_template(concepts, concepts.key)

    if concept is None:
        return None

    # ##############
    # PREPROCESS
    # ##############
    # if the entry is a concept token, use its values.
    if isinstance(concept, Token):
        if concept.type != TokenKind.CONCEPT:
            return None
        concept = concept.value

    # a 'c:...' string is replaced by what unstr_concept() extracts from it, unless it extracts nothing
    if isinstance(concept, str) and \
            concept.startswith("c:") and \
            (tmp := core.utils.unstr_concept(concept)) != (None, None):
        concept = tmp

    # ##############
    # PROCESS
    # ##############
    # if the entry is a tuple
    # concept[0] is the name
    # concept[1] is the id
    # NOTE(review): the id is tried first, and the name only when there is no id - confirm
    if isinstance(concept, tuple):
        if concept[1]:
            if self.is_known(found := self.get_by_id(concept[1])):
                instance = self.new_from_template(found, found.key)
                instance.metadata.is_evaluated = True
                return instance
        elif concept[0]:
            if self.is_known(found := self.get_by_name(concept[0])):
                instances = new_instances(found)
                core.builtin_helpers.set_is_evaluated(instances)
                return instances
        else:
            return None

    # otherwise search in db
    if isinstance(concept, str):
        if self.is_known(found := self.get_by_name(concept)):
            instances = new_instances(found)
            core.builtin_helpers.set_is_evaluated(instances, check_nb_variables=True)
            return instances

    return None
|
|
|
|
def has_id(self, concept_id):
    """
    Tells if a concept with this id exists in cache
    It does not search in the remote repository
    :param concept_id: id to look for
    :return: False when concept_id is None, otherwise what the cache manager answers
    """
    if concept_id is not None:
        return self.cache_manager.has(self.CONCEPTS_BY_ID_ENTRY, concept_id)
    return False
|
|
|
|
def has_key(self, concept_key):
    """
    Tells if concept(s) with this key exist in cache
    It does not search in the remote repository
    :param concept_key: key to look for
    :return: what the cache manager answers
    """
    cache_name = self.CONCEPTS_BY_KEY_ENTRY
    return self.cache_manager.has(cache_name, concept_key)
|
|
|
|
def has_name(self, concept_name):
    """
    Tells if concept(s) with this name exist in cache
    It does not search in the remote repository
    :param concept_name: name to look for
    :return: what the cache manager answers
    """
    cache_name = self.CONCEPTS_BY_NAME_ENTRY
    return self.cache_manager.has(cache_name, concept_name)
|
|
|
|
def has_hash(self, concept_hash):
    """
    Tells if concept(s) with this hash exist in cache
    It does not search in the remote repository
    :param concept_hash: hash to look for
    :return: what the cache manager answers
    """
    cache_name = self.CONCEPTS_BY_HASH_ENTRY
    return self.cache_manager.has(cache_name, concept_hash)
|
|
|
|
def new(self, concept_key, **kwargs):
    """
    Creates a new instance of a concept
    When the concept is supposed to be unique, the same instance is returned
    :param concept_key: key of the concept, or a tuple (key, id)
    :param kwargs: values to set on the new instance
    :return: the instance, a list of instances when several concepts share the key,
             or the unknown concept when nothing is found
    """
    concept_id = None
    if isinstance(concept_key, tuple):
        concept_key, concept_id = concept_key[0], concept_key[1]

    if concept_key:
        template = self.get_by_key(concept_key, concept_id)
    else:
        template = self.get_by_id(concept_id)

    # manage concept not found
    if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
            concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
        return template

    if not isinstance(template, list):
        return self.new_from_template(template, concept_key, **kwargs)

    # a list means that there are multiple concepts under the same key
    return [self.new_from_template(one_template, concept_key, **kwargs) for one_template in template]
|
|
|
|
def new_from_template(self, template, key, **kwargs):
    """
    Creates an instance from a template
    :param template: concept to copy the definition from; returned as is when it is unique
    :param key: key used to find the class to instantiate in builtin_cache (plain Concept otherwise)
    :param kwargs: values, properties or attributes to set on the instance
    :return: the instance, or the UNKNOWN_PROPERTY concept for the first kwarg that cannot be set
    """
    # manage singleton
    if template.metadata.is_unique:
        return template

    # otherwise, create another instance
    concept_class = self.builtin_cache.get(key, Concept)
    instance = concept_class()
    instance.update_from(template, update_value=False)
    instance.freeze_definition_hash()

    if not kwargs:
        return instance

    # update the properties, values, attributes
    # Not quite sure that this is the correct process order
    for name, value in kwargs.items():
        if name in instance.values:
            instance.set_value(name, value)
            continue
        if name in PROPERTIES_FOR_NEW:
            instance.set_value(ConceptParts(name), value)
            continue
        if hasattr(instance, name):
            setattr(instance, name, value)
            continue
        return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=name, concept=instance)

    # TODO : add the concept to the list of known concepts (self.instances)
    instance.metadata.is_evaluated = True  # because we have manually set the variables
    return instance
|
|
|
|
def ret(self, who: str, status: bool, value, message=None, parents=None):
    """
    Builds a ReturnValueConcept, using the id kept in return_value_concept_id
    :param who: who produces the return value
    :param status: True for a success
    :param value: the value carried
    :param message: optional message
    :param parents: optional parents
    :return: the ReturnValueConcept
    """
    # The class is instantiated directly rather than going through self.new(BuiltinConcepts.RETURN_VALUE, ...)
    # 1 second saved every twenty seconds in unit tests
    arguments = {
        "who": who,
        "status": status,
        "value": value,
        "message": message,
        "parents": parents,
        "concept_id": self.return_value_concept_id,
    }
    return ReturnValueConcept(**arguments)
|
|
|
|
def objvalue(self, obj, reduce_simple_list=False):
    """
    Follows the bodies of the concepts until a value is found
    :param obj: anything. None, objects with a get_obj_value() method and non concepts are handled directly
    :param reduce_simple_list: when True, a list or a set body with only one element is replaced by this element
    :return: the value; the concept itself when its body is not initialized
    """
    if obj is None:
        return None

    if hasattr(obj, "get_obj_value"):
        return obj.get_obj_value()

    if not isinstance(obj, Concept):
        return obj

    if obj.body is BuiltinConcepts.NOT_INITIALIZED:
        return obj

    body_to_use = obj.body
    if reduce_simple_list and isinstance(body_to_use, (list, set)) and len(body_to_use) == 1:
        # a set cannot be indexed, so the single element is taken through an iterator
        body_to_use = next(iter(body_to_use))

    # reduce_simple_list is not propagated, only the first level is reduced
    return self.objvalue(body_to_use)
|
|
|
|
def objvalues(self, objs):
    """
    Lazily computes objvalue() for every element
    :param objs: a list, a LIST or ENUMERATION concept (its body is iterated), or a single object
    :return: a generator of values
    """
    is_collection = (isinstance(objs, list) or
                     self.isinstance(objs, BuiltinConcepts.LIST) or
                     self.isinstance(objs, BuiltinConcepts.ENUMERATION))
    if not is_collection:
        objs = [objs]

    items = objs if isinstance(objs, list) else objs.body
    return (self.objvalue(item) for item in items)
|
|
|
|
def value_by_concept(self, obj, concept):
    """
    Follows the bodies from obj until a concept with the requested key is found
    :param obj: where to start from
    :param concept: the key to find (compared using its string form), or a tuple of keys
    :return: the first matching concept, None if the search reaches something which is not a concept
    """
    # None and plain python objects cannot match
    if obj is None or not isinstance(obj, Concept):
        return None

    if isinstance(concept, tuple):
        accepted_keys = [str(key) for key in concept]
        if obj.key in accepted_keys:
            return obj

    if obj.key == str(concept):
        return obj

    return self.value_by_concept(obj.body, concept)
|
|
|
|
def get_error(self, obj):
    """
    Extracts the error from an object
    :param obj: a builtin error concept, a list / set / tuple of objects or a RETURN_VALUE concept
    :return: the error concept itself, the list of the errors of the elements,
             None for a successful return value, otherwise the body of the return value
             (or the error of the parser result body, when the body is a PARSER_RESULT)
    :raise NotImplementedError: for any other object
    """
    if isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors:
        return obj

    if isinstance(obj, (list, set, tuple)):
        return [self.get_error(item) for item in obj]

    if not self.isinstance(obj, BuiltinConcepts.RETURN_VALUE):
        raise NotImplementedError()

    if obj.status:
        return None

    if self.isinstance(obj.body, BuiltinConcepts.PARSER_RESULT):
        return self.get_error(obj.body.body)
    return obj.body
|
|
|
|
def get_evaluator_name(self, name):
    """
    Prefixes the name with the PREFIX of the BaseEvaluator class
    :param name: short name of the evaluator
    :return: the prefixed name
    """
    prefix = self.evaluators_prefix
    if prefix is None:
        # looked up once, then kept on the instance
        prefix = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator").PREFIX
        self.evaluators_prefix = prefix

    return self.evaluators_prefix + name
|
|
|
|
def get_parser_name(self, name):
    """
    Prefixes the name with the PREFIX of the BaseParser class
    :param name: short name of the parser
    :return: the prefixed name
    """
    prefix = self.parsers_prefix
    if prefix is None:
        # looked up once, then kept on the instance
        prefix = core.utils.get_class("parsers.BaseParser.BaseParser").PREFIX
        self.parsers_prefix = prefix

    return self.parsers_prefix + name
|
|
|
|
def concepts(self):
    """
    List of all known concepts (look up in sdp)
    :return: the concepts, sorted by the integer value of their id
    """
    known = []
    for entry in self.sdp.list(self.CONCEPTS_BY_ID_ENTRY):
        # an entry is either one concept or a list of concepts
        known += entry if isinstance(entry, list) else [entry]

    known.sort(key=lambda concept: int(concept.id))
    return known
|
|
|
|
def get_last_execution(self):
    """
    :return: the execution context of the latest evaluate_user_input(), None when nothing was evaluated yet
    """
    # the attribute may not exist yet when nothing was evaluated
    return getattr(self, "_last_execution", None)
|
|
|
|
def test(self):
    """
    Method without side effect, registered in sheerka_methods under 'test'
    :return: a fixed message
    """
    message = "I have access to Sheerka !"
    return message
|
|
|
|
def test_using_context(self, context, param1, param2):
    """
    Method registered in sheerka_methods under 'test_using_context'; its first parameter is the context
    :param context: execution context, the digest of its event is reported
    :param param1: reported in the message
    :param param2: reported in the message
    :return: a message with the parameters and the digest of the event
    """
    event = context.event.get_digest()
    message = f"I have access to Sheerka ! {param1=}, {param2=}, {event=}."
    return message
|
|
|
|
def test_error(self):
    """
    Always fails
    :raise Exception: every time
    """
    error = Exception("I can raise an error")
    raise error
|
|
|
|
@staticmethod
|
|
def is_success(obj):
|
|
if isinstance(obj, bool): # quick win
|
|
return obj
|
|
|
|
if isinstance(obj, ReturnValueConcept):
|
|
return obj.status
|
|
|
|
# other cases ?
|
|
# ...
|
|
|
|
# manage internal errors
|
|
if isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors:
|
|
return False
|
|
|
|
return obj
|
|
|
|
@staticmethod
|
|
def is_known(obj):
|
|
if not isinstance(obj, Concept):
|
|
return True
|
|
|
|
return obj.key != str(BuiltinConcepts.UNKNOWN_CONCEPT)
|
|
|
|
@staticmethod
|
|
def isinstance(a, b):
|
|
"""
|
|
return true if the concept a is an instance of the concept b
|
|
:param a:
|
|
:param b:
|
|
:return:
|
|
"""
|
|
|
|
if not isinstance(a, Concept):
|
|
return False
|
|
|
|
b_key = b.key if isinstance(b, Concept) else str(b)
|
|
|
|
return a.key == b_key
|
|
|
|
@staticmethod
def _get_unknown(metadata):
    """
    Builds the 'UnknownConcept' for what was requested
    Note that new() is not called, to prevent cyclic calls
    :param metadata: what was searched for: a tuple (attribute, value), ex (key, 'not_found'),
                     or a list of such tuples when several attributes were given,
                     ex [(key, 'not_found'), (id, invalid_id)]
    :return: the unknown concept: metadata is its body, and each attribute is also set as a value
    """
    unknown = UnknownConcept()  # don't use new() for prevent circular reference
    unknown.set_value(ConceptParts.BODY, metadata)

    searched = metadata if isinstance(metadata, list) else [metadata]
    for attribute in searched:
        unknown.set_value(attribute[0], attribute[1])

    unknown.metadata.is_evaluated = True
    return unknown
|
|
|
|
@staticmethod
def get_builtins_classes_as_dict():
    """
    Collects the sub classes of Concept declared in core.builtin_concepts
    Each class is instantiated once to read its key
    :return: dict key of the concept -> class
    """
    return {
        concept_class().metadata.key: concept_class
        for concept_class in core.utils.get_classes("core.builtin_concepts")
        if issubclass(concept_class, Concept) and concept_class != Concept
    }
|
|
|
|
@staticmethod
def init_logging(debug, loggers):
    """
    Configures the logging
    :param debug: True to log from DEBUG level with the time and the level, otherwise INFO with the message only
    :param loggers: configuration given to core.sheerka_logger.init_config(). May be None.
                    In debug mode, the name of the logger is shown when it contains "show-name"
    :return: None
    """

    def add_coloring_to_emit_ansi(fn):
        # Wraps an emit function to color the message according to the level of the record.
        # Not used unless the last line of init_logging is uncommented
        colors = (
            (40, '\x1b[31m'),  # red (error and critical)
            (30, '\x1b[33m'),  # yellow
            (20, '\x1b[32m'),  # green
            (10, '\x1b[35m'),  # pink
        )

        def new(*args):
            levelno = args[1].levelno
            color = next((code for level, code in colors if levelno >= level), '\x1b[0m')  # default is normal
            args[1].msg = color + str(args[1].msg) + '\x1b[0m'  # normal
            return fn(*args)

        return new

    core.sheerka_logger.init_config(loggers)
    if debug:
        log_format = "%(asctime)s"
        # loggers is None by default, 'in' cannot be used on it
        if loggers and "show-name" in loggers:
            log_format += " %(name)s"
        log_format += " [%(levelname)s] %(message)s"
        log_level = logging.DEBUG
    else:
        log_format = "%(message)s"
        log_level = logging.INFO

    logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
    logging.addLevelName(logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
    # uncomment the following line to enable colors
    # logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit)
|