bef5f3208c
Fixed #62 : DefConceptParser: parsing error Fixed #64 : DefConceptParser: Failed to parse when too many concept keyword Fixed #65 : DefConceptParser : Add auto_eval keyword Fixed #66 : DefConceptParser : Add def_var keyword Fixed #67 : Add get_errors()
999 lines
35 KiB
Python
999 lines
35 KiB
Python
import inspect
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from operator import attrgetter
|
|
|
|
import core.builtin_helpers
|
|
import core.utils
|
|
from cache.Cache import Cache
|
|
from cache.IncCache import IncCache
|
|
from core.builtin_concepts import ErrorConcept, ReturnValueConcept, UnknownConcept
|
|
from core.builtin_concepts_ids import BuiltinErrors, BuiltinConcepts
|
|
from core.concept import Concept, ConceptParts, get_concept_attrs
|
|
from core.global_symbols import EVENT_USER_INPUT_EVALUATED, NotInit, NotFound, ErrorObj, EVENT_ONTOLOGY_CREATED
|
|
from core.profiling import profile
|
|
from core.sheerka.ExecutionContext import ExecutionContext
|
|
from core.sheerka.SheerkaOntologyManager import SheerkaOntologyManager, OntologyAlreadyExists
|
|
from core.sheerka_logger import console_handler
|
|
from core.tokenizer import Token, TokenKind
|
|
from printer.SheerkaPrinter import SheerkaPrinter
|
|
from sdp.sheerkaDataProvider import Event
|
|
|
|
# Fully qualified name of the base node parser; a reference to this class is
# kept as self.bnp after parser discovery (see get_builtin_parsers()).
BASE_NODE_PARSER_CLASS = "parsers.BaseNodeParser.BaseNodeParser"

# User inputs that end a session — presumably checked by the interactive loop
# (not used in this chunk of the file).
EXIT_COMMANDS = ("quit", "exit", "bye")

# Pipeline steps applied to a regular user input (passed to self.execute()
# by evaluate_user_input()).
EXECUTE_STEPS = [
    BuiltinConcepts.BEFORE_PARSING,
    BuiltinConcepts.PARSING,
    BuiltinConcepts.AFTER_PARSING,
    BuiltinConcepts.BEFORE_EVALUATION,
    BuiltinConcepts.EVALUATION,
    BuiltinConcepts.AFTER_EVALUATION
]

# Steps used when evaluating which rules apply (see execute_rules()).
RULES_EVALUATE_STEPS = [
    BuiltinConcepts.BEFORE_RULES_EVALUATION,
    BuiltinConcepts.RULES_EVALUATION,
    BuiltinConcepts.AFTER_RULES_EVALUATION,
]

# Steps used when executing the selected rules (see execute_rules()).
RULES_EXECUTE_STEPS = [
    BuiltinConcepts.BEFORE_EVALUATION,
    BuiltinConcepts.EVALUATION,
    BuiltinConcepts.AFTER_EVALUATION
]

# When a concept is instantiated via resolve() or fast_resolve(), these values
# are stored as the RECOGNIZED_BY hint to indicate which parameter was used
# to recognize the concept (its id or its name).
RECOGNIZED_BY_ID = "by_id"
RECOGNIZED_BY_NAME = "by_name"
|
|
|
|
|
|
@dataclass
class SheerkaMethod:
    """
    Wrapper around a method exposed on the Sheerka instance.

    ``method`` is the callable being exposed; ``has_side_effect`` is False
    when the method is considered safe to call (no side effects).
    """
    method: object
    has_side_effect: bool
|
|
|
|
|
|
class Sheerka(Concept):
    """
    Main controller for the project
    """

    # Cache entry names for the concept lookup tables (queried via self.om.get()
    # in fast_resolve()).
    CONCEPTS_BY_ID_ENTRY = "ConceptManager:Concepts_By_ID"
    CONCEPTS_BY_NAME_ENTRY = "ConceptManager:Concepts_By_Name"

    # Cache entry exposing concepts with circular definitions (see the
    # chicken_and_eggs property); registered non-persistent in initialize_caching().
    CHICKEN_AND_EGG_CONCEPTS_ENTRY = "Chicken_And_Egg_Concepts"

    # Incremental cache used to allocate object ids (see initialize_caching()).
    OBJECTS_IDS_ENTRY = "Objects_Ids"

    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"  # sequential key for builtin concepts
    USER_CONCEPTS_KEYS = "User_Concepts"  # sequential key for user defined concepts

    # NOTE(review): empty here — presumably overrides an attribute list declared
    # on Concept; confirm against core.concept.
    ALL_ATTRIBUTES = []
