639 lines
22 KiB
Python
639 lines
22 KiB
Python
from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept, BuiltinErrors, BuiltinUnique, \
|
|
UnknownConcept
|
|
from core.concept import Concept, ConceptParts, PROPERTIES_FOR_NEW
|
|
from core.sheerka.ExecutionContext import ExecutionContext
|
|
from core.sheerka.Services.SheerkaCreateNewConcept import SheerkaCreateNewConcept
|
|
from core.sheerka.Services.SheerkaDump import SheerkaDump
|
|
from core.sheerka.Services.SheerkaEvaluateConcept import SheerkaEvaluateConcept
|
|
from core.sheerka.Services.SheerkaExecute import SheerkaExecute
|
|
from core.sheerka.Services.SheerkaHistoryManager import SheerkaHistoryManager
|
|
from core.sheerka.Services.SheerkaSetsManager import SheerkaSetsManager
|
|
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event
|
|
import core.utils
|
|
import core.builtin_helpers
|
|
|
|
from core.sheerka_logger import console_handler
|
|
|
|
import logging
|
|
|
|
# CONCEPT_EVALUATION_STEPS = [
|
|
# BuiltinConcepts.BEFORE_EVALUATION,
|
|
# BuiltinConcepts.EVALUATION,
|
|
# BuiltinConcepts.AFTER_EVALUATION]
|
|
|
|
# Key under which the concept lexer parser class is looked up in Sheerka.parsers
CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser"
# Text file that receives the source of every accepted concept definition (replayed by restore)
CONCEPTS_FILE = "_concepts.txt"
|
|
|
|
|
|
class Sheerka(Concept):
    """
    Main controller for the project
    """

    # sdp entry under which all the concepts are stored (looked up by concept key)
    CONCEPTS_ENTRY = "All_Concepts"
    # sdp entry used to look a concept up by its id
    CONCEPTS_BY_ID_ENTRY = "Concepts_By_ID"
    # sdp entry holding the definitions (bnf) of the concepts
    CONCEPTS_DEFINITIONS_ENTRY = "Concepts_Definitions"
    # sequential key for builtin concepts
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"
    # sequential key for user defined concepts
    USER_CONCEPTS_KEYS = "User_Concepts"
|
|
|
|
def __init__(self, skip_builtins_in_db=False, debug=False, loggers=None):
    """
    Configure logging, register this object as the SHEERKA concept and create the
    in-memory caches and the service handlers.
    The data provider (sdp) is left unset here; initialize() creates it.
    :param skip_builtins_in_db: when True, builtin concepts are not synchronised with sdp
        and execution results are not saved
    :param debug: forwarded to init_logging
    :param loggers: forwarded to init_logging
    """
    # logging is configured before anything else so that the first message below is emitted
    self.init_logging(debug, loggers)

    super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
    self.log.debug("Starting Sheerka.")

    # Caches of the most used concepts.
    # The entries are templates only: they serve as a footprint for instantiation,
    # except when the concept is supposed to be unique.
    # cache_by_key is indexed by the concept key (not its name, not its id).
    self.cache_by_key = {}
    self.cache_by_id = {}

    # Cache for concept definitions,
    # primarily used by unit tests that do not have access to sdp
    self.concepts_definition_cache = {}

    # Cache for concepts grammars (a grammar is a resolved BNF)
    self.concepts_grammars = {}

    # A concept can be instantiated
    # ex: File is a concept, but File('foo.txt') is an instance
    # TODO: manage contexts
    self.instances = []

    # Rules known by the system
    # ex: hello => say('hello')
    self.rules = []

    self.sdp: SheerkaDataProvider = None  # set by initialize()
    self.builtin_cache = {}  # classes of the non unique builtin concepts
    self.parsers = {}  # builtin parsers, by fully qualified name
    self.evaluators = []  # builtin evaluators

    # prefixes are resolved lazily by get_evaluator_name / get_parser_name
    self.evaluators_prefix: str = None
    self.parsers_prefix: str = None

    self.skip_builtins_in_db = skip_builtins_in_db

    # services the public methods delegate to
    self.execute_handler = SheerkaExecute(self)
    self.create_new_concept_handler = SheerkaCreateNewConcept(self)
    self.dump_handler = SheerkaDump(self)
    self.sets_handler = SheerkaSetsManager(self)
    self.evaluate_concept_handler = SheerkaEvaluateConcept(self)
    self.history_handler = SheerkaHistoryManager(self)

    # True while restore() replays the concepts file
    self.during_restore = False
