import logging
import time

from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.sheerka.services.SheerkaExecute import NO_MATCH
from core.sheerka_logger import get_logger
from sdp.sheerkaDataProvider import Event

# Number of spaces added to the log indentation for each nested (pushed) context.
DEBUG_TAB_SIZE = 4

# Attributes compared by ExecutionContext.__eq__.
# NOTE(review): the name suggests these are also the attributes that get serialized;
# the serializer is not part of this module - confirm.
PROPERTIES_TO_SERIALIZE = ("_id",
                           "_bag",
                           "_start",
                           "_stop",
                           "who",
                           "desc",
                           "children",
                           "inputs",
                           "values",
                           "obj",
                           "concepts")


class ExecutionContext:
    """
    To keep track of the execution of a request.

    A context records who asked for what, the inputs and the values produced,
    timing information, and the child contexts created with push().
    It is meant to be used as a context manager:

        with context.push(action, action_context, desc="...") as sub_context:
            ...

    Extra keyword arguments given at construction are stored in an internal
    bag and are readable as plain attributes (see __getattr__).
    """

    # Last id handed out per event digest (shared by all the instances).
    ids = {}

    @staticmethod
    def get_id(event_digest):
        """
        Returns the next sequential id for the given event digest.
        The first call for a digest returns 0, then 1, 2, ...
        :param event_digest: digest of the event the context belongs to
        :return: the new id
        """
        next_id = ExecutionContext.ids.get(event_digest, -1) + 1
        ExecutionContext.ids[event_digest] = next_id
        return next_id

    def __init__(self,
                 who,
                 event: Event,
                 sheerka,
                 action: BuiltinConcepts,
                 action_context,
                 desc: str = None,
                 logger=None,
                 global_hints=None,
                 global_errors=None,
                 **kwargs):
        """
        :param who: who is asking
        :param event: the (original) trigger. When falsy, the context has no id
        :param sheerka: sheerka
        :param action: action being executed
        :param action_context: context of the action
        :param desc: human description of what is going on
        :param logger: logger used by the log_xxx() methods (no logging when None)
        :param global_hints: set of hints shared with the contexts created by push()
        :param global_errors: list of errors shared with the contexts created by push()
        :param kwargs: 'obj' and 'concepts' are extracted, every other entry goes to the bag
        """
        self._parent = None
        self._id = ExecutionContext.get_id(event.get_digest()) if event else None
        self._tab = ""
        self._bag = {}  # context variables
        self._start = 0  # when the execution starts (to measure elapsed time)
        self._stop = 0  # when the execution stops (to measure elapsed time)
        self._format_instructions = None  # how to print the execution context

        self.who = who  # who is asking
        self.event = event  # what was the (original) trigger
        self.sheerka = sheerka  # sheerka
        self.action = action
        self.action_context = action_context
        self.desc = desc  # human description of what is going on
        self.children = []
        self.preprocess = None
        self.logger = logger

        self.local_hints = set()
        self.global_hints = set() if global_hints is None else global_hints
        self.global_errors = [] if global_errors is None else global_errors

        self.inputs = {}  # what were the parameters of the execution context
        self.values = {}  # what was produced by the execution context

        self.obj = kwargs.pop("obj", None)  # current obj we are working on
        self.concepts = kwargs.pop("concepts", {})  # known concepts specific to this context

        # update the other elements
        self._bag.update(kwargs)

        self.stat_log = get_logger("stats")
        self.show_stats = False

    @property
    def elapsed(self):
        """
        Elapsed time in nanoseconds.
        0 when the context was never entered. When the context is entered but
        not exited yet, the time elapsed so far.
        """
        if self._start == 0:
            return 0

        end = self._stop if self._stop > 0 else time.time_ns()
        return end - self._start

    @property
    def elapsed_str(self):
        """
        Elapsed time as a string, in milliseconds below one second, in seconds otherwise.
        """
        nano_sec = self.elapsed
        dt = nano_sec / 1e6
        return f"{dt} ms" if dt < 1000 else f"{dt / 1000} s"

    @property
    def id(self):
        return self._id

    def __getattr__(self, item):
        """
        Gives attribute-like access to the variables of the bag.
        Only called by Python when the regular attribute lookup fails.
        :param item: name of the attribute
        :return: the value found in the bag
        :raises AttributeError: when the bag does not hold the item
        """
        # The bag is read from __dict__ and not with self._bag: when '_bag' itself
        # is missing (instance created without __init__, which is what copy and
        # unpickling do), self._bag would call __getattr__ again and never stop.
        bag = self.__dict__.get("_bag")
        if bag is not None and item in bag:
            return bag[item]

        raise AttributeError(f"'ExecutionContext' object has no attribute '{item}'")

    def __enter__(self):
        self._start = time.time_ns()
        self.log_new()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing is returned: exceptions raised in the 'with' body are propagated
        self._stop = time.time_ns()
        if self.show_stats:
            self.stat_log.debug(self._log_prefix() + "Execution time: " + self.elapsed_str)

    def __repr__(self):
        msg = f"ExecutionContext(who={self.who}, id={self._id}, action={self.action}, context={self.action_context}"
        if self.desc:
            msg += f", desc='{self.desc}'"
        msg += ")"
        return msg

    # def __str__(self):
    #     msg = self.desc or "New Context"
    #     msg += f", who={self.who}, id={self.id}"
    #     return msg

    def __eq__(self, other):
        """
        Two contexts are equal when all the PROPERTIES_TO_SERIALIZE are equal.
        'who' is compared using its string representation.

        Note that, as __eq__ is defined without __hash__, the instances are not hashable.
        """
        if id(self) == id(other):
            return True

        if not isinstance(other, ExecutionContext):
            return False

        for prop in PROPERTIES_TO_SERIALIZE:
            if prop == "who":
                value = str(getattr(self, prop))
                other_value = str(getattr(other, prop))
            else:
                value = getattr(self, prop)
                other_value = getattr(other, prop)

            if value != other_value:
                return False

        return True

    def _log_prefix(self):
        """
        Common prefix of the log messages: the id of the context, then its indentation.
        """
        return f"[{self._id:2}]" + self._tab

    def push(self, action: BuiltinConcepts, action_context, who=None, desc=None, logger=None, **kwargs):
        """
        Creates a child context and registers it in self.children.

        The child shares the event, sheerka, the global hints and the global errors
        of its parent. It starts with the obj, the concepts and the bag of the parent,
        any of them can be overridden with kwargs.
        :param action: action of the child context
        :param action_context: context of the action
        :param who: who is asking (the parent's 'who' when not given)
        :param desc: human description of what is going on
        :param logger: logger to use (the parent's logger when not given)
        :param kwargs: obj, concepts, or bag variables of the child
        :return: the new context
        """
        who = who or self.who
        logger = logger or self.logger

        # Order matters: the bag overrides the defaults, kwargs override everything
        _kwargs = {"obj": self.obj, "concepts": self.concepts}
        _kwargs.update(self._bag)
        _kwargs.update(kwargs)

        new = ExecutionContext(
            who,
            self.event,
            self.sheerka,
            action,
            action_context,
            desc,
            logger,
            self.global_hints,
            self.global_errors,
            **_kwargs)
        new._parent = self
        new._tab = self._tab + " " * DEBUG_TAB_SIZE
        new.preprocess = self.preprocess  # same list object as the parent, not a copy
        new.local_hints.update(self.local_hints)  # copy: the hints added later are not shared

        self.children.append(new)
        return new

    def add_preprocess(self, name, **kwargs):
        """
        Adds a new EVALUATOR_PRE_PROCESS concept to the preprocess list.
        :param name: value of the 'name' property of the preprocess
        :param kwargs: other values to set on the preprocess
        :return: self (to chain the calls)
        """
        preprocess = self.sheerka.new(BuiltinConcepts.EVALUATOR_PRE_PROCESS)
        preprocess.set_value("name", name)
        for k, v in kwargs.items():
            preprocess.set_value(k, v)

        if not self.preprocess:
            self.preprocess = []
        self.preprocess.append(preprocess)
        return self

    def add_inputs(self, **kwargs):
        """
        Records what were the parameters of the execution.
        :return: self (to chain the calls)
        """
        self.inputs.update(kwargs)
        return self

    def add_values(self, **kwargs):
        """
        Records what was produced by the execution.
        :return: self (to chain the calls)
        """
        self.values.update(kwargs)
        return self

    def get_concept(self, key):
        """
        Looks for a concept, from the most specific place to the most generic:
        the current obj, the values of the current obj, the concepts of this context,
        and finally sheerka.
        :param key: key of the concept
        :return: the concept found, or whatever sheerka.get_by_key() returns
        """
        # search in obj
        if isinstance(self.obj, Concept):
            if self.obj.key == key:
                return self.obj

            for var_name in self.obj.values:
                if var_name == key:
                    value = self.obj.get_value(var_name)
                    if isinstance(value, Concept):
                        return value

        # search in concepts
        if self.concepts:
            for k, c in self.concepts.items():
                if k == key:
                    return c

        return self.sheerka.get_by_key(key)

    def new_concept(self, key, **kwargs):
        """
        Creates a new concept, using as template the first match found in:
        the current obj, the values of the current obj, the concepts of this context.
        When there is no match, the creation is delegated to sheerka.new().

        When a value of the current obj matches the key but is not a Concept,
        this value is returned as is.
        :param key: key of the concept
        :param kwargs: forwarded to sheerka
        :return: the new concept (or the raw value, see above)
        """
        # search in obj
        # NOTE(review): unlike get_concept(), obj is not checked to be a Concept here,
        # so a truthy obj without 'key' / 'values' will raise AttributeError
        if self.obj:
            if self.obj.key == key:
                return self.sheerka.new_from_template(self.obj, key, **kwargs)

            for var_name in self.obj.values:
                if var_name == key:
                    value = self.obj.get_value(var_name)
                    if isinstance(value, Concept):
                        return self.sheerka.new_from_template(value, key, **kwargs)
                    else:
                        return value

        if self.concepts:
            for k, c in self.concepts.items():
                if k == key:
                    return self.sheerka.new_from_template(c, key, **kwargs)

        return self.sheerka.new(key, **kwargs)

    def log_new(self):
        """
        Logs the creation of the context (called when the context is entered).
        """
        if self.logger and not self.logger.disabled:
            self.logger.debug(self._log_prefix() + str(self))
            # NOTE(review): the execution time is reported on exit only for the
            # contexts that were logged - confirm this is the intent
            self.show_stats = True

    def log(self, message, who=None):
        """
        Logs a debug message, prefixed by the id and the indentation of the context.
        :param message: what to log
        :param who: optional author of the message
        """
        if self.logger and not self.logger.disabled:
            self.logger.debug(self._log_prefix() + (f"[{who}] " if who else "") + str(message))

    def log_error(self, message, who=None, exc=None):
        """
        Records an error in the global errors (the exception when given, else the message)
        then logs it.
        :param message: what to log
        :param who: optional author of the message
        :param exc: optional exception to keep in place of the message
        """
        self.global_errors.append(exc or message)
        if self.logger and not self.logger.disabled:
            # logger.exception() adds the exception currently being handled, if any
            self.logger.exception(self._log_prefix() + (f"[{who}] " if who else "") + str(message))

    def log_result(self, return_values):
        """
        Logs the return values, one per line. Does nothing unless DEBUG is enabled.
        :param return_values: return values to log
        """
        if not self.logger or not self.logger.isEnabledFor(logging.DEBUG):
            return

        if len(return_values) == 0:
            self.logger.debug(self._tab + "No return value")

        for r in return_values:
            to_str = self.return_value_to_str(r)
            self.logger.debug(self._log_prefix() + "-> " + to_str)

    def get_parent(self):
        return self._parent

    def in_context(self, concept_key):
        """
        True if the key is a local hint or a global hint.
        """
        return concept_key in self.local_hints or concept_key in self.global_hints

    def in_current_context(self, concept_key):
        """
        True if the key is a local hint (the global hints are not searched).
        """
        return concept_key in self.local_hints

    @staticmethod
    def _is_return_value(obj):
        return isinstance(obj, Concept) and obj.key == str(BuiltinConcepts.RETURN_VALUE)

    def _at_least_one_success(self, return_values):
        """
        ORs the statuses of the return values.
        :return: None as soon as one item is not a return value
        """
        status = False
        for ret_val in return_values:
            if not self._is_return_value(ret_val):
                return None
            status |= ret_val.status

        return status

    def _all_success(self, return_values):
        """
        ANDs the statuses of the return values.
        :return: None as soon as one item is not a return value
        """
        status = True
        for ret_val in return_values:
            if not self._is_return_value(ret_val):
                return None
            status &= ret_val.status

        return status

    def get_status(self):
        """
        Computes the status of the context from self.values["return_values"]

        * no "return_values", or empty: None
        * a string: "No Match" when it is NO_MATCH, else the string itself
        * a sequence of dict: the status of the first "return_value" entry that is
          a return value, "No Match" if there is none,
          None if one dict has no "return_value" entry
        * any other iterable: True if at least one return value is successful
          (None if one item is not a return value)
        * a single object: its status if it is a return value, else None
        """
        # In the function, I cannot use sheerka.isinstance() as self.sheerka may not be initialized
        # This is the case when ExecutionContext is deserialized

        if "return_values" not in self.values:
            return None

        if hasattr(self.values["return_values"], "__iter__"):
            values = self.values["return_values"]
            if len(values) == 0:
                return None

            if isinstance(values, str):
                return "No Match" if values == NO_MATCH else values

            if isinstance(values[0], dict):
                for result in values:
                    if "return_value" not in result:
                        return None
                    if self._is_return_value(result["return_value"]):
                        return result["return_value"].status
                return "No Match"
            else:
                return self._at_least_one_success(self.values["return_values"])
        else:
            ret_val = self.values["return_values"]
            if not isinstance(ret_val, Concept) or not ret_val.key == str(BuiltinConcepts.RETURN_VALUE):
                return None
            return ret_val.status

    def as_bag(self):
        """
        Creates a dictionary with the useful properties of the execution context.
        It is quicker to implement than creating the actual property mechanism with @property,
        and it hides the other attributes / methods.

        Every variable of the bag is exposed twice: under its name and under "bag.<name>".
        The properties of the context are set last, so they win in case of name clash.
        """
        bag = {}
        for k, v in self._bag.items():
            bag[k] = v
            bag["bag." + k] = v

        for prop in ("id", "who", "desc", "obj", "inputs", "values", "concepts"):
            bag[prop] = getattr(self, prop)

        bag["status"] = self.get_status()
        bag["elapsed"] = self.elapsed
        bag["elapsed_str"] = self.elapsed_str
        bag["digest"] = self.event.get_digest() if self.event else None
        return bag

    @staticmethod
    def return_value_to_str(r):
        """
        One line representation of a return value.
        The value is truncated to 50 characters (47 + "...").
        """
        value = str(r.value)
        if len(value) > 50:
            value = value[:47] + "..."
        to_str = f"ReturnValue(who={r.who}, status={r.status}, value={value})"
        return to_str

    def get_format_instructions(self):
        return self._format_instructions

    def set_format_instructions(self, instructions):
        self._format_instructions = instructions

    def get_parents(self, predicate=None):
        """
        Gets all the parents that match the given predicate
        :param predicate: function called with a parent, keeps the parent when it returns
                          a truthy value. All the parents are kept when None
        :return: list of the matching parents, from the closest to the farthest
        """
        res = []
        current = self
        while True:
            parent = current._parent
            if parent:
                if predicate is None or predicate(parent):
                    res.append(parent)
                # keep climbing even when the parent is filtered out
                current = parent
            else:
                break

        return res