|
|
|
|
def __init__(self, cache_only=False, debug=False, loggers=None):
    """
    Build the (not yet initialized) controller; call initialize() afterwards.

    :param cache_only: forwarded to SheerkaOntologyManager — TODO confirm semantics
    :param debug: enables DEBUG-level logging (see init_logging())
    :param loggers: logger configuration forwarded to init_logging()
    """
    # Register self as the SHEERKA concept (positional flags — see Concept.__init__)
    super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)

    self.init_logging(debug, loggers)
    self.loggers = loggers
    self.cache_only = cache_only

    # self.log.debug("Starting Sheerka.")

    self.bnp = None  # reference to the BaseNodeParser class (to compute first keyword token)
    # Concept ids resolved during initialize_builtin_concepts(); used by ret()/err()
    self.return_value_concept_id = None
    self.error_concept_id = None

    self.om: SheerkaOntologyManager = None

    self.services = {}  # sheerka plugins

    self.builtin_cache = {}  # cache for builtin concepts
    self.parsers = {}  # cache for builtin parsers
    self.evaluators = []  # cache for builtin evaluators

    # Lazily resolved in get_evaluator_name() / get_parser_name()
    self.evaluators_prefix: str = None
    self.parsers_prefix: str = None

    self.printer_handler = SheerkaPrinter(self)

    self.during_restore = False
    self.during_initialisation = False
    self._builtins_classes_cache = None

    # Feature toggles; overridable through initialize(**kwargs)
    self.save_execution_context = True
    self.enable_process_return_values = True
    self.enable_process_rules = True
    self.enable_commands_backup = True

    self.methods_with_context = {"test_using_context"}  # only the names, the method is defined in sheerka_methods
    # Methods exposed to the rest of the system, wrapped with their side-effect flag
    self.sheerka_methods = {
        "test": SheerkaMethod(self.test, False),
        "test_using_context": SheerkaMethod(self.test_using_context, False),
        "test_dict": SheerkaMethod(self.test_dict, False),
        "test_error": SheerkaMethod(self.test_error, False),
        "is_sheerka": SheerkaMethod(self.is_sheerka, False),
    }

    # Filled by initialize_builtin_concepts()
    self.concepts_ids = None
|
|
|
|
def __copy__(self):
    """The controller behaves as a singleton: copying yields the same object."""
    return self
|
|
|
|
def __deepcopy__(self, memodict=None):
    """
    The controller behaves as a singleton: deep-copying yields the same object.

    :param memodict: the copy.deepcopy memo dict (unused here). The previous
        mutable default ``{}`` was the classic shared-default pitfall; ``None``
        is backward-compatible since deepcopy passes the memo positionally and
        the argument is never read.
    """
    return self
|
|
|
|
@property
def chicken_and_eggs(self):
    """Expose the chicken-and-egg concepts cache of the active cache manager."""
    caches = self.om.current_cache_manager().caches
    return caches[self.CHICKEN_AND_EGG_CONCEPTS_ENTRY].cache
|
|
|
|
@property
def root_folder(self):
    """Root configuration folder, as known by the ontology manager."""
    return self.om.root_folder
|
|
|
|
def bind_service_method(self, bound_method, has_side_effect, as_name=None, visible=True):
    """
    Bind a service method onto the sheerka instance for ease of use.

    :param bound_method: the (already bound) callable to expose
    :param has_side_effect: False if the method is safe
    :param as_name: optional alternative name under which to expose it
    :param visible: when True, also register it in sheerka_methods
    :return: None
    """
    exposed_name = bound_method.__name__ if as_name is None else as_name

    if visible:
        # A leading "context" parameter marks methods that need the execution context.
        parameter_names = list(inspect.signature(bound_method).parameters)
        if parameter_names and parameter_names[0] == "context":
            self.methods_with_context.add(exposed_name)
        self.sheerka_methods[exposed_name] = SheerkaMethod(bound_method, has_side_effect)

    # Always attach under the method's own name (even when renamed or hidden).
    setattr(self, bound_method.__name__, bound_method)
|
|
|
|
def initialize(self, root_folder: str = None, **kwargs):
    """
    Starting Sheerka
    Loads the current configuration
    Notes that when it's the first time, it also create the needed working folders
    :param root_folder: root configuration folder
    :param kwargs: optional toggle overrides (save_execution_context,
                   enable_process_return_values, enable_process_rules,
                   enable_commands_backup)
    :return: ReturnValue(Success or Error)
    """

    # Apply toggle overrides, keeping the current value as default
    self.save_execution_context = kwargs.get("save_execution_context", self.save_execution_context)
    self.enable_process_return_values = kwargs.get("enable_process_return_values",
                                                   self.enable_process_return_values)
    self.enable_process_rules = kwargs.get("enable_process_rules", self.enable_process_rules)
    self.enable_commands_backup = kwargs.get("enable_commands_backup", self.enable_commands_backup)

    try:
        self.during_initialisation = True
        # local import: presumably avoids a circular import — TODO confirm
        from sheerkapickle.sheerka_handlers import initialize_pickle_handlers
        initialize_pickle_handlers()

        self.om = SheerkaOntologyManager(self, root_folder, self.cache_only)
        self.builtin_cache = self.get_builtins_classes_as_dict()

        # Order matters: caches first, then parsers/evaluators, then services
        self.initialize_caching()
        self.get_builtin_parsers()
        self.get_builtin_evaluators()
        self.initialize_services()
        self.initialize_builtin_evaluators()
        self.om.init_subscribers()

        event = Event("Initializing Sheerka.", user_id=self.name)
        self.om.save_event(event)
        with ExecutionContext(self.key,
                              event,
                              self,
                              BuiltinConcepts.INIT_SHEERKA,
                              None,
                              desc="Initializing Sheerka.") as exec_context:
            if self.om.current_sdp().first_time:
                self.first_time_initialisation(exec_context)

            self.initialize_builtin_concepts()

            self.initialize_services_deferred(exec_context, self.om.current_sdp().first_time)

            res = ReturnValueConcept(self, True, self)
            exec_context.add_values(return_values=res)

            if self.om.is_dirty():
                self.om.commit(exec_context)

            if self.save_execution_context:
                self.om.save_result(exec_context, is_admin=True)

            # append the other ontologies if needed
            self.om.freeze()
            self.initialize_ontologies(exec_context)

        # self.init_log.debug(f"Sheerka successfully initialized")

    except IOError as e:
        # failed initialisation is reported as an ERROR concept, not raised
        res = ReturnValueConcept(self.name, False, self.new(BuiltinConcepts.ERROR, body=e))

    finally:
        self.during_initialisation = False

    return res