|
|
|
|
def initialize(self, root_folder: str = None):
    """
    Start Sheerka.
    Creates the data provider, records the start-up event, then loads the builtin
    concepts, parsers, evaluators and the concepts definitions.
    When sdp reports a first run, the sequential key of user concepts is set to 1000.
    :param root_folder: root configuration folder, handed over to SheerkaDataProvider
    :return: ReturnValueConcept (success, or failure carrying the IOError)
    """
    try:
        from sheerkapickle.sheerka_handlers import initialize_pickle_handlers
        initialize_pickle_handlers()

        self.sdp = SheerkaDataProvider(root_folder)
        if self.sdp.first_time:
            self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)

        start_event = Event("Initializing Sheerka.", user=self.name)
        self.sdp.save_event(start_event)
        context = ExecutionContext(self.key, start_event, self)

        self.initialize_builtin_concepts()
        self.initialize_builtin_parsers()
        self.initialize_builtin_evaluators()
        self.initialize_concepts_definitions(context)

        outcome = ReturnValueConcept(self, True, self)
        context.add_values(return_values=outcome)
        if not self.skip_builtins_in_db:
            self.sdp.save_result(self, context)
        return outcome

    except IOError as error:
        # only I/O failures are turned into an error return value, anything else propagates
        return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), error)
|
|
|
|
def initialize_builtin_concepts(self):
    """
    Create one template per member of BuiltinConcepts and put it in the cache.
    Unless skip_builtins_in_db is set, each template is also synchronised with sdp:
    added when missing, otherwise refreshed from the stored version.
    :return: None
    """
    self.init_log.debug("Initializing builtin concepts")
    builtins_classes = self.get_builtins_classes_as_dict()

    # this all initialization of the builtins seems to be little bit complicated
    # why do we need to update it from DB ?
    for key in BuiltinConcepts:
        has_dedicated_class = None  # resolved lazily, only when the key is not SHEERKA

        if key == BuiltinConcepts.SHEERKA:
            # Sheerka is itself the SHEERKA concept
            concept = self
        else:
            has_dedicated_class = str(key) in builtins_classes
            if has_dedicated_class:
                concept = builtins_classes[str(key)]()
            else:
                concept = Concept(key, True, False, key)

        # NOTE(review): the original indentation was lost; both assignments are assumed
        # to belong to the BuiltinUnique branch - confirm
        if key in BuiltinUnique:
            concept.metadata.is_unique = True
            concept.metadata.is_evaluated = True

        # remember the class of non unique builtins so that new instances can be created from it
        if not concept.metadata.is_unique and str(key) in builtins_classes:
            self.builtin_cache[key] = builtins_classes[str(key)]

        if not self.skip_builtins_in_db:
            stored = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.metadata.key)
            if stored is not None:
                self.init_log.debug(f"Found concept '{stored}' in db. Updating.")
                concept.update_from(stored)
            else:
                self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.")
                self.set_id_if_needed(concept, True)
                self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)

        self.add_in_cache(concept)
|
|
|
|
def initialize_builtin_parsers(self):
    """
    Register every subclass of BaseParser found in the 'parsers' package,
    indexed by its fully qualified name.
    :return: None
    """
    core.utils.init_package_import("parsers")
    base_class = core.utils.get_class("parsers.BaseParser.BaseParser")

    for parser_class in core.utils.get_sub_classes("parsers", base_class):
        # classes declared in the module of the base parser itself are not registered
        if parser_class.__module__ != base_class.__module__:
            self.init_log.debug(f"Adding builtin parser '{parser_class.__name__}'")
            self.parsers[core.utils.get_full_qualified_name(parser_class)] = parser_class
