from dataclasses import dataclass from cache.Cache import Cache from cache.ListCache import ListCache from core.builtin_concepts import BuiltinConcepts from core.concept import ensure_concept, Concept from core.sheerka.services.sheerka_service import ServiceObj, BaseService @dataclass class ComparisonObj(ServiceObj): """ Order to store """ property: str # property to compare a: int # id of concept a b: int # id of concept b op: str # comparison operation context: str = "#" # context when the comparison is right class SheerkaComparisonManager(BaseService): """ Manage partitioning of concepts """ NAME = "ComparisonManager" COMPARISON_ENTRY = "ComparisonManager:Comparison" RESOLVED_COMPARISON_ENTRY = "ComparisonManager:Resolved_Comparison" # to use an initialisation value for attributes that will make use of computed weights # the lesser and the greatest weight will be given relatively to this value DEFAULT_COMPARISON_VALUE = 1 def __init__(self, sheerka): super().__init__(sheerka) @staticmethod def _compute_key(prop_name, comparison_context): """ Key to use to store the comparisons :param prop_name: :param comparison_context: :return: """ if isinstance(prop_name, Concept): prefix = prop_name.key if prop_name.metadata.is_builtin else prop_name.id else: prefix = prop_name return f"{prefix}|{comparison_context}" @staticmethod def _get_weights(comparison_objs): """ For every element in greater_than_s, give it a weight if weight(a) > weight(b) it means that a > b :param comparison_objs: list of greater than objects :return: """ values = {} for comparison_obj in comparison_objs: values[comparison_obj.a] = SheerkaComparisonManager.DEFAULT_COMPARISON_VALUE values[comparison_obj.b] = SheerkaComparisonManager.DEFAULT_COMPARISON_VALUE for _ in range(len(comparison_objs)): for comparison_obj in comparison_objs: if comparison_obj.op == ">": values[comparison_obj.a] = values[comparison_obj.b] + 1 else: values[comparison_obj.b] = values[comparison_obj.a] + 1 return values def _compute_weights(self, comparison_objs, lesser_objs_ids=None, greatest_objs_ids=None): """ :param comparison_objs: :return: """ def is_not_in_objs(obj, objs_ids): return obj.op in ("<", ">") and obj.a not in objs_ids and obj.b not in objs_ids def is_in_objs(obj, objs_ids): return obj.op in ("<", ">") and (obj.a in objs_ids or obj.b in objs_ids) lesser_objs_ids = lesser_objs_ids or {co.a for co in comparison_objs if co.op == "<<"} greatest_objs_ids = greatest_objs_ids or {co.a for co in comparison_objs if co.op == ">>"} default_weight = SheerkaComparisonManager.DEFAULT_COMPARISON_VALUE # get the weights for all the lesser lesser_objs = [co for co in comparison_objs if is_in_objs(co, lesser_objs_ids)] lesser_objs_weights = self._get_weights(lesser_objs) # rearrange the weight to have the highest equals to DEFAULT_COMPARISON_VALUE - 1 highest_weight = len(lesser_objs_weights) for co_id in lesser_objs_weights: lesser_objs_weights[co_id] = lesser_objs_weights[co_id] - highest_weight + default_weight - 1 for concept_id in lesser_objs_ids: if concept_id not in lesser_objs_weights: lesser_objs_weights[concept_id] = default_weight - 1 # get the weights for concepts that are not lesser or greatest in_between_objs = [o for o in comparison_objs if is_not_in_objs(o, lesser_objs_ids | greatest_objs_ids)] in_between_weights = self._get_weights(in_between_objs) # get the weights for all the greatest greatest_objs = [co for co in comparison_objs if is_in_objs(co, greatest_objs_ids)] greatest_objs_weights = self._get_weights(greatest_objs) # rearrange the weight to have the lowest equals to DEFAULT_COMPARISON_VALUE + 1 highest_weight = max(default_weight, len(in_between_weights)) for co_id in greatest_objs_weights: greatest_objs_weights[co_id] = greatest_objs_weights[co_id] + highest_weight for concept_id in greatest_objs_ids: if concept_id not in greatest_objs_weights: greatest_objs_weights[concept_id] = highest_weight + 1 return {**lesser_objs_weights, **in_between_weights, **greatest_objs_weights} @staticmethod def _get_partition(weighted_concepts): res = {} for k, v in weighted_concepts.items(): res.setdefault(v, []).append(k) return res def _add_comparison(self, comparison_obj): key = self._compute_key(comparison_obj.property, comparison_obj.context) previous = self.sheerka.cache_manager.get(self.COMPARISON_ENTRY, key) new = previous.copy() if previous else [] for co in new: if co.property == comparison_obj.property and \ co.a == comparison_obj.a and \ co.b == comparison_obj.b and \ co.op == comparison_obj.op and \ co.context == comparison_obj.context: return self.sheerka.ret(self.NAME, False, self.sheerka.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED)) new.append(comparison_obj) lesser_objs_ids = {co.a for co in new if co.op == "<<"} greatest_objs_ids = {co.a for co in new if co.op == ">>"} # check if it is a valid operation regarding other lesser and greatest concepts if comparison_obj.op in ("<", ">"): a_is_lesser = comparison_obj.a in lesser_objs_ids b_is_lesser = comparison_obj.b in lesser_objs_ids if a_is_lesser != b_is_lesser: # XOR operation return self.sheerka.ret(self.NAME, False, self.sheerka.new(BuiltinConcepts.INVALID_LESSER_OPERATION)) a_is_greatest = comparison_obj.a in greatest_objs_ids b_is_greatest = comparison_obj.b in greatest_objs_ids if a_is_greatest != b_is_greatest: # XOR operation return self.sheerka.ret(self.NAME, False, self.sheerka.new(BuiltinConcepts.INVALID_GREATEST_OPERATION)) if comparison_obj.op == "<<" and comparison_obj.a in greatest_objs_ids: return self.sheerka.ret(self.NAME, False, self.sheerka.new(BuiltinConcepts.INVALID_GREATEST_OPERATION)) if comparison_obj.op == ">>" and comparison_obj.a in lesser_objs_ids: return self.sheerka.ret(self.NAME, False, self.sheerka.new(BuiltinConcepts.INVALID_LESSER_OPERATION)) cycles = self.detect_cycles(new) if cycles: concepts_in_cycle = [self.sheerka.get_by_id(c) for c in cycles] chicken_an_egg = self.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG, body=concepts_in_cycle) return self.sheerka.ret(self.NAME, False, chicken_an_egg) self.sheerka.cache_manager.put(self.COMPARISON_ENTRY, key, comparison_obj) self.sheerka.cache_manager.put(self.RESOLVED_COMPARISON_ENTRY, key, self._compute_weights(new, lesser_objs_ids, greatest_objs_ids)) return self.sheerka.ret(self.NAME, True, self.sheerka.new(BuiltinConcepts.SUCCESS)) def initialize(self): cache = ListCache(default=lambda k: self.sheerka.sdp.get(self.COMPARISON_ENTRY, k)) self.sheerka.cache_manager.register_cache(self.COMPARISON_ENTRY, cache, True, True) cache = Cache() self.sheerka.cache_manager.register_cache(self.RESOLVED_COMPARISON_ENTRY, cache, persist=False) self.sheerka.bind_service_method(self.set_is_greater_than, True) self.sheerka.bind_service_method(self.set_is_less_than, True) self.sheerka.bind_service_method(self.set_is_lesser, True) self.sheerka.bind_service_method(self.set_is_greatest, True) self.sheerka.bind_service_method(self.get_partition, False) self.sheerka.bind_service_method(self.get_concepts_weights, False) def set_is_greater_than(self, context, prop_name, concept_a, concept_b, comparison_context="#"): """ Records that the property of concept a is greater than concept b's one :param context: :param prop_name: :param concept_a: :param concept_b: :param comparison_context: :return: """ context.log(f"Setting concept {concept_a} is greater than {concept_b}", who=self.NAME) ensure_concept(concept_a, concept_b) event_digest = context.event.get_digest() comparison_obj = ComparisonObj(event_digest, prop_name, concept_a.id, concept_b.id, ">", comparison_context) return self._add_comparison(comparison_obj) def set_is_less_than(self, context, prop_name, concept_a, concept_b, comparison_context="#"): """ Records that the property of concept a is lesser than concept b's one :param context: :param prop_name: :param concept_a: :param concept_b: :param comparison_context: :return: """ context.log(f"Setting concept {concept_a} is less than {concept_b}", who=self.NAME) ensure_concept(concept_a, concept_b) event_digest = context.event.get_digest() comparison_obj = ComparisonObj(event_digest, prop_name, concept_a.id, concept_b.id, "<", comparison_context) return self._add_comparison(comparison_obj) def set_is_lesser(self, context, prop_name, concept, comparison_context="#"): """ Records that the concept is less than any other concept if no direct comparison is given * A lesser concept has a weight smaller than any other concept that is not a lesser * When two concepts are lesser, you can compare (using set_is_less_than or set_is_greater_than) * If a concept is lesser, you cannot compare it with a non lesser concept * All lesser concepts that have no comparison directive are greater than the others (and share the same weight) :param context: :param prop_name: :param concept: :param comparison_context: :return: """ context.log(f"Setting concept {concept} is lesser", who=self.NAME) ensure_concept(concept) event_digest = context.event.get_digest() comparison_obj = ComparisonObj(event_digest, prop_name, concept.id, None, "<<", comparison_context) return self._add_comparison(comparison_obj) def set_is_greatest(self, context, prop_name, concept, comparison_context="#"): """ Records that the concept is greater than any other concept if no direct comparison is given * A greatest concept has a weight bigger than any other concept that is not a greatest * When two concepts are greatest, you can compare them (using set_is_less_than or set_is_greater_than) * If a concept is greatest, you cannot compare it with a non greatest concept * All greatest concepts that have no comparison directive are less than the others (and share the same weight) :param context: :param prop_name: :param concept: :param comparison_context: :return: """ context.log(f"Setting concept {concept} is greatest", who=self.NAME) ensure_concept(concept) event_digest = context.event.get_digest() comparison_obj = ComparisonObj(event_digest, prop_name, concept.id, None, ">>", comparison_context) return self._add_comparison(comparison_obj) def set_are_equivalent(self, context, prop_name, concept_a, concept_b, comparison_context="#"): """ Records that two concepts have the same weight * You cannot set the weight :param context: :param prop_name: :param concept_a: :param concept_b: :param comparison_context: :return: """ pass def set_are_equiv(self, context, prop_name, concept_a, concept_b, comparison_context="#"): pass def get_partition(self, prop_name, comparison_context="#"): """ Returns the equivalent classes for the property, using the comparison_context :param prop_name: :param comparison_context: :return: """ weighted_concept = self.get_concepts_weights(prop_name, comparison_context) return self._get_partition(weighted_concept) def get_concepts_weights(self, prop_name, comparison_context="#"): weighted_concepts = self.sheerka.cache_manager.get( self.RESOLVED_COMPARISON_ENTRY, self._compute_key(prop_name, comparison_context)) if weighted_concepts is None: key = self._compute_key(prop_name, comparison_context) entries = self.sheerka.cache_manager.get(self.COMPARISON_ENTRY, key) if entries is None: return {} else: weighted_concepts = self._compute_weights(entries) self.sheerka.cache_manager.put(self.RESOLVED_COMPARISON_ENTRY, key, weighted_concepts) return weighted_concepts @staticmethod def detect_cycles(comparison_objs): """ # Thanks to Divyanshu Mehta for contributing this code # https://www.geeksforgeeks.org/detect-cycle-in-a-graph/?ref=lbp :param comparison_objs: :return: """ latest = comparison_objs[-1] if latest.op == "=": return None def get_graph_and_vertices(): _graph = {} _vertices = set() for obj in comparison_objs: if obj.op == "=": continue _vertices.add(obj.a) _vertices.add(obj.b) if obj.op == ">": _graph.setdefault(obj.a, []).append(obj.b) else: _graph.setdefault(obj.b, []).append(obj.a) return _graph, _vertices def is_cyclic(v): # Mark current node as visited and # adds to recursion stack visited[v] = True rec_stack[v] = True # Recur for all neighbours # if any neighbour is visited and in # recStack then graph is cyclic if v in graph: for neighbour in graph[v]: if not visited[neighbour]: if is_cyclic(neighbour): return True elif rec_stack[neighbour]: return True # The node needs to be poped from # recursion stack before function ends rec_stack[v] = False return False graph, vertices = get_graph_and_vertices() visited = {k: False for k in vertices} rec_stack = {k: False for k in vertices} if is_cyclic(latest.a): # only need to check from the latest add, since the graph was not cyclic before return [k for k, v in rec_stack.items() if v] return None