|
|
|
|
def initialize_caching(self):
    """Create and register the caches Sheerka itself relies on."""
    # incremental cache allocating object ids
    ids_cache = IncCache().auto_configure(self.OBJECTS_IDS_ENTRY)
    self.om.register_cache(self.OBJECTS_IDS_ENTRY, ids_cache)

    # chicken-and-egg concepts are session-local, hence persist=False
    cne_cache = Cache().auto_configure(self.CHICKEN_AND_EGG_CONCEPTS_ENTRY)
    self.om.register_cache(self.CHICKEN_AND_EGG_CONCEPTS_ENTRY, cne_cache, persist=False)
|
|
|
|
def initialize_services(self):
    """
    Discover service classes by introspection, instantiate them in priority
    order, run their initialize() hook and register them by NAME.
    :return: None
    """
    core.utils.import_module_and_sub_module('core.sheerka.services')
    base_class = "core.sheerka.services.sheerka_service.BaseService"
    service_classes = core.utils.get_sub_classes("core.sheerka.services", base_class)

    # instantiate, then order by the service-declared "order" attribute
    instances = sorted((cls(self) for cls in service_classes), key=attrgetter("order"))
    for instance in instances:
        if hasattr(instance, "initialize"):
            instance.initialize()
        self.services[instance.NAME] = instance
|
|
|
|
def initialize_services_deferred(self, context, is_first_time):
    """
    Run the deferred part of service initialisation — the part that may take
    time or that needs the execution context.
    TODO: Create a separate thread for these initialisations as they may take time
    :param context: current execution context
    :param is_first_time: True on a brand new ontology
    :return: None
    """
    deferrable = (s for s in self.services.values() if hasattr(s, "initialize_deferred"))
    for service in deferrable:
        service.initialize_deferred(context, is_first_time)
|
|
|
|
def first_time_initialisation(self, context):
    """One-shot setup on a brand new ontology: record the default toggle value."""
    self.record_var(context, self.name, "save_execution_context", self.save_execution_context)
|
|
|
|
def initialize_builtin_concepts(self):
    """
    Initialize the builtin concepts via the concept-manager service and cache
    the frequently used concept ids (RETURN_VALUE, ERROR).
    :return: None
    """
    # local import: presumably avoids a circular import — TODO confirm
    from core.sheerka.services.SheerkaConceptManager import SheerkaConceptManager

    manager = self.services[SheerkaConceptManager.NAME]
    ids = manager.initialize_builtin_concepts()

    self.concepts_ids = ids
    self.return_value_concept_id = ids[BuiltinConcepts.RETURN_VALUE]
    self.error_concept_id = ids[BuiltinConcepts.ERROR]
|
|
|
|
def get_builtin_parsers(self):
    """
    Discover the builtin parsers and register them in self.parsers.
    Also keeps a reference to the base node parser class in self.bnp.
    :return: None
    """
    core.utils.import_module_and_sub_module("parsers")
    base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
    skipped_modules = ("parsers.BaseNodeParser",
                       "parsers.BaseCustomGrammarParser",
                       "parsers.BaseExpressionParser")

    # index the candidate parser classes by their fully qualified name,
    # excluding anything declared in the base class' own module
    candidates = {
        core.utils.get_full_qualified_name(cls): cls
        for cls in core.utils.get_sub_classes("parsers", base_class)
        if cls.__module__ != base_class.__module__
    }

    # keep a reference to base_node_parser (used to compute first keyword token)
    self.bnp = candidates[BASE_NODE_PARSER_CLASS]

    # Register in sorted-name order. Not needed for the logic (parsers carry
    # their own priority) but it keeps the unit tests deterministic.
    for qualified_name in sorted(candidates):
        cls = candidates[qualified_name]
        if cls.__module__ in skipped_modules:
            # these modules do not contain any valid parser
            continue
        self.parsers[qualified_name] = cls