|
|
|
|
def initialize_builtin_evaluators(self):
    """
    Register the evaluators found in the 'evaluators' package:
    first the subclasses of OneReturnValueEvaluator, then those of AllReturnValuesEvaluator.
    :return: None
    """
    core.utils.init_package_import("evaluators")

    base_names = (
        "evaluators.BaseEvaluator.OneReturnValueEvaluator",
        "evaluators.BaseEvaluator.AllReturnValuesEvaluator",
    )
    for base_name in base_names:
        for evaluator_class in core.utils.get_sub_classes("evaluators", base_name):
            self.init_log.debug(f"Adding builtin evaluator '{evaluator_class.__name__}'")
            self.evaluators.append(evaluator_class)
|
|
|
|
def initialize_concepts_definitions(self, execution_context):
    """
    Load the concepts definitions from sdp and resolve them into grammars
    with the concept lexer parser.
    Nothing is changed when there is no definition or when the parser fails to initialize.
    :param execution_context: context handed over to the lexer parser
    :return: None
    """
    self.init_log.debug("Initializing concepts definitions")

    definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False)
    if definitions is None:
        self.init_log.debug("No BNF defined")
        return

    parser_class = self.parsers[CONCEPT_LEXER_PARSER_CLASS]
    lexer_parser = parser_class()
    outcome = lexer_parser.initialize(execution_context, definitions)

    if outcome.status:
        self.concepts_grammars = lexer_parser.concepts_grammars
    else:
        self.init_log.error("Failed to initialize concepts definitions " + str(outcome.body))
|
|
|
|
def reset_cache(self, filter_to_use=None):
    """
    Empty the caches of concepts (by key and by id).
    :param filter_to_use: not supported yet, must be None
    :return: self, so that calls can be chained
    :raises NotImplementedError: when a filter is given
    """
    if filter_to_use is not None:
        raise NotImplementedError()

    self.cache_by_key = {}
    self.cache_by_id = {}
    return self
|
|
|
|
def evaluate_user_input(self, text: str, user_name="kodjo"):
    """
    Record the user input as an event, then run it through the parsing and evaluation steps.

    Note to KSI: If you try to add execution context to this function,
    You may end in an infinite loop
    :param text: what the user typed
    :param user_name: name of the user, stored in the event and in the USER_INPUT concept
    :return: the return values produced by execute()
    """
    self.log.debug(f"Processing user input '{text}', {user_name=}.")
    event = Event(text, user_name)
    evt_digest = self.sdp.save_event(event)
    self.log.debug(f"{evt_digest=}")

    pipeline = [
        BuiltinConcepts.BEFORE_PARSING,
        BuiltinConcepts.PARSING,
        BuiltinConcepts.AFTER_PARSING,
        BuiltinConcepts.BEFORE_EVALUATION,
        BuiltinConcepts.EVALUATION,
        BuiltinConcepts.AFTER_EVALUATION,
    ]

    # NOTE(review): the original indentation was lost; the 'with' block is assumed to end
    # right after add_values() - confirm
    with ExecutionContext(self.key, event, self, f"Evaluating '{text}'") as execution_context:
        initial_values = [
            self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name)),
            self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED)),
        ]
        ret = self.execute(execution_context, initial_values, pipeline)
        execution_context.add_values(return_values=ret)

    if not self.skip_builtins_in_db:
        self.sdp.save_result(self, execution_context)

    # hack to save valid concept definition:
    # the text is appended to the concepts file, except while that very file is being replayed
    is_new_concept = (
        not self.during_restore
        and len(ret) == 1
        and ret[0].status
        and self.isinstance(ret[0].value, BuiltinConcepts.NEW_CONCEPT)
    )
    if is_new_concept:
        with open(CONCEPTS_FILE, "a") as f:
            f.write(text + "\n")

    return ret
