import re

from cache.FastCache import FastCache
from core.builtin_concepts_ids import BuiltinConcepts, BuiltinContainers
from core.concept import Concept, ConceptParts
from core.sheerka.services.SheerkaEvaluateRules import SheerkaEvaluateRules
from core.sheerka.services.sheerka_service import BaseService
from core.tokenizer import TokenKind, Tokenizer
from core.utils import as_bag
from sheerkapython.python_wrapper import ObjectContainer, create_namespace, get_type
from sheerkaql.lexer import Lexer
from sheerkaql.parser import Parser


class SheerkaQueryManager(BaseService):
    """
    Service that filters, projects and inspects collections of objects.

    Queries are given as strings. They are compiled on first use and kept in
    two caches: ``queries`` for compiled FLWR queries and ``conditions`` for
    conditions compiled through the rule manager.
    """
    NAME = "QueryManager"
    # Name under which the objects to query are exposed to a FLWR query
    OBJECTS_ROOT_ALIAS = "__xxx__objects__xx__"
    # Prefix of the generated variables that hold the values given as kwargs
    QUERY_PARAMETER_PREFIX = "__xxx__query_parameter__xx__"
    # Name under which the mapping callable is exposed to the query
    MAPPING_PREFIX = "__xxx__map__xx__"

    # Matches 'self' as a whole word only, so that identifiers that merely
    # contain it ('myself', 'self_id') are left untouched
    _SELF_TOKEN = re.compile(r"\bself\b")
    _CONTAINS_SUFFIX = "_contains"

    def __init__(self, sheerka):
        super().__init__(sheerka, order=16)
        self.queries = FastCache()  # query text -> (compiled query, names, sheerka names)
        self.conditions = FastCache()  # query text -> compiled python conditions
        self.lexer = Lexer()
        self.rule_evaluator = None  # resolved in initialize_deferred()

    def initialize(self):
        """
        Bind the public methods of this service on sheerka and register the debug variables
        :return:
        """
        self.sheerka.bind_service_method(self.NAME, self.filter_objects, False)
        self.sheerka.bind_service_method(self.NAME, self.select_objects, False)
        self.sheerka.bind_service_method(self.NAME, self.collect_attributes, False)
        # NOTE(review): filter_objects is bound a second time with the very same arguments.
        # The bindings that follow all carry an 'as_name' alias, so an alias was presumably
        # intended here as well - confirm before removing or changing this call.
        self.sheerka.bind_service_method(self.NAME, self.filter_objects, False)
        self.sheerka.bind_service_method(self.NAME, self.select_objects, False, as_name="pipe_select")
        self.sheerka.bind_service_method(self.NAME, self.collect_attributes, False, as_name="pipe_props")
        self.sheerka.bind_service_method(self.NAME, self.where_on_objects, False, as_name="pipe_where")

        self.sheerka.register_debug_vars(SheerkaQueryManager.NAME, "filter_objects", "query")

    def initialize_deferred(self, context, is_first_time):
        """
        Resolve the rule evaluator service, which is needed by execute_conditions()
        :param context:
        :param is_first_time:
        :return:
        """
        self.rule_evaluator = self.sheerka.services[SheerkaEvaluateRules.NAME]

    def get_query_by_kwargs(self, local_namespace, use_mapping, **kwargs):
        """
        Create a predicate (as a string) from kwargs
        Every value is stored in local_namespace under a generated variable name,
        and the predicate refers to that name rather than to the value itself

        '__type'          : compares the type of the object (in Sheerka world)
        '__self' or '_'   : compares the object itself
        '<prop>_contains' : checks that the value is in the property <prop>
        anything else     : compares the property of that name

        :param local_namespace: dict that receives the generated variables (it is modified)
        :param use_mapping: True if a mapping is to be used
        :param kwargs:
        :return: the conditions joined by ' and ', or None when there is no kwargs
        """
        if not kwargs:
            return None

        self_ident = f"{self.MAPPING_PREFIX}(self)" if use_mapping else "self"
        conditions = []
        for index, (key, value) in enumerate(kwargs.items()):
            variable_name = f"{self.QUERY_PARAMETER_PREFIX}_{index:02}"
            local_namespace[variable_name] = value

            if key == "__type":
                conditions.append(f"get_type({self_ident}) == {variable_name}")
            elif key in ("__self", "_"):
                conditions.append(f"{self_ident} == {variable_name}")
            elif key.endswith(self._CONTAINS_SUFFIX):
                prop_name = key[:-len(self._CONTAINS_SUFFIX)]
                conditions.append(f"{variable_name} in {self_ident}.{prop_name}")
            else:
                conditions.append(f"{self_ident}.{key} == {variable_name}")

        return ' and '.join(conditions)

    def get_updated_predicate(self, predicate, use_mapping):
        """
        Rewrite the predicate so that it applies to the mapped object
        :param predicate: predicate as a string, or None
        :param use_mapping: when True, every standalone 'self' is wrapped in a call to the mapping
        :return: the updated predicate, or None when predicate is None
        """
        if predicate is None:
            return None

        if use_mapping:
            wrapped_self = f"{self.MAPPING_PREFIX}(self)"
            # Only whole words are replaced. A plain str.replace() would also
            # corrupt names like 'myself' or 'self_id'
            predicate = self._SELF_TOKEN.sub(lambda _match: wrapped_self, predicate)

        return predicate

    def filter_objects(self, context, objects, mapping=None, predicate=None, **kwargs):
        """
        filter the given objects using the predicate and the conditions from kwargs
        for each k,v in kwargs, the equality k == v is added
        for k starting with a double underscore '__', a special treatment may be done
          __type : get the type of object (in Sheerka world)
        See get_query_by_kwargs() for all the supported keys

        When both predicate and kwargs are given, the two are combined with 'and'
        When none is given, the objects are returned unfiltered

        :param context:
        :param objects: list of objects, or a builtin container concept whose body is the list
        :param mapping: mapping to execute on each object before applying the predicate (lambda obj:obj)
        :param predicate: condition on 'self', as a string
        :param kwargs:
        :return: the filtered objects. When a container was given, its body is replaced
                 by the filtered objects and the container itself is returned
        """
        debugger = context.get_debugger(SheerkaQueryManager.NAME, "filter_objects")

        original_container = None
        if isinstance(objects, Concept) and objects.key in BuiltinContainers:
            original_container = objects
            objects = objects.body

        debugger.debug_entering(nb_objects=len(objects), predicate=predicate, **kwargs)

        use_mapping = mapping is not None
        local_namespace = {self.MAPPING_PREFIX: mapping} if use_mapping else {}
        query_by_kwargs = self.get_query_by_kwargs(local_namespace, use_mapping, **kwargs)
        predicate = self.get_updated_predicate(predicate, use_mapping)

        if predicate is not None and query_by_kwargs is not None:
            query = f"({predicate}) and ({query_by_kwargs})"
        elif predicate is not None:
            query = predicate
        elif query_by_kwargs is not None:
            query = query_by_kwargs
        else:
            query = None

        if debugger.is_enabled():
            debugger.debug_var("query", query)
            for name, value in local_namespace.items():
                debugger.debug_var("query_parameter", f"{name} = {value}")

        if query:
            if query in self.conditions:
                # Already compiled as conditions (only execute_conditions() fills this cache),
                # so there is no need to try the FLWR parser again
                objects = self.execute_conditions(context, query, objects, local_namespace)
            else:
                try:
                    # First try with the FLWR parser
                    full_query = f"{self.OBJECTS_ROOT_ALIAS}.items[{query}]"
                    objects = self.execute_flwr_query(context, full_query, objects, local_namespace)
                except SyntaxError:
                    # Then try using RuleManager
                    objects = self.execute_conditions(context, query, objects, local_namespace)

        if original_container:
            original_container.set_value(ConceptParts.BODY, objects)
            return original_container

        return objects

    def select_objects(self, context, objects, *props, **kwargs):
        """
        From the input objects, create output objects
        The output is defined by a flwr query ('for self in ... return ...') built from props and kwargs

        :param context:
        :param objects: list of objects, or a builtin container concept whose body is the list
        :param props: what to return for each object. A single identifier is read as an
                      attribute of the object ('name' -> 'self.name'), anything else is used as is
        :param kwargs: same as props, but the values are returned in a dict, under the given keys
        :return: the result of the query. When a container was given, its body is replaced
                 by the result and the container itself is returned
        :raises SyntaxError: when a requested property is not a string
        """

        def sanitize_property(p):
            if not isinstance(p, str):
                raise SyntaxError(f"{p} is not the name of an attribute")

            tokens = list(Tokenizer(p, yield_eof=False))
            if len(tokens) == 1 and tokens[0].type == TokenKind.IDENTIFIER:
                return f"self.{p}"

            return p

        original_container = None
        if isinstance(objects, Concept) and objects.key in BuiltinContainers:
            original_container = objects
            objects = objects.body

        requested_properties = [sanitize_property(prop) for prop in props]
        if kwargs:
            items = [f"'{k}': {sanitize_property(v)}" for k, v in kwargs.items()]
            requested_properties.append("{" + ", ".join(items) + "}")

        query = f"for self in {self.OBJECTS_ROOT_ALIAS}.items return " + ", ".join(requested_properties)
        objects = self.execute_flwr_query(context, query, objects, {})

        if original_container:
            original_container.set_value(ConceptParts.BODY, objects)
            return original_container

        return objects

    def collect_attributes(self, objects, take=10):
        """
        Given a list of object, returns the attributes that can be requested
        The attributes are grouped by type of object. For a given type, the attributes
        of all the sampled objects are merged
        :param objects: list of objects, or a builtin container concept whose body is the list
        :param take: no need to browse the whole list, you can just use a sample
                     (zero or a negative value means the whole list)
        :return: a TO_DICT concept whose body maps each type to the sorted list of its attributes
        """
        if isinstance(objects, Concept) and objects.key in BuiltinContainers:
            objects = objects.body

        sample = objects if take <= 0 else objects[:take]

        attributes_by_type = {}
        for obj in sample:
            attrs = {p for p in as_bag(obj).keys() if p != "self"}
            # Merge with what is already known for this type. The former test
            # 'object_type is result' was never true, so the attributes of the
            # last object were silently replacing those of the previous ones
            attributes_by_type.setdefault(get_type(obj), set()).update(attrs)

        result = {k: sorted(v) for k, v in attributes_by_type.items()}
        return self.sheerka.new(BuiltinConcepts.TO_DICT, body=result)

    def execute_flwr_query(self, context, query, objects, local_namespace):
        """
        Execute a FLWOR query on objects
        The compiled query is cached, using the text of the query as the key
        :param context:
        :param query: query to execute (as a string to compile)
        :param objects: list of objects to filter
        :param local_namespace: objects of the namespace that are already known
        :return: the result of the compiled query
        """
        if query not in self.queries:
            parser = Parser()
            # Resolve the escape sequences of the query. 'unicode_escape' reads its input
            # as Latin-1, hence the Latin-1 encoding: encoding to UTF-8 first would garble
            # every non ASCII character. The characters that do not fit in Latin-1 are
            # turned into escape sequences that the decoding restores
            unescaped = query.encode('latin-1', 'backslashreplace').decode('unicode_escape')
            compiled_query = parser.parse(unescaped, lexer=self.lexer)
            self.queries.put(query, (compiled_query, parser.names, parser.sheerka_names))

        compiled_query, names, sheerka_names = self.queries.get(query)

        namespace = create_namespace(context,
                                     self.NAME,
                                     names,
                                     sheerka_names,
                                     {self.OBJECTS_ROOT_ALIAS: ObjectContainer(objects)},
                                     expression_only=True,
                                     allow_builtins=True)
        namespace.update(local_namespace)  # override if needed

        return compiled_query(namespace)

    def execute_conditions(self, context, query, objects, local_namespace):
        """
        Filter the objects using conditions compiled by the rule manager
        The compiled conditions are cached, using the text of the query as the key
        :param context:
        :param query: condition on 'self', as a string
        :param objects: list of objects to filter
        :param local_namespace: objects of the namespace that are already known (left unchanged)
        :return: list of the objects for which at least one evaluation result
                 has a truthy status and a body that is exactly True
        """
        if query not in self.conditions:
            # imported here rather than at module level, presumably to avoid a circular import
            from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager
            rule_manager = self.sheerka.services[SheerkaRuleManager.NAME]
            compilation_results = rule_manager.compile_when(context, self.NAME, query)
            self.conditions.put(query, compilation_results.python_conditions)

        conditions = self.conditions.get(query)

        results = []
        with context.push(BuiltinConcepts.EXEC_CODE, query) as sub_context:
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED)
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_WHERE_REQUESTED)
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_UNTIL_SUCCESS_REQUESTED)
            sub_context.protected_hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED)
            sub_context.sheerka.add_many_to_short_term_memory(sub_context, local_namespace)
            sub_context.deactivate_push()

            # Work on a copy, otherwise 'self' would leak into the dict of the caller
            namespace = dict(local_namespace)
            for obj in objects:
                namespace["self"] = obj
                res = self.rule_evaluator.evaluate_conditions(sub_context, conditions, namespace)
                # The body must be the boolean True, a truthy value of another type is not a match
                if any(r.status and r.body is True for r in res):
                    results.append(obj)

        return results

    def where_on_objects(self, context, objects, predicate=None, **kwargs):
        """
        Same as filter_objects(), without the mapping (bound as 'pipe_where')
        :param context:
        :param objects:
        :param predicate:
        :param kwargs:
        :return:
        """
        return self.filter_objects(context, objects, predicate=predicate, **kwargs)