|
|
|
|
def get_builtin_evaluators(self):
    """
    Discover every builtin evaluator class (both one-return-value and
    all-return-values flavours) and collect them into self.evaluators.
    :return: None
    """
    core.utils.import_module_and_sub_module("evaluators")
    evaluators = core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.OneReturnValueEvaluator")
    evaluators.extend(core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.AllReturnValuesEvaluator"))

    # extend() replaces the former element-by-element append loop; order preserved
    self.evaluators.extend(evaluators)
|
|
|
|
def initialize_builtin_evaluators(self):
    """Run the optional initialize(sheerka) hook on every discovered evaluator."""
    for evaluator_cls in self.evaluators:
        if hasattr(evaluator_cls, "initialize"):
            evaluator_cls.initialize(self)
|
|
|
|
def initialize_ontologies(self, context):
    """
    Re-push the previously saved ontologies (all but the base one, bottom-up)
    and refresh the services after each push.
    :param context: current execution context
    :return: None
    """
    ontologies = self.om.current_sdp().load_ontologies()
    if not ontologies:
        return

    # ontologies[-2::-1] == list(reversed(ontologies))[1:] — everything except
    # the last element, walked in reverse order
    for ontology_name in ontologies[-2::-1]:
        self.om.push_ontology(ontology_name, False)
        self.initialize_services_deferred(context, False)
|
|
|
|
# @profile(filename="profile_80")
def evaluate_user_input(self, text: str, user_name="kodjo"):
    """
    Evaluate one user input through the full pipeline (parse/evaluate steps,
    then rules, then return-value post-processing).
    Note to KSI: If you try to add execution context to this function,
    You may end in an infinite loop
    :param text: the raw user input
    :param user_name: name recorded on the event
    :return: the ReturnValue(s) produced by the pipeline
    """
    # self.log.debug(f"Processing user input '{text}', {user_name=}.")
    event = Event(text, user_name)
    self.om.save_event(event)

    with ExecutionContext(self.key,
                          event,
                          self,
                          BuiltinConcepts.PROCESS_INPUT,
                          text,
                          desc=f"Evaluating '{text}'") as execution_context:

        # wrap the raw text into a USER_INPUT concept and register it as input
        user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name))
        execution_context.add_inputs(user_input=user_input)

        # TODO. Must be a context hint, not a return value
        reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED))

        # main parse/evaluate pipeline
        ret = self.execute(execution_context, [user_input, reduce_requested], EXECUTE_STEPS)
        execution_context.add_values(return_values=ret)

        # rule management
        if self.enable_process_rules:
            ret = self.execute_rules(execution_context, ret, RULES_EVALUATE_STEPS, RULES_EXECUTE_STEPS)

        if self.om.is_dirty():
            self.om.commit(execution_context)

        self.publish(execution_context, EVENT_USER_INPUT_EVALUATED)

        # Do not save execution contexts from process_return_values
        if self.enable_process_return_values:
            self.process_return_values(execution_context, ret)

    return ret
|
|
|
|
def print(self, result, instructions=None):
    """
    Render *result* on the output, delegating to the printer handler.
    :param result: object to print
    :param instructions: optional printing instructions
    :return: None
    """
    self.printer_handler.print(result, instructions)
|
|
|
|
def resolve(self, concept):
    """
    Try to find a concept by its name, id, or c:: definition
    A new instance (using new_from_template()) is returned when it's possible
    :param concept: Token | "c:..." string | (name, id) tuple | plain name | None
    :return: a new instance, a list of instances, or None when not recognized
    """

    def add_recognized_by(c, _recognized_by):
        # tag the instance with how it was recognized (by id / by name)
        c.set_hint(BuiltinConcepts.RECOGNIZED_BY, _recognized_by)
        return c

    def new_instances(concepts, _recognized_by):
        # a name may map to several concepts: instantiate each of them
        if hasattr(concepts, "__iter__"):
            return [add_recognized_by(self.new_from_template(c, c.key), _recognized_by) for c in concepts]
        return add_recognized_by(self.new_from_template(concepts, concepts.key), _recognized_by)

    if concept is None:
        return None

    # ##############
    # PREPROCESS
    # ##############
    # if the entry is a concept token, use its values.
    if isinstance(concept, Token):
        if concept.type == TokenKind.RULE:  # do not recognize rules !!!
            return None
        concept = concept.value  # concept is now a tuple

    # "c:" strings are unpacked into a (name, id) tuple when they parse correctly
    if isinstance(concept, str) and \
            concept.startswith("c:") and \
            (tmp := core.utils.unstr_concept(concept)) != (None, None):
        concept = tmp

    # ##############
    # PROCESS
    # ##############
    # if the entry is a tuple:
    # concept[0] is the name
    # concept[1] is the id
    if isinstance(concept, tuple):
        if concept[1]:
            # id lookup: ids are unique — a single evaluated instance is returned
            if self.is_known(found := self.get_by_id(concept[1])):
                instance = self.new_from_template(found, found.key)
                instance._metadata.is_evaluated = True
                instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, RECOGNIZED_BY_ID)
                return instance
        elif concept[0]:
            # name lookup: may return several concepts
            if self.is_known(found := self.get_by_name(concept[0])):
                instances = new_instances(found, RECOGNIZED_BY_NAME)
                core.builtin_helpers.set_is_evaluated(instances)
                return instances
        else:
            # tuple with neither a name nor an id
            return None

    # otherwise search in db
    if isinstance(concept, str):
        if self.is_known(found := self.get_by_name(concept)):
            instances = new_instances(found, RECOGNIZED_BY_NAME)
            core.builtin_helpers.set_is_evaluated(instances, check_nb_variables=True)
            return instances

    return None