|
|
|
|
def execute(self, execution_context, return_values, execution_steps, logger=None):
    """
    Run the execution steps on the given return values.
    The work is delegated to the execute handler.
    :param execution_context: current execution context
    :param return_values: initial return values to process
    :param execution_steps: steps to go through
    :param logger: logger to use (if not directly called by sheerka)
    :return: whatever the execute handler returns
    """
    handler = self.execute_handler
    return handler.execute(execution_context, return_values, execution_steps, logger)
|
|
|
|
def set_id_if_needed(self, obj: Concept, is_builtin: bool):
    """
    Give an id to the concept when it does not have one yet.
    The id is the next sequential key from sdp, taken from the builtin or the user sequence.
    For test purpose only !!!!!
    :param obj: concept to update
    :param is_builtin: True to draw the id from the builtin concepts sequence
    :return: None
    """
    if obj.metadata.id is None:
        sequence = self.USER_CONCEPTS_KEYS
        if is_builtin:
            sequence = self.BUILTIN_CONCEPTS_KEYS

        obj.metadata.id = self.sdp.get_next_key(sequence)
        self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.")
|
|
|
|
def create_new_concept(self, context, concept: Concept, logger=None):
    """
    Add a new concept to the system.
    The work is delegated to the create-new-concept handler.
    :param context: current execution context
    :param concept: DefConceptNode
    :param logger: logger to use
    :return: what the handler returns (digest of the new concept, per the original note)
    """
    handler = self.create_new_concept_handler
    return handler.create_new_concept(context, concept, logger)
|
|
|
|
def add_concept_to_set(self, context, concept, concept_set, logger=None):
    """
    Declare that concept isa concept_set.
    The work is delegated to the sets handler.
    :param context: current execution context
    :param concept: element to add
    :param concept_set: set that receives the element
    :param logger: logger to use
    :return: whatever the sets handler returns
    """
    handler = self.sets_handler
    return handler.add_concept_to_set(context, concept, concept_set, logger)
|
|
|
|
def get_set_elements(self, concept):
    """
    Return the elements of a concept that is supposed to be a set.
    The work is delegated to the sets handler.
    :param concept: the set
    :return: whatever the sets handler returns
    """
    handler = self.sets_handler
    return handler.get_set_elements(concept)
|
|
|
|
def evaluate_concept(self, context, concept: Concept, logger=None):
    """
    Evaluate a concept.
    Per the original note: when the where clause is True, the body is evaluated.
    The work is delegated to the evaluate-concept handler.
    :param context: current execution context
    :param concept: concept to evaluate
    :param logger: logger to use
    :return: value of the evaluation or error
    """
    handler = self.evaluate_concept_handler
    return handler.evaluate_concept(context, concept, logger)
|
|
|
|
def add_in_cache(self, concept: Concept):
    """
    Store a concept template in the caches.
    The caches are consulted before sdp. The template is always indexed by key,
    and also by id when it has one.
    :param concept: template to store
    :return: the concept that was given
    :raises KeyError: when the concept still has no key after init_key()
    """
    # sanity check: a concept without a key gets one chance to compute it
    if concept.key is None:
        concept.init_key()
        if concept.key is None:
            raise KeyError()

    self.cache_by_key[concept.key] = concept
    if concept.id:
        self.cache_by_id[concept.id] = concept

    return concept
|
|
|
|
def get(self, concept_key, concept_id=None):
    """
    Look a concept up by key, in the cache first, then in sdp.
    What is returned must be used as a template for another concept:
    you must not modify the returned concept.
    :param concept_key: key of the concept (a BuiltinConcepts member is converted with str())
    :param concept_id: when multiple concepts share the same key, use the id
    :return: the concept, a list of concepts when several share the key and no id is given,
        an ErrorConcept when the key is None, otherwise an unknown concept
    """
    if concept_key is None:
        return ErrorConcept("Concept key is undefined.")

    if isinstance(concept_key, BuiltinConcepts):
        concept_key = str(concept_key)

    # first search in cache
    if concept_key in self.cache_by_key:
        found = self.cache_by_key[concept_key]
    else:
        found = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)

    several = isinstance(found, list)
    if found and (concept_id is None or not several):
        return found

    # NOTE(review): the original indentation was lost; the 'else' is assumed to pair
    # with 'if concept_id' - confirm
    if several:
        if not concept_id:
            return found
        for candidate in found:
            if candidate.id == concept_id:
                return candidate

    # nothing matched: describe what was asked for
    if concept_id:
        metadata = [("key", concept_key), ("id", concept_id)]
    else:
        metadata = ("key", concept_key)
    return self._get_unknown(metadata)
