import pytest from core.builtin_concepts import ReturnValueConcept from core.concept import Concept, DEFINITION_TYPE_DEF from core.rule import Rule, ACTION_TYPE_EXEC from core.sheerka.services.SheerkaEvaluateRules import SheerkaEvaluateRules from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager from sheerkarete.beta import BetaMemory from sheerkarete.common import V, WME, ReteToken from sheerkarete.conditions import Condition, NegatedCondition, AndConditions from sheerkarete.join_node import JoinNode from sheerkarete.negative_node import NegativeNode from sheerkarete.network import ReteNetwork, FACT_ID, FACT_NAME, FACT_SELF, FactObj from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka from tests.sheerkarete.RuleForTestingRete import RuleForTestingRete class TestReteNetwork(TestUsingMemoryBasedSheerka): @pytest.mark.parametrize("condition, expected_key", [ (Condition("id", "attr", "value"), ("id", "attr", "value")), (Condition(V("x"), "attr", "value"), ("*", "attr", "value")), (Condition("id", V("x"), "value"), ("id", "*", "value")), (Condition("id", "attr", V("x")), ("id", "attr", "*")), (Condition(V("x"), V("x"), V("x")), ("*", "*", "*")), ]) def test_i_can_create_alpha_memory(self, condition, expected_key): network = ReteNetwork() amem = network.build_or_share_alpha_memory(condition) assert amem.key == expected_key assert amem.key in network.alpha_hash def test_same_amem_is_reused(self): network = ReteNetwork() condition = Condition("id", "attr", "value") amem1 = network.build_or_share_alpha_memory(condition) amem2 = network.build_or_share_alpha_memory(condition) assert amem1 == amem2 assert len(network.alpha_hash) == 1 def test_i_can_initialize_a_simple_network(self): network = ReteNetwork() condition = Condition("id", "attr", "value") rule = RuleForTestingRete(AndConditions([condition])) network.add_rule(rule) assert condition.get_key() in network.alpha_hash assert len(network.pnodes) == 1 join_node = network.pnodes[0].parent assert isinstance(join_node, JoinNode) assert isinstance(join_node.parent, BetaMemory) assert join_node.amem == network.alpha_hash[condition.get_key()][0] def test_i_can_add_multiple_rules_with_the_same_conditions(self): network = ReteNetwork() condition = Condition("id", "attr", "value") rule1 = RuleForTestingRete(AndConditions([condition])) rule2 = RuleForTestingRete(AndConditions([condition])) network.add_rule(rule1) network.add_rule(rule2) assert len(network.pnodes) == 1 assert network.pnodes[0].rules == [rule1, rule2] def test_i_can_update_conditions_attributes_by_id_when_constraints(self): network = ReteNetwork() conditions = [Condition(V("x"), "__name__", "fact_name"), Condition(V("x"), "attr1", "value1"), Condition(V("x"), "body", V("y")), Condition(V("y"), "__is_concept__", True), Condition(V("y"), "name", "SubConcept"), Condition(V("x"), "value", V("z")), Condition(V("z"), "status", False), Condition(V("z"), "body", V("zz")), Condition(V("zz"), "sub_value", "sub_value"), ] rule = RuleForTestingRete(AndConditions(conditions)) network.add_rule(rule) assert network.attributes_by_id == { "fact_name": ["__name__", "attr1", "body", "value"], "fact_name.body": ["__is_concept__", "name"], "fact_name.value": ["status", "body"], "fact_name.value.body": ["sub_value"], } def test_i_can_update_conditions_attributes_when_value_is_a_variable(self): network = ReteNetwork() conditions = [Condition(V("x"), "__name__", "x"), Condition(V("y"), "__name__", "y"), Condition(V("x"), "__self__", V("y"))] rule = RuleForTestingRete(AndConditions(conditions)) network.add_rule(rule) assert network.attributes_by_id == { "x": ["__name__", "__self__"], "y": ["__name__", "__self__"], } def test_adding_obj_when_no_rule_has_no_effect(self): network = ReteNetwork() ret = ReturnValueConcept("test", True, "value") network.add_obj("__ret", ret) assert network.working_memory == set() def test_adding_obj_when_rule_with_no_attribute_constraint(self): """ When a rule has no attribute constraint, we are forced to provide all the known properties """ network = ReteNetwork() ret = ReturnValueConcept("test", True, "value") network.add_rule(RuleForTestingRete( AndConditions([Condition("__ret", "*", True)]))) # test only, this rule cannot be matched network.add_obj("__ret", ret) assert network.working_memory == { WME("f-00000", "status", True), WME("f-00000", "value", "value"), WME("f-00000", "body", "value"), WME("f-00000", "parents", None), WME("f-00000", "id", None), WME("f-00000", "name", "__RETURN_VALUE"), WME("f-00000", "key", "__RETURN_VALUE"), WME("f-00000", "self", ret), WME("f-00000", "who", "test"), } def test_adding_obj_when_rule_with_attributes_constraints(self): """ When a rule with attribute constraint, we only add the requested attributes """ network = ReteNetwork() ret = ReturnValueConcept("test", True, "value") network.add_rule(RuleForTestingRete( AndConditions([Condition("__ret", "status", True), Condition("__ret", "body", "value")]))) network.add_obj("__ret", ret) assert network.working_memory == { WME("f-00000", "status", True), WME("f-00000", "body", "value"), } def test_adding_obj_when_requested_attribute_is_not_found(self): """ There is no error when an attribute does not exits """ network = ReteNetwork() ret = ReturnValueConcept("test", True, "value") network.add_rule(RuleForTestingRete(AndConditions([Condition("__ret", "not found", True), Condition("__ret", "body", "value")]))) network.add_obj("__ret", ret) assert network.working_memory == { WME("f-00000", "body", "value"), } def test_adding_object_when_id_is_not_defined_but_attribute_is_known(self): network = ReteNetwork() rule = RuleForTestingRete(AndConditions([ Condition(V("1"), "status", True), ])) network.add_rule(rule) ret1 = ReturnValueConcept("test", True, "first one") ret2 = ReturnValueConcept("test", False, "to discard") network.add_obj("__ret", ret1) network.add_obj("__ret", ret2) assert network.working_memory == { WME("f-00000", "status", True), WME("f-00001", "status", False), } def test_adding_obj_full_test_obj_is_a_concept(self): """ When the value is a concept, we create sub objects, to ease the requests """ sheerka, context, boy, the, greetings = self.init_concepts( "boy", Concept("the x").def_var("x"), Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a") ) network = ReteNetwork() the_boy = sheerka.new(the, x=boy) hello_the_boy = sheerka.new(greetings, a=the_boy) # original condition may look like # __ret.status # and __ret.body == $body # and isinstance($body, Concept) # and $body.name == "greetings" # and $body.get_value("a") == $the # and isinstance($the, Concept) # and $the.id == 1002 # and $the.get_value("x") = $boy # and instance($boy, Concept) # and $boy.name = "boy" ret = ReturnValueConcept("test", True, hello_the_boy) rule = RuleForTestingRete(AndConditions([Condition(V("ret"), "__name__", "__ret"), Condition(V("ret"), "status", True), Condition(V("ret"), "body", V("body")), Condition(V("body"), "name", "greetings"), Condition(V("body"), "a", V("the")), Condition(V("the"), "id", '1002'), Condition(V("the"), "x", V("boy")), Condition(V("boy"), "name", "boy"), ])) network.add_rule(rule) network.add_obj("__ret", ret) assert network.working_memory == { WME("f-00000", "__name__", "__ret"), WME("f-00000", "status", True), WME("f-00000", "body", "f-00000.body"), WME("f-00000.body", "name", "greetings"), WME("f-00000.body", "a", "f-00000.body.a"), WME("f-00000.body.a", "id", "1002"), WME("f-00000.body.a", "x", "f-00000.body.a.x"), WME("f-00000.body.a.x", "name", "boy"), } # sanity check that the WME produced match the condition matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule] assert network.facts == {'f-00000': ret} def test_i_can_add_obj_and_match_obj_when_value_is_sheerka(self): sheerka, context, greetings = self.init_concepts( Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a") ) network = ReteNetwork() rule = RuleForTestingRete(AndConditions([Condition(V("ret"), "__name__", "__ret"), Condition(V("ret"), "body", V("body")), Condition(V("body"), "name", "greetings"), Condition(V("body"), "a", "__sheerka__"), ])) network.add_rule(rule) hello_concept = sheerka.new(greetings, a=sheerka) ret = ReturnValueConcept("test", True, hello_concept) network.add_obj("__ret", ret) assert network.working_memory == { WME("f-00000", "__name__", "__ret"), WME("f-00000", "body", "f-00000.body"), WME("f-00000.body", "name", "greetings"), WME("f-00000.body", "a", "__sheerka__"), } # sanity check that the WME produced match the condition matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule] assert network.facts == {'f-00000': ret} def test_i_can_add_primitive(self): network = ReteNetwork() rule = RuleForTestingRete(AndConditions([Condition(V("a"), FACT_NAME, "a"), Condition(V("a"), FACT_SELF, 10), ])) network.add_rule(rule) network.add_obj("a", 10) assert network.working_memory == { WME("f-00000", "__name__", "a"), WME("f-00000", "__self__", 10), } # sanity check that the WME produced match the condition matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule] assert network.facts == {'f-00000': FactObj(10)} def test_i_can_distinguish_objects_with_different_value(self): network = ReteNetwork() rule = RuleForTestingRete(AndConditions([ Condition(V("1"), "status", True), Condition(V("1"), "value", "first one"), Condition(V("2"), "status", True), Condition(V("2"), "value", "second one") ])) network.add_rule(rule) ret1 = ReturnValueConcept("test", True, "first one") ret2 = ReturnValueConcept("test", False, "to discard") ret3 = ReturnValueConcept("test", True, "second one") network.add_obj("__ret", ret1) network.add_obj("__ret", ret2) network.add_obj("__ret", ret3) assert network.working_memory == { WME("f-00000", "status", True), WME("f-00000", "value", "first one"), WME("f-00001", "status", False), WME("f-00001", "value", "to discard"), WME("f-00002", "status", True), WME("f-00002", "value", "second one"), } # Check that the rule is fired matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule] @pytest.mark.parametrize("conditions, expected", [ ([Condition("id", "some_weird_attribute", "value")], {'id': ['some_weird_attribute']}), ([Condition("id", "attr1", V('x')), Condition("id", "attr2", "*")], {'id': ['attr1', 'attr2']}), ([Condition(V('x'), "attr", "value")], {}), ([Condition("id", V('x'), "value")], {'id': ["*"]}), ]) def test_attributes_by_identifier_are_updated_upon_condition_creation(self, conditions, expected): network = ReteNetwork() rule = RuleForTestingRete(AndConditions(conditions)) network.add_rule(rule) assert network.attributes_by_id == expected def test_i_cannot_add_the_same_object_twice(self): network = ReteNetwork() rule = RuleForTestingRete(AndConditions([Condition("__obj", "attr", "value")])) network.add_rule(rule) foo = Concept("foo") network.add_obj("__obj", foo) with pytest.raises(ValueError): network.add_obj("__obj", foo) def test_i_can_remove_wme(self): network = ReteNetwork() cond1 = Condition("id", "attr", "value") wme = WME("id", "attr", "value") rule = RuleForTestingRete(AndConditions([cond1])) network.add_rule(rule) network.add_wme(wme) assert len(list(network.matches)) == 1 network.remove_wme(wme) assert len(list(network.matches)) == 0 def test_i_can_remove_wme_when_multiple_p_nodes(self): network = ReteNetwork() cond1 = Condition("id", "attr", V("x")) cond2 = Condition("id", "attr", V("y")) rule1 = RuleForTestingRete(AndConditions([cond1])) rule2 = RuleForTestingRete(AndConditions([cond2])) network.add_rule(rule1) network.add_rule(rule2) assert len(network.pnodes) == 2 wme = WME("id", "attr", "value") network.add_wme(wme) assert len(list(network.matches)) == 2 network.remove_wme(wme) assert len(list(network.matches)) == 0 def test_i_can_remove_and_add_again_a_wme(self): network = ReteNetwork() cond1 = Condition("id", "attr", "value") wme = WME("id", "attr", "value") rule = RuleForTestingRete(AndConditions([cond1])) network.add_rule(rule) network.add_wme(wme) network.remove_wme(wme) assert len(list(network.matches)) == 0 network.add_wme(wme) assert len(list(network.matches)) == 1 def test_i_can_remove_a_fact(self): sheerka, context, boy, the, greetings = self.init_concepts( "boy", Concept("the x").def_var("x"), Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a") ) network = ReteNetwork() the_boy = sheerka.new(the, x=boy) hello_the_boy = sheerka.new(greetings, a=the_boy) # to allow all attributes for '__ret' rule = RuleForTestingRete(AndConditions([Condition(V("ret"), "__name__", "__ret"), Condition(V("ret"), V("x"), V("y"))])) network.add_rule(rule) ret = ReturnValueConcept("test", True, hello_the_boy) network.add_obj("__ret", ret) # sanity check assert len(network.working_memory) > 0 assert len(list(network.matches)) > 0 # remove the object network.remove_obj(ret) assert len(network.working_memory) == 0 assert len(list(network.matches)) == 0 def test_i_cannot_remove_an_object_with_no_fact_id(self): network = ReteNetwork() with pytest.raises(ValueError): network.remove_obj(object()) def test_i_cannot_remove_an_object_that_is_not_in_the_system(self): network = ReteNetwork() obj = ReturnValueConcept("who", True, "value") setattr(obj, FACT_ID, "xxx") with pytest.raises(ValueError): network.remove_obj(obj) def test_i_can_find_stack_of_two_blocks_to_the_left_of_the_red_block(self): network = ReteNetwork() c1 = Condition(V("x"), "on", V("y")) c2 = Condition(V("y"), "left-of", V("z")) c3 = Condition(V("z"), "color", "red") rule = RuleForTestingRete(AndConditions([c1, c2, c3])) network.add_rule(rule) wme1 = WME("B1", "on", "B2") wme2 = WME("B1", "on", "B3") wme3 = WME("B1", "color", "red") wme4 = WME("B2", "on", "table") wme5 = WME("B2", "left-of", "B3") wme6 = WME("B2", "color", "blue") wme7 = WME("B3", "left-of", "B4") wme8 = WME("B3", "on", "table") wme9 = WME("B3", "color", "red") network.add_wme(wme1) network.add_wme(wme2) network.add_wme(wme3) network.add_wme(wme4) network.add_wme(wme5) network.add_wme(wme6) network.add_wme(wme7) network.add_wme(wme8) network.add_wme(wme9) matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule] assert matches[0].token.wmes == [wme1, wme5, wme9] am1 = network.build_or_share_alpha_memory(c1) am2 = network.build_or_share_alpha_memory(c2) am3 = network.build_or_share_alpha_memory(c3) assert am1.items == [wme1, wme2, wme4, wme8] assert am2.items == [wme5, wme7] assert am3.items == [wme3, wme9] dummy_join = am1.successors[0] join_on_value_y = am2.successors[0] join_on_value_z = am3.successors[0] match_c0 = dummy_join.children[0] match_c0c1 = join_on_value_y.children[0] match_c0c1c2 = join_on_value_z.children[0] assert [t.wmes for t in match_c0.items] == [[wme1], [wme2], [wme4], [wme8]] assert [t.wmes for t in match_c0c1.items] == [[wme1, wme5], [wme2, wme7]] assert [t.wmes for t in match_c0c1c2.items] == [[wme1, wme5, wme9]] t0 = ReteToken(ReteToken(None, None), wme1) t1 = ReteToken(t0, wme5) t2 = ReteToken(t1, wme9) assert match_c0c1c2.items[0] == t2 def test_i_can_find_stack_of_two_blocks_to_the_left_of_one_that_is_not_red(self): network = ReteNetwork() c1 = Condition(V("x"), "on", V("y")) c2 = Condition(V("y"), "left-of", V("z")) c3 = NegatedCondition(V("z"), "color", "red") rule = RuleForTestingRete(AndConditions([c1, c2, c3])) network.add_rule(rule) wme1 = WME("B1", "on", "B2") wme2 = WME("B1", "on", "B3") wme3 = WME("B1", "color", "red") wme4 = WME("B2", "on", "table") wme5 = WME("B2", "left-of", "B3") wme6 = WME("B2", "color", "blue") wme7 = WME("B3", "left-of", "B4") wme8 = WME("B3", "on", "table") wme9 = WME("B3", "color", "red") network.add_wme(wme1) network.add_wme(wme2) network.add_wme(wme3) network.add_wme(wme4) network.add_wme(wme5) network.add_wme(wme6) network.add_wme(wme7) network.add_wme(wme8) network.add_wme(wme9) matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule] assert matches[0].token.wmes == [wme2, wme7, None] am1 = network.build_or_share_alpha_memory(c1) am2 = network.build_or_share_alpha_memory(c2) am3 = network.build_or_share_alpha_memory(c3) assert am1.items == [wme1, wme2, wme4, wme8] assert am2.items == [wme5, wme7] assert am3.items == [wme3, wme9] # same than positif condition dummy_join = am1.successors[0] join_on_value_y = am2.successors[0] join_on_value_z = am3.successors[0] match_c0 = dummy_join.children[0] match_c0c1 = join_on_value_y.children[0] match_c0c1c2 = join_on_value_z.children[0] assert [t.wmes for t in match_c0.items] == [[wme1], [wme2], [wme4], [wme8]] assert [t.wmes for t in match_c0c1.items] == [[wme1, wme5], [wme2, wme7]] assert type(join_on_value_z) == NegativeNode assert [t.wmes for t in match_c0c1c2.items] == [[wme2, wme7, None]] def test_i_can_remove_a_rule(self): network = ReteNetwork() rule1 = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")])) network.add_rule(rule1) network.remove_rule(rule1) assert len(network.pnodes) == 0 assert len(network.alpha_hash) == 0 assert len(network.rules) == 0 # assert len(network.attributes_by_id) == 0 # assert len(network.default_attributes) == 0 def test_i_can_remove_a_rule_when_it_shares_p_node(self): network = ReteNetwork() rule1 = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]), name="rule1") rule2 = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]), name="rule2") network.add_rule(rule1) network.add_rule(rule2) network.remove_rule(rule1) assert len(network.pnodes) == 1 assert network.pnodes[0].rules == [rule2] assert network.rules == {rule2} # sanity wme = WME("id", "attr", "value") network.add_wme(wme) matches = list(network.matches) assert len(matches) == 1 assert matches[0].pnode.rules == [rule2] assert matches[0].token.wmes == [wme] @pytest.mark.skip("Need a better attribute management") def test_i_can_manage_attributes_by_key_on_rule_removal(self): network = ReteNetwork() rule1 = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")])) rule2 = RuleForTestingRete(AndConditions([Condition("id", "attr", V("x"))])) rule3 = RuleForTestingRete(AndConditions([Condition("id", "attr2", V("x"))])) network.add_rule(rule1) network.add_rule(rule2) network.add_rule(rule3) # sanity assert network.attributes_by_id == {} network.add_rule(rule1) assert network.attributes_by_id == {} network.add_rule(rule3) assert network.attributes_by_id == {} network.add_rule(rule2) assert network.attributes_by_id == {} @pytest.mark.skip("Need a better attribute management") def test_i_can_manage_default_attributes_on_rule_removal(self): network = ReteNetwork() rule1 = RuleForTestingRete(AndConditions([Condition("id", V("x"), "value")])) rule2 = RuleForTestingRete(AndConditions([Condition("id", V("y"), V("z"))])) rule3 = RuleForTestingRete(AndConditions([Condition("id2", V("x"), V("y"))])) network.add_rule(rule1) network.add_rule(rule2) network.add_rule(rule3) # sanity assert network.default_attributes == set() network.add_rule(rule1) assert network.default_attributes == set() network.add_rule(rule3) assert network.default_attributes == set() network.add_rule(rule2) assert network.default_attributes == set() def test_i_can_add_an_empty_rule(self): network = ReteNetwork() rule = RuleForTestingRete(AndConditions([])) network.add_rule(rule) assert len(network.beta_root.children) == 1 network.remove_rule(rule) assert len(network.beta_root.children) == 0 def test_i_can_match_when_duplicate_conditions(self): network = ReteNetwork() c1 = Condition(V('x'), 'self', V('y')) c2 = Condition(V('x'), 'color', 'red') c3 = Condition(V('x'), 'color', 'red') rule = RuleForTestingRete(AndConditions([c1, c2, c3])) network.add_rule(rule) wme1 = WME('B1', 'self', 'B1') wme2 = WME('B1', 'color', 'red') network.add_wme(wme1) network.add_wme(wme2) assert len(list(network.matches)) == 1 def test_i_can_match_when_duplicate_conditions2(self): network = ReteNetwork() c1 = Condition(V('x'), 'self', V('y')) c2 = Condition(V('x'), 'color', 'red') c3 = Condition(V('y'), 'color', 'red') rule = RuleForTestingRete(AndConditions([c1, c2, c3])) network.add_rule(rule) wme1 = WME('B1', 'self', 'B1') wme2 = WME('B1', 'color', 'red') network.add_wme(wme1) network.add_wme(wme2) assert len(list(network.matches)) == 1 def test_multiple_rules(self): network = ReteNetwork() c1 = Condition(V('x'), 'on', V('y')) c2 = Condition(V('y'), 'left-of', V('z')) c3 = Condition(V('z'), 'color', 'red') c4 = Condition(V('z'), 'on', 'table') c5 = Condition(V('z'), 'left-of', 'B4') rule1 = RuleForTestingRete(AndConditions([c1, c2, c3])) rule2 = RuleForTestingRete(AndConditions([c1, c2, c4, c5])) network.add_rule(rule1) network.add_rule(rule2) wmes = [ WME('B1', 'on', 'B2'), WME('B1', 'on', 'B3'), WME('B1', 'color', 'red'), WME('B2', 'on', 'table'), WME('B2', 'left-of', 'B3'), WME('B2', 'color', 'blue'), WME('B3', 'left-of', 'B4'), WME('B3', 'on', 'table'), WME('B3', 'color', 'red'), ] for wme in wmes: network.add_wme(wme) # add rule on the fly rule3 = RuleForTestingRete(AndConditions([c1, c2, c4, c3])) network.add_rule(rule3) p_node1 = rule1.rete_p_nodes[0] p_node2 = rule2.rete_p_nodes[0] p_node3 = rule3.rete_p_nodes[0] assert len(list(p_node1.activations)) == 1 assert len(list(p_node2.activations)) == 1 assert len(list(p_node3.activations)) == 1 assert list(p_node1.activations)[0].wmes == [wmes[0], wmes[4], wmes[8]] assert list(p_node2.activations)[0].wmes == [wmes[0], wmes[4], wmes[7], wmes[6]] assert list(p_node3.activations)[0].wmes == [wmes[0], wmes[4], wmes[7], wmes[8]] network.remove_rule(rule3) assert len(list(p_node3.activations)) == 0 def test_exec_rule_is_added_to_rete_network_when_it_is_created(self): sheerka, context, rule = self.init_test().with_exec_rules(("rule_name", "id.attr == 'value'", 'True')).unpack() evaluation_service = sheerka.services[SheerkaEvaluateRules.NAME] rete_network = evaluation_service.network assert len(rete_network.pnodes) == 1 assert rete_network.pnodes[0].rules[0] == rule assert rule.rete_net == rete_network assert len(rule.rete_p_nodes) > 0 def test_format_rule_is_not_added_to_rete_network_when_it_is_created(self): sheerka, context, rule = self.init_test().with_format_rules( ("rule_name", "id.attr == 'value'", 'True')).unpack() evaluation_service = sheerka.services[SheerkaEvaluateRules.NAME] rete_network = evaluation_service.network assert len(rete_network.pnodes) == 0 assert rule.rete_net is None assert len(rule.rete_p_nodes) == 0 def test_rule_is_removed_from_rete_network_when_it_is_deleted(self): sheerka, context, rule = self.init_test().with_exec_rules(("id.attr == 'value'", 'True')).unpack() evaluation_service = sheerka.services[SheerkaEvaluateRules.NAME] rete_network = evaluation_service.network sheerka.remove_rule(context, rule) assert len(rete_network.pnodes) == 0 def test_rule_is_removed_from_rete_network_when_it_is_deleted_from_another_ontology(self): sheerka, context, rule = self.init_test().with_exec_rules(("id.attr == 'value'", 'True')).unpack() evaluation_service = sheerka.services[SheerkaEvaluateRules.NAME] rete_network = evaluation_service.network sheerka.push_ontology(context, "new ontology") sheerka.remove_rule(context, rule) assert len(rete_network.pnodes) == 0 @pytest.mark.skip def test_rules_are_removed_upon_ontology_removal(self): sheerka, context = self.init_test().unpack() service = sheerka.services[SheerkaRuleManager.NAME] evaluation_service = sheerka.services[SheerkaEvaluateRules.NAME] rete_network = evaluation_service.network sheerka.push_ontology(context, "Testing Rule removal 1") rule = Rule(ACTION_TYPE_EXEC, "name", "id.attr = 'value'", "True") rule = service.init_rule(context, rule) service.create_new_rule(context, rule) assert len(rete_network.pnodes) == 1 assert rete_network.pnodes[0].rules[0] == rule sheerka.push_ontology(context, "Testing Rule removal 2") rule2 = Rule(ACTION_TYPE_EXEC, "name2", "id2.attr2 = 'value2'", "True") rule2 = service.init_rule(context, rule2) service.create_new_rule(context, rule2) assert len(rete_network.pnodes) == 2 assert rete_network.pnodes[0].rules[0] == rule assert rete_network.pnodes[1].rules[0] == rule2 sheerka.pop_ontology(context) assert len(rete_network.pnodes) == 1 assert rete_network.pnodes[0].rules[0] == rule sheerka.pop_ontology(context) assert len(rete_network.pnodes) == 0