|
|
|
|
def fast_resolve(self, key, return_new=True):
    """
    Lighter variant of resolve(): direct cache lookups via self.om.get(),
    no is_known() filtering.
    :param key: Token | "c:..." string | (name, id) tuple | plain name
    :param return_new: when True, return fresh instances built from the
                       cached template(s); when False, the template itself
    :return: instance(s), the template, or None when not resolvable
    """

    def add_recognized_by(c, _recognized_by):
        # tag the instance with how it was recognized (by id / by name)
        c.set_hint(BuiltinConcepts.RECOGNIZED_BY, _recognized_by)
        return c

    def new_instances(concepts, _recognized_by):
        # a name may map to several concepts: instantiate each of them
        if hasattr(concepts, "__iter__"):
            return [add_recognized_by(self.new_from_template(c, c.key), _recognized_by) for c in concepts]

        return add_recognized_by(self.new_from_template(concepts, concepts.key), _recognized_by)

    if isinstance(key, Token):
        if key.type == TokenKind.RULE:  # do not recognize rules !!!
            return None
        else:
            key = key.value
    elif isinstance(key, str) and key.startswith("c:"):
        # unpack "c:" strings into a (name, id) tuple
        key = core.utils.unstr_concept(key)

    if isinstance(key, tuple):
        if key == (None, None):
            # unstr_concept() failed to parse the "c:" string
            return None

        if key[1]:
            # lookup by id
            concept = self.om.get(self.CONCEPTS_BY_ID_ENTRY, key[1])
            recognized_by = RECOGNIZED_BY_ID
        else:
            # lookup by name
            concept = self.om.get(self.CONCEPTS_BY_NAME_ENTRY, key[0])
            recognized_by = RECOGNIZED_BY_NAME
    else:
        # plain name lookup
        concept = self.om.get(self.CONCEPTS_BY_NAME_ENTRY, key)
        recognized_by = RECOGNIZED_BY_NAME

    if concept is NotFound:
        return None
    return new_instances(concept, recognized_by) if return_new else concept
|
|
|
|
def new(self, concept_key, **kwargs):
    """
    Return a new instance of a concept.
    When the concept is declared unique, the same (singleton) instance is
    returned instead.
    :param concept_key: key string, (key, id) tuple, or a Concept
    :param kwargs: attributes/values applied to the new instance
    :return: an instance, a list of instances, or an UNKNOWN_CONCEPT concept
    """
    # normalize (key, id) out of the accepted input shapes
    if isinstance(concept_key, tuple):
        concept_key, concept_id = concept_key[0], concept_key[1]
    elif isinstance(concept_key, Concept):
        concept_key, concept_id = concept_key.key, concept_key.id
    else:
        concept_id = None

    template = self.get_by_id(concept_id) if not concept_key else self.get_by_key(concept_key, concept_id)

    # concept not found: return the UNKNOWN_CONCEPT marker as-is
    # (unless UNKNOWN_CONCEPT itself was requested)
    if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
            concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
        return template

    # a list template means several concepts share the same key
    if isinstance(template, list):
        return [self.new_from_template(t, concept_key, **kwargs) for t in template]

    return self.new_from_template(template, concept_key, **kwargs)
|
|
|
|
def new_from_template(self, template, key, **kwargs):
    """
    Build a new concept instance from *template*.

    Unique (singleton) templates are returned as-is. Otherwise a fresh
    instance is created — using the matching builtin class when one exists
    for *key* — and each keyword argument is applied in order:
      - a known concept attribute      -> set_value(k, v)
      - "body"                         -> set_value(ConceptParts.BODY, v)
      - an existing python attribute   -> setattr()
      - anything else                  -> an UNKNOWN_PROPERTY concept is returned
    :return: the new instance, the singleton template, or UNKNOWN_PROPERTY
    """
    # core.utils.my_debug(f"Created {template}, {key=}, {kwargs=}")
    # manage singleton
    if template.get_metadata().is_unique:
        return template

    # otherwise, create another instance
    concept = self.builtin_cache[key]() if key in self.builtin_cache else Concept()
    concept.update_from(template, update_value=False)
    # concept.freeze_definition_hash()

    if len(kwargs) == 0:
        return concept

    # update the properties, values, attributes
    # Not quite sure that this is the correct process order
    for k, v in kwargs.items():
        if k in get_concept_attrs(concept):
            concept.set_value(k, v)
        elif k == "body":
            concept.set_value(ConceptParts.BODY, v)
        elif hasattr(concept, k):
            setattr(concept, k, v)
        else:
            return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)

    # TODO : add the concept to the list of known concepts (self.instances)
    concept._metadata.is_evaluated = True  # because we have manually set the variables
    return concept
|
|
|
|
def push_ontology(self, context, name, cache_only=False):
    """
    Push the ontology *name* on top of the ontology stack, after recording
    the current sheerka/services state so it can be restored on pop.
    :param context: current execution context
    :param name: name of the ontology to push
    :param cache_only: forwarded to the ontology manager
    :return: a ReturnValue — SUCCESS, or ONTOLOGY_ALREADY_DEFINED on duplicates
    """

    try:
        if self.om.already_on_top(name):
            # nothing to do: already the active ontology
            return self.ret(self.name, True, self.new(BuiltinConcepts.SUCCESS))
    except OntologyAlreadyExists:
        return self.ret(self.name, False, self.new(BuiltinConcepts.ONTOLOGY_ALREADY_DEFINED, body=name))

    # record sheerka and services states
    self.om.record_sheerka_state()
    for service in self.services.values():
        if hasattr(service, "save_state"):
            service.save_state(context)
        if hasattr(service, "reset_state"):
            service.reset_state()

    self.om.push_ontology(name, cache_only)

    # Not the first time for this ontology. Update the services
    if name in self.om.current_sdp().load_ontologies():
        self.initialize_services_deferred(context, False)

    self.om.save_ontologies_names()
    self.publish(context, EVENT_ONTOLOGY_CREATED, name)

    return self.ret(self.name, True, self.new(BuiltinConcepts.SUCCESS))