|
|
|
|
def get_by_id(self, concept_id):
    """
    Look a concept up by id, in the cache first, then in sdp.
    :param concept_id: id of the concept
    :return: the concept, an ErrorConcept when the id is None,
        or an unknown concept when nothing (or something falsy) is found
    """
    if concept_id is None:
        return ErrorConcept("Concept id is undefined.")

    # first search in cache
    if concept_id in self.cache_by_id:
        found = self.cache_by_id[concept_id]
    else:
        found = self.sdp.get_safe(self.CONCEPTS_BY_ID_ENTRY, concept_id)

    if found:
        return found
    return self._get_unknown(('id', concept_id))
|
|
|
|
def get_concept_definition(self):
    """
    Return the concepts definitions.
    They are read from sdp as long as the cache is empty, then served from the cache.
    :return: the definitions, an empty dict when sdp has none
    """
    definitions = self.concepts_definition_cache
    if not definitions:
        definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False) or {}
        self.concepts_definition_cache = definitions

    return definitions
|
|
|
|
def new(self, concept_key, **kwargs):
    """
    Return an instance of a concept, built from its template.
    When the concept is supposed to be unique, the same instance is returned.
    :param concept_key: key of the concept, or a tuple whose two first items are (key, id)
    :param kwargs: properties, values or attributes to set on the new instance
    :return: the new concept, a list of concepts when several templates share the key,
        or the unknown concept when no template is found
    """
    concept_id = None
    if isinstance(concept_key, tuple):
        concept_id = concept_key[1]
        concept_key = concept_key[0]

    template = self.get(concept_key, concept_id)

    # manage concept not found (unless the unknown concept itself was requested)
    not_found = self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
        concept_key != BuiltinConcepts.UNKNOWN_CONCEPT
    if not_found:
        return template

    if not isinstance(template, list):
        return self.new_from_template(template, concept_key, **kwargs)

    # a list means that there are multiple concepts under the same key
    return [self.new_from_template(t, concept_key, **kwargs) for t in template]
|
|
|
|
def new_from_template(self, template, key, **kwargs):
    """
    Create a concept from a template and apply the given values to it.
    :param template: concept used as a footprint; returned as is when it is unique
    :param key: key used to find a dedicated class in the builtin cache
    :param kwargs: each name is applied as a property, a value or an attribute, in that order
    :return: the new concept, or an UNKNOWN_PROPERTY concept for the first name that fits nowhere
    """
    # manage singleton
    if template.metadata.is_unique:
        return template

    # otherwise, create another instance, with the dedicated class when there is one
    factory = self.builtin_cache[key] if key in self.builtin_cache else Concept
    concept = factory()
    concept.update_from(template)

    if not kwargs:
        return concept

    # update the properties, values, attributes
    # Not quite sure that this is the correct process order
    for name, val in kwargs.items():
        if name in concept.props:
            concept.set_prop(name, val)
        elif name in PROPERTIES_FOR_NEW:
            concept.values[ConceptParts(name)] = val
        elif hasattr(concept, name):
            setattr(concept, name, val)
        else:
            return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=name, concept=concept)

    # TODO : add the concept to the list of known concepts (self.instances)
    concept.metadata.is_evaluated = True
    return concept
|
|
|
|
def ret(self, who: str, status: bool, value, message=None, parents=None):
    """
    Create and return a RETURN_VALUE concept through new().
    :param who: author of the return value
    :param status: True for a success
    :param value: value carried by the return value
    :param message: optional message
    :param parents: optional parents
    :return: what new() returns for BuiltinConcepts.RETURN_VALUE
    """
    fields = dict(who=who, status=status, value=value, message=message, parents=parents)
    return self.new(BuiltinConcepts.RETURN_VALUE, **fields)