|
|
|
|
def pop_ontology(self, context):
    """
    Remove the top ontology and restore the previously recorded state of
    sheerka and its services.
    :param context: current execution context
    :return: a ReturnValue wrapping an ONTOLOGY_REMOVED concept
    """
    removed = self.om.pop_ontology(context)

    # restore what push_ontology()/get_ontology() recorded
    self.om.reset_sheerka_state()
    for service in self.services.values():
        if hasattr(service, "restore_state"):
            service.restore_state()
        if hasattr(service, "reset_state"):
            service.reset_state()

    self.om.save_ontologies_names()
    return self.ret(self.name, True, self.new(BuiltinConcepts.ONTOLOGY_REMOVED, body=removed))
|
|
|
|
def get_ontology(self, context):
    """
    Record the current sheerka/services state, then return the active ontology.
    :param context: current execution context
    :return: the ontology from the ontology manager
    """
    self.om.record_sheerka_state()
    saveable = (s for s in self.services.values() if hasattr(s, "save_state"))
    for service in saveable:
        service.save_state(context)

    return self.om.get_ontology()
|
|
|
|
def add_ontology(self, context, ontology):
    """
    Add the previously recorded ontology on the top of the stack.
    :param context: current execution context
    :param ontology: the recorded ontology to install
    :return: None
    """

    # save the state of the ontology currently on top
    self.om.record_sheerka_state()
    for svc in self.services.values():
        if hasattr(svc, "save_state"):
            svc.save_state(context)
        # reset_state is intentionally not called here (no need to do it twice:
        # the restore loop below resets everything anyway)

    self.om.add_ontology(ontology)

    # align sheerka and its services with the newly added ontology
    self.om.reset_sheerka_state()
    for svc in self.services.values():
        if hasattr(svc, "restore_state"):
            svc.restore_state()
        if hasattr(svc, "reset_state"):
            svc.reset_state()
|
|
|
|
def ret(self, who: str, status: bool, value, parents=None):
    """
    Create and return a ReturnValue concept.
    :param who: name of the emitter
    :param status: True on success, False on failure
    :param value: the wrapped value
    :param parents: optional parent concepts
    :return: a ReturnValueConcept carrying the cached return-value concept id
    """
    return ReturnValueConcept(who=who,
                              status=status,
                              value=value,
                              parents=parents,
                              concept_id=self.return_value_concept_id)
|
|
|
|
def err(self, body):
    """Shortcut: wrap *body* in an ErrorConcept carrying the cached error concept id."""
    return ErrorConcept(body, self.error_concept_id)
|
|
|
|
def objvalue(self, obj, reduce_simple_list=False):
    """
    Recursively unwrap *obj* down to a plain value.

    Objects exposing get_obj_value() delegate to it; non-Concepts are
    returned as-is; Concepts are unwrapped through their body.
    :param obj: object / Concept to unwrap
    :param reduce_simple_list: when True, a single-element list or set body
                               is reduced to its only element
    :return: the unwrapped value (or the Concept itself when it has no body)
    """
    if obj is None:
        return None

    if hasattr(obj, "get_obj_value"):
        return obj.get_obj_value()

    if not isinstance(obj, Concept):
        return obj

    # no body: return the concept itself
    if obj.body is NotInit:
        return obj

    if reduce_simple_list and isinstance(obj.body, (list, set)) and len(obj.body) == 1:
        # BUG FIX: the previous obj.body[0] raised TypeError for sets
        # (sets are not subscriptable) although the guard explicitly allows
        # them; next(iter(...)) works for both lists and sets.
        body_to_use = next(iter(obj.body))
    else:
        body_to_use = obj.body

    return self.objvalue(body_to_use)
|
|
|
|
def objvalues(self, objs):
    """
    Yield objvalue() for each element of *objs* (a python list, a LIST/
    ENUMERATION concept, or a single object, which is wrapped in a list).
    :return: a generator of unwrapped values
    """
    is_collection = (isinstance(objs, list)
                     or self.isinstance(objs, BuiltinConcepts.LIST)
                     or self.isinstance(objs, BuiltinConcepts.ENUMERATION))
    if not is_collection:
        objs = [objs]

    # concept collections carry their elements in .body
    source = objs if isinstance(objs, list) else objs.body
    return (self.objvalue(item) for item in source)
|
|
|
|
def get_errors(self, obj, **kwargs):
    """
    Browse *obj* recursively (concepts, bodies, lists/sets/tuples), collecting
    every error found.

    :param obj: object / Concept / collection to inspect
    :param kwargs: optional filters that specialize the search:
                   - __type=<name>: keep errors whose concept name (or class
                     name for plain objects) equals <name>
                   - <attr>=<value>: keep errors whose attribute — or as_bag()
                     entry when available — equals <value>
    :return: a (possibly empty) list of the matching error objects
    """

    def filter_by_type(x, name):
        # Concepts are matched on their concept name, plain objects on their class name
        if isinstance(x, Concept):
            return x.name == name
        return type(x).__name__ == name

    def filter_by_attribute(x, attr_name, attr_value):
        if hasattr(x, "as_bag"):
            try:
                return x.as_bag()[attr_name] == attr_value
            except KeyError:
                return False
        try:
            return getattr(x, attr_name) == attr_value
        except AttributeError:
            return False

    def and_filter(x, conditions):
        # all conditions must hold
        return all(cond(x) for cond in conditions)

    def is_error(_obj):
        if isinstance(_obj, ErrorObj):
            return True
        # builtin error concepts count as errors too
        if isinstance(_obj, Concept) and _obj.get_metadata().is_builtin and _obj.key in BuiltinErrors:
            return True
        return False

    def filter_objects(_objects):
        if not kwargs:
            return _objects

        conditions = []
        for k, v in kwargs.items():
            # BUG FIX: bind the loop values as lambda defaults. The previous
            # free-variable closures all saw the LAST iteration's values when
            # several filters were given (classic late-binding pitfall), so
            # every condition tested the same attribute/value.
            if k == "__type":
                conditions.append(lambda x, name=v: filter_by_type(x, name))
            else:
                conditions.append(lambda x, a=k, val=v: filter_by_attribute(x, a, val))

        if len(conditions) > 1:
            full_cond = lambda x: and_filter(x, conditions)
        else:
            full_cond = conditions[0]

        return [o for o in _objects if full_cond(o)]

    def inner_get_errors(_obj):
        # a successful ReturnValue cannot carry errors
        if self.isinstance(_obj, BuiltinConcepts.RETURN_VALUE) and _obj.status:
            return []

        if isinstance(_obj, (list, set, tuple)):
            return core.utils.flatten([inner_get_errors(o) for o in _obj])

        if is_error(_obj):
            # an error concept may itself wrap further errors in its body
            if isinstance(_obj, Concept) and _obj.body not in (NotInit, None):
                return [_obj] + inner_get_errors(_obj.body)
            return [_obj]

        # NotInit is a sentinel: identity test, consistent with objvalue()
        if isinstance(_obj, Concept) and _obj.body is not NotInit:
            return inner_get_errors(_obj.body)

        return []

    # the previous version copied the list ([e for e in errors]) for no reason
    return filter_objects(inner_get_errors(obj))
|
|
|
|
def has_error(self, obj, **kwargs):
    """True when get_errors() finds at least one (matching) error in *obj*."""
    return bool(self.get_errors(obj, **kwargs))
|
|
|
|
def get_evaluator_name(self, name):
    """Prefix *name* with the evaluators' common PREFIX (resolved lazily, once)."""
    if self.evaluators_prefix is None:
        base = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator")
        self.evaluators_prefix = base.PREFIX
    return self.evaluators_prefix + name
|
|
|
|
def get_parser_name(self, name):
    """Prefix *name* with the parsers' common PREFIX (resolved lazily, once)."""
    if self.parsers_prefix is None:
        base = core.utils.get_class("parsers.BaseParser.BaseParser")
        self.parsers_prefix = base.PREFIX
    return self.parsers_prefix + name
|
|
|
|
@staticmethod
def is_success(obj):
    """
    Best-effort success test over the various result shapes used in the
    project: bool, ReturnValue, ErrorObj, builtin error concept, anything.
    :param obj: the object to test
    :return: bool
    """
    if isinstance(obj, bool):  # quick win
        return obj

    if isinstance(obj, ReturnValueConcept):
        return obj.status

    if isinstance(obj, ErrorObj):
        return False

    # other cases ?
    # ...

    # manage internal errors
    if isinstance(obj, Concept) and obj._metadata.is_builtin and obj.key in BuiltinErrors:
        return False

    # fallback: plain truthiness
    return bool(obj)
|
|
|
|
@staticmethod
def is_error(obj):
    """Logical negation of is_success()."""
    return not Sheerka.is_success(obj)
|
|
|
|
@staticmethod
def is_known(obj):
    """
    Non-Concepts are always considered known; a Concept is known unless its
    key is missing or one of the 'unknown' marker keys.
    """
    if not isinstance(obj, Concept):
        return True
    unknown_keys = (None, BuiltinConcepts.UNKNOWN_CONCEPT, BuiltinConcepts.UNKNOWN_RULE)
    return obj.key not in unknown_keys
|
|
|
|
@staticmethod
def isinstance(a, b):
    """
    True when the concept *a* is an instance of the concept *b*
    (keys are compared; *b* may be a Concept or anything str()-able).
    :param a: candidate concept
    :param b: reference concept or key
    :return: bool
    """
    if not isinstance(a, Concept):
        return False

    if isinstance(b, Concept):
        return a.key == b.key
    return a.key == str(b)
|
|
|
|
@staticmethod
def isin(a, b):
    """
    True when the concept *a*'s key belongs to the container *b*.
    :param a: candidate concept
    :param b: container of keys
    :return: bool (False for non-Concepts)
    """
    return isinstance(a, Concept) and a.key in b
|
|
|
|
def is_sheerka(self, obj):
    """
    True when *obj* designates this sheerka instance — either the concept
    itself (same id) or the "sheerka" Expando used by the python evaluator.
    """
    if isinstance(obj, Concept) and obj.id == self.id:
        return True

    # local import — presumably avoids a circular dependency with the evaluator
    from evaluators.PythonEvaluator import Expando
    return isinstance(obj, Expando) and obj.get_name() == "sheerka"