|
|
|
|
def value(self, obj, reduce_simple_list=False):
    """
    Resolve an object to its underlying value.
    - None stays None
    - an object exposing get_value() is resolved through it
    - anything that is not a Concept is its own value
    - a Concept without body is its own value
    - otherwise the body of the concept is resolved recursively
    :param obj: object to resolve
    :param reduce_simple_list: when True, a body that is a list or a set holding exactly one
        element is replaced by that element. The flag only applies to obj itself,
        it is not passed on to the recursive calls.
    :return: the resolved value
    """
    if obj is None:
        return None

    if hasattr(obj, "get_value"):
        return obj.get_value()

    if not isinstance(obj, Concept):
        return obj

    body = obj.body
    if body is None:
        return obj

    if reduce_simple_list and isinstance(body, (list, set)) and len(body) == 1:
        # a set cannot be indexed: take the single element through iteration,
        # which works for lists as well
        body = next(iter(body))

    return self.value(body)
|
|
|
|
def get_values(self, objs):
    """
    Resolve the value of every item of objs.
    Anything that is neither a python list nor a LIST / ENUMERATION concept
    is treated as a single item.
    :param objs: one object or several
    :return: a generator of values, computed lazily with value()
    """
    is_collection = (
        isinstance(objs, list)
        or self.isinstance(objs, BuiltinConcepts.LIST)
        or self.isinstance(objs, BuiltinConcepts.ENUMERATION)
    )
    if not is_collection:
        objs = [objs]

    return (self.value(item) for item in objs)
|
|
|
|
def is_success(self, obj):
    """
    Tell whether obj stands for a success.
    :param obj: a bool, a ReturnValueConcept or anything else
    :return: the bool itself, the status of the return value, False for a builtin
        error concept, otherwise obj itself (to be read through its truthiness)
    """
    if isinstance(obj, bool):  # quick win
        return obj

    if isinstance(obj, ReturnValueConcept):
        return obj.status

    is_builtin_error = isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors
    return False if is_builtin_error else obj
|
|
|
|
def is_known(self, obj):
    """
    Tell whether obj is something else than the unknown concept.
    :param obj: anything
    :return: True when obj is not a Concept, or when its key is not the UNKNOWN_CONCEPT one
    """
    return not isinstance(obj, Concept) or obj.key != str(BuiltinConcepts.UNKNOWN_CONCEPT)
|
|
|
|
def isinstance(self, a, b):
    """
    Return true if the concept a is an instance of the concept b,
    which means that both have the same key.
    :param a: the concept to test; anything that is not a Concept gives False
    :param b: a Concept, or something whose str() is a concept key
    :return: bool
    :raises SyntaxError: when a is a BuiltinConcepts member (arguments given in the wrong order)
    """
    if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
        raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")

    if isinstance(a, Concept):
        if isinstance(b, Concept):
            expected_key = b.key
        else:
            expected_key = str(b)
        return a.key == expected_key

    return False
|
|
|
|
def isa(self, a, b):
    """
    Ask the sets handler whether a isa b.
    :return: whatever the sets handler returns
    """
    handler = self.sets_handler
    return handler.isa(a, b)
|
|
|
|
def isaset(self, concept):
    """
    Ask the sets handler whether the concept is a set.
    :return: whatever the sets handler returns
    """
    handler = self.sets_handler
    return handler.isaset(concept)
|
|
|
|
def get_evaluator_name(self, name):
    """
    Prepend the evaluators prefix to a name.
    The prefix is read from BaseEvaluator.PREFIX on first use, then kept.
    :param name: short name of the evaluator
    :return: prefix + name
    """
    prefix = self.evaluators_prefix
    if prefix is None:
        prefix = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator").PREFIX
        self.evaluators_prefix = prefix

    return prefix + name
|
|
|
|
def get_parser_name(self, name):
    """
    Prepend the parsers prefix to a name.
    The prefix is read from BaseParser.PREFIX on first use, then kept.
    :param name: short name of the parser
    :return: prefix + name
    """
    prefix = self.parsers_prefix
    if prefix is None:
        prefix = core.utils.get_class("parsers.BaseParser.BaseParser").PREFIX
        self.parsers_prefix = prefix

    return prefix + name
|
|
|
|
def concepts(self):
    """
    List all the concepts stored in sdp.
    Entries that are lists (several concepts under one key) are flattened.
    :return: list of concepts, sorted by id read as an integer
    """
    def numeric_id(concept):
        return int(concept.id)

    flat = [
        concept
        for entry in self.sdp.list(self.CONCEPTS_ENTRY)
        for concept in (entry if isinstance(entry, list) else [entry])
    ]
    return sorted(flat, key=numeric_id)
|
|
|
|
def history(self, page=10, start=0):
    """
    Gets the history of all commands, through the history handler.
    :param page: forwarded to the history handler
    :param start: forwarded to the history handler
    :return: whatever the history handler returns
    """
    handler = self.history_handler
    return handler.history(page, start)
|
|
|
|
def restore(self):
    """
    Restore the state by replaying all the previously saved valid concept definitions.
    Every line of the concepts file is logged, then evaluated as a user input.
    The during_restore flag is raised while replaying so that evaluate_user_input does not
    append the replayed lines to the file again. It is always lowered afterwards,
    even when the file is missing or an evaluation fails.
    :return: None
    """
    self.during_restore = True
    try:
        with open(CONCEPTS_FILE, "r") as f:
            for line in f:
                self.log.info(line.strip())
                self.evaluate_user_input(line)
    except IOError as error:
        # best effort: no readable file simply means that there is nothing to replay
        self.log.debug(f"Cannot restore from '{CONCEPTS_FILE}': {error}")
    finally:
        # without this, a missing file would leave the flag raised for good
        # and no new definition would ever be saved
        self.during_restore = False
|
|
|
|
def test(self):
|
|
return f"I have access to Sheerka !"
|
|
|
|
def test_error(self):
|
|
raise Exception("I can raise an error")
|
|
|
|
@staticmethod
def _get_unknown(metadata):
    """
    Return the concept 'UnknownConcept' for a requested id or key.
    Note that new() is not called, to prevent a cyclic call.
    :param metadata: what is known about the requested concept, either one tuple
        ex: ("key", "not_found") or ("id", invalid_id)
        or a list of such tuples when several attributes were given
        ex: [("key", "not_found"), ("id", invalid_id)]
    :return: an UnknownConcept whose body is the metadata, with one property per tuple
    """
    pairs = metadata if isinstance(metadata, list) else [metadata]

    unknown = UnknownConcept()
    unknown.set_metadata_value(ConceptParts.BODY, metadata)
    for prop_name, prop_value in ((pair[0], pair[1]) for pair in pairs):
        unknown.set_prop(prop_name, prop_value)
    unknown.metadata.is_evaluated = True

    return unknown
|
|
|
|
@staticmethod
def get_builtins_classes_as_dict():
    """
    Collect the Concept subclasses declared in core.builtin_concepts.
    Each class is instantiated once (without argument) to read its key.
    :return: dict of classes, indexed by the metadata key of their instances
    """
    return {
        candidate().metadata.key: candidate
        for candidate in core.utils.get_classes("core.builtin_concepts")
        if issubclass(candidate, Concept) and candidate != Concept
    }
|
|
|
|
@staticmethod
def init_logging(debug, loggers):
    """
    Configure the logging on the console handler.
    :param debug: True for DEBUG level with time and level in each line,
        False for INFO level with the bare message
    :param loggers: handed over to core.sheerka_logger.set_enabled
    :return: None
    """
    core.sheerka_logger.set_enabled(loggers)

    if debug:
        # alternative format, with the logger name:
        # "%(asctime)s %(name)s [%(levelname)s] %(message)s"
        log_level, log_format = logging.DEBUG, "%(asctime)s [%(levelname)s] %(message)s"
    else:
        log_level, log_format = logging.INFO, "%(message)s"

    logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])
|