|
|
|
|
@staticmethod
def get_unknown(metadata):
    """
    Build an 'UnknownConcept' describing an unresolved id/key lookup.

    *metadata* is one (attribute, value) tuple — e.g. ("key", 'not_found') or
    ("id", invalid_id) — or a list of such tuples when several attributes were
    given. It is stored as the concept's body.

    UnknownConcept() is instantiated directly, not via new(), to prevent a
    cyclic call.
    :return: the UnknownConcept instance
    """
    unknown = UnknownConcept()
    unknown.set_value(ConceptParts.BODY, metadata)
    unknown._metadata.is_evaluated = True
    return unknown
|
|
|
|
@staticmethod
def get_builtins_classes_as_dict():
    """
    Map builtin concept key -> concept class, for every Concept subclass
    declared in core.builtin_concepts.
    """
    return {
        cls()._metadata.key: cls
        for cls in core.utils.get_classes("core.builtin_concepts")
        if issubclass(cls, Concept) and cls != Concept
    }
|
|
|
|
@staticmethod
def init_logging(debug, loggers):
    """
    Configure the global logging setup.

    :param debug: when True, log at DEBUG level with timestamps (plus the
                  logger name when "show-name" is in *loggers*)
    :param loggers: logger configuration forwarded to
                    core.sheerka_logger.init_config; may be None
    :return: None
    """

    def add_coloring_to_emit_ansi(fn):
        # wraps a handler's emit() to colorize messages by level
        # (kept for the optional hook at the bottom of this method)
        def new(*args):
            levelno = args[1].levelno
            if levelno >= 50:
                color = '\x1b[31m'  # red
            elif levelno >= 40:
                color = '\x1b[31m'  # red
            elif levelno >= 30:
                color = '\x1b[33m'  # yellow
            elif levelno >= 20:
                color = '\x1b[32m'  # green
            elif levelno >= 10:
                color = '\x1b[35m'  # pink
            else:
                color = '\x1b[0m'  # normal
            args[1].msg = color + str(args[1].msg) + '\x1b[0m'  # normal
            # print "after"
            return fn(*args)

        return new

    core.sheerka_logger.init_config(loggers)
    if debug:
        log_format = "%(asctime)s"
        # BUG FIX: guard against loggers=None (the constructor default),
        # which previously made `"show-name" in loggers` raise a TypeError
        # whenever debug mode was enabled without a loggers config.
        if loggers and "show-name" in loggers:
            log_format += " %(name)s"
        log_format += " [%(levelname)s] %(message)s"
        log_level = logging.DEBUG
    else:
        log_format = "%(message)s"
        log_level = logging.INFO

    logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
    logging.addLevelName(logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
    # uncomment the following line to enable colors
    # logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit)
|
|
|
|
def test_dict(self):
    """Return a TO_DICT concept wrapping a nested sample dictionary (test helper)."""
    nested = {
        "alpha": "value4",
        "beta": ["item1", "item2", "item3", ],
    }
    sample = {
        "a": "value1",
        "baba": "value2",
        "c": "value1",
        "de": ["item1", "item2", "item3", ],
        "e": nested,
    }
    return self.new(BuiltinConcepts.TO_DICT, body=sample)
|
|
|
|
def test(self):
    """Smoke-test helper exposed through sheerka_methods."""
    # plain literal: the former f-string had no placeholders (ruff F541)
    return "I have access to Sheerka !"
|
|
|
|
def test_using_context(self, context, param):
    """Smoke-test helper proving the method receives the execution context."""
    # NOTE: `event` must keep this exact name — the f-string embeds it via {event=}
    event = context.event.get_digest()
    return f"I have access to Sheerka ! {param=}, {event=}."
|
|
|
|
def test_error(self):
    """Always raises — exercises the error-handling paths."""
    raise Exception("I can raise an error")
|
|
|
|
def test_only_add_in_cache(self, concept: Concept):
    """
    Add a concept template to the cache (test helper).
    The cache is used as a proxy consulted before the sdp.
    :param concept: template to register; its key is initialized when missing
    :return: the same concept
    :raises KeyError: when no key could be derived for the concept
    """
    # sanity check: derive a key when there is none, then re-verify
    if concept.key is None:
        concept.init_key()
    if concept.key is None:
        raise KeyError()

    self.om.add_concept(concept)
    return concept
|
|
|
|
@staticmethod
def deepdiff(a, b):
    """Print and return the order-insensitive DeepDiff of *a* and *b* (debug helper)."""
    from deepdiff import DeepDiff
    difference = DeepDiff(a, b, ignore_order=True)
    print(difference)
    return difference
|
|
|
|
|
|
def to_profile():
    """Build a minimal Sheerka + execution context and run the push() benchmark."""
    sheerka = Sheerka()
    sheerka.initialize(save_execution_context=False, enable_process_return_values=False)

    event = Event("test", "kodjo")
    execution_context = ExecutionContext(sheerka.name,
                                         event,
                                         sheerka,
                                         BuiltinConcepts.PROCESS_INPUT,
                                         None)
    profile_push(execution_context)
|
|
|
|
|
|
@profile(filename="profile_push")
def profile_push(execution_context):
    """Benchmark ExecutionContext.push() under the @profile decorator."""
    for _ in range(177942):
        execution_context.push(BuiltinConcepts.NOP,
                               {"action": "fake"},
                               execution_context.sheerka.name,
                               desc="a proper description")
|
|
|
|
|
|
if __name__ == '__main__':
    # standalone usage: run the profiling scenario
    to_profile()
|