Files
Sheerka-Old/tests/sheerkarete/test_network.py
T
2021-03-18 11:49:48 +01:00

740 lines
28 KiB
Python

import pytest
from core.builtin_concepts import ReturnValueConcept
from core.concept import Concept, DEFINITION_TYPE_DEF
from core.rule import Rule, ACTION_TYPE_EXEC
from core.sheerka.services.SheerkaEvaluateRules import SheerkaEvaluateRules
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager
from sheerkarete.beta import BetaMemory
from sheerkarete.common import V, WME, ReteToken
from sheerkarete.conditions import Condition, NegatedCondition, AndConditions
from sheerkarete.join_node import JoinNode
from sheerkarete.negative_node import NegativeNode
from sheerkarete.network import ReteNetwork, FACT_ID
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
from tests.sheerkarete.RuleForTestingRete import RuleForTestingRete
# Tests for sheerkarete's ReteNetwork: alpha memory creation, adding/removing rules,
# adding/removing objects and WMEs, and the resulting matches.
class TestReteNetwork(TestUsingMemoryBasedSheerka):
@pytest.mark.parametrize("condition, expected_key", [
    (Condition("id", "attr", "value"), ("id", "attr", "value")),
    (Condition(V("x"), "attr", "value"), ("*", "attr", "value")),
    (Condition("id", V("x"), "value"), ("id", "*", "value")),
    (Condition("id", "attr", V("x")), ("id", "attr", "*")),
    (Condition(V("x"), V("x"), V("x")), ("*", "*", "*")),
])
def test_i_can_create_alpha_memory(self, condition, expected_key):
    """
    The alpha memory key mirrors the condition, with every variable shown as "*"
    """
    rete = ReteNetwork()

    alpha_memory = rete.build_or_share_alpha_memory(condition)

    # the memory carries the expected key and is registered under it
    assert alpha_memory.key == expected_key
    assert alpha_memory.key in rete.alpha_hash
def test_same_amem_is_reused(self):
    """
    Asking twice for the alpha memory of one condition keeps a single entry
    """
    rete = ReteNetwork()
    cond = Condition("id", "attr", "value")

    first = rete.build_or_share_alpha_memory(cond)
    second = rete.build_or_share_alpha_memory(cond)

    assert first == second
    assert len(rete.alpha_hash) == 1
def test_i_can_initialize_a_simple_network(self):
    """
    A single-condition rule yields one p-node whose parent is a JoinNode,
    itself attached to a BetaMemory and fed by the condition's alpha memory
    """
    rete = ReteNetwork()
    cond = Condition("id", "attr", "value")
    rete.add_rule(RuleForTestingRete(AndConditions([cond])))

    # alpha side
    assert cond.get_key() in rete.alpha_hash

    # beta side
    assert len(rete.pnodes) == 1
    join = rete.pnodes[0].parent
    assert isinstance(join, JoinNode)
    assert isinstance(join.parent, BetaMemory)
    assert join.amem == rete.alpha_hash[cond.get_key()][0]
def test_i_can_add_multiple_rules_with_the_same_conditions(self):
    """
    Rules built from the same condition end up on one shared p-node
    """
    rete = ReteNetwork()
    shared_condition = Condition("id", "attr", "value")
    first_rule = RuleForTestingRete(AndConditions([shared_condition]))
    second_rule = RuleForTestingRete(AndConditions([shared_condition]))

    for rule in (first_rule, second_rule):
        rete.add_rule(rule)

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules == [first_rule, second_rule]
def test_i_can_update_conditions_attributes_by_id_when_constraints(self):
    """
    Attributes requested through chained variables are recorded in
    attributes_by_id under dotted paths rooted at the "__name__" value
    """
    rete = ReteNetwork()
    triples = [
        (V("x"), "__name__", "fact_name"),
        (V("x"), "attr1", "value1"),
        (V("x"), "body", V("y")),
        (V("y"), "__is_concept__", True),
        (V("y"), "name", "SubConcept"),
        (V("x"), "value", V("z")),
        (V("z"), "status", False),
        (V("z"), "body", V("zz")),
        (V("zz"), "sub_value", "sub_value"),
    ]
    all_conditions = [Condition(*triple) for triple in triples]

    rete.add_rule(RuleForTestingRete(AndConditions(all_conditions)))

    expected = {
        "fact_name": ["__name__", "attr1", "body", "value"],
        "fact_name.body": ["__is_concept__", "name"],
        "fact_name.value": ["status", "body"],
        "fact_name.value.body": ["sub_value"],
    }
    assert rete.attributes_by_id == expected
def test_adding_obj_when_no_rule_has_no_effect(self):
    """
    Without any rule, adding an object leaves the working memory empty
    """
    rete = ReteNetwork()

    rete.add_obj("__ret", ReturnValueConcept("test", True, "value"))

    assert rete.working_memory == set()
def test_adding_obj_when_rule_with_no_attribute_constraint(self):
    """
    A rule that does not constrain the attribute forces every known property
    of the object into the working memory
    """
    rete = ReteNetwork()
    ret = ReturnValueConcept("test", True, "value")
    # test only, this rule cannot be matched
    wildcard_rule = RuleForTestingRete(AndConditions([Condition("__ret", "*", True)]))
    rete.add_rule(wildcard_rule)

    rete.add_obj("__ret", ret)

    expected_properties = [
        ("status", True),
        ("value", "value"),
        ("body", "value"),
        ("parents", None),
        ("id", None),
        ("name", "__RETURN_VALUE"),
        ("key", "__RETURN_VALUE"),
        ("self", ret),
        ("who", "test"),
    ]
    assert rete.working_memory == {WME("f-00000", attr, value) for attr, value in expected_properties}
def test_adding_obj_when_rule_with_attributes_constraints(self):
    """
    Only the attributes named by the rule conditions become WMEs
    """
    rete = ReteNetwork()
    ret = ReturnValueConcept("test", True, "value")
    constraints = AndConditions([Condition("__ret", "status", True), Condition("__ret", "body", "value")])
    rete.add_rule(RuleForTestingRete(constraints))

    rete.add_obj("__ret", ret)

    expected = {
        WME("f-00000", "status", True),
        WME("f-00000", "body", "value"),
    }
    assert rete.working_memory == expected
def test_adding_obj_when_requested_attribute_is_not_found(self):
    """
    A requested attribute that does not exist on the object is silently ignored
    """
    rete = ReteNetwork()
    ret = ReturnValueConcept("test", True, "value")
    constraints = AndConditions([Condition("__ret", "not found", True),
                                 Condition("__ret", "body", "value")])
    rete.add_rule(RuleForTestingRete(constraints))

    rete.add_obj("__ret", ret)

    # only the attribute that exists produces a WME
    assert rete.working_memory == {WME("f-00000", "body", "value")}
def test_adding_object_when_id_is_not_defined_but_attribute_is_known(self):
    """
    When the identifier is a variable, the constrained attribute is extracted
    from every added object, whatever its value
    """
    rete = ReteNetwork()
    rete.add_rule(RuleForTestingRete(AndConditions([
        Condition(V("1"), "status", True),
    ])))

    kept = ReturnValueConcept("test", True, "first one")
    discarded = ReturnValueConcept("test", False, "to discard")
    for obj in (kept, discarded):
        rete.add_obj("__ret", obj)

    expected = {
        WME("f-00000", "status", True),
        WME("f-00001", "status", False),
    }
    assert rete.working_memory == expected
def test_adding_obj_full_test_obj_is_a_concept(self):
    """
    Nested concept values are flattened into sub objects with dotted fact ids,
    so that conditions can follow them
    """
    sheerka, _context, boy, the, greetings = self.init_concepts(
        "boy",
        Concept("the x").def_var("x"),
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a")
    )
    rete = ReteNetwork()
    the_boy = sheerka.new(the, x=boy)
    hello_the_boy = sheerka.new(greetings, a=the_boy)

    # the original condition may look like
    #   __ret.status
    #   and __ret.body == $body
    #   and isinstance($body, Concept)
    #   and $body.name == "greetings"
    #   and $body.get_value("a") == $the
    #   and isinstance($the, Concept)
    #   and $the.id == 1002
    #   and $the.get_value("x") = $boy
    #   and isinstance($boy, Concept)
    #   and $boy.name = "boy"
    ret = ReturnValueConcept("test", True, hello_the_boy)
    conditions = [
        Condition(V("ret"), "__name__", "__ret"),
        Condition(V("ret"), "status", True),
        Condition(V("ret"), "body", V("body")),
        Condition(V("body"), "name", "greetings"),
        Condition(V("body"), "a", V("the")),
        Condition(V("the"), "id", '1002'),
        Condition(V("the"), "x", V("boy")),
        Condition(V("boy"), "name", "boy"),
    ]
    rule = RuleForTestingRete(AndConditions(conditions))
    rete.add_rule(rule)

    rete.add_obj("__ret", ret)

    expected_triples = [
        ("f-00000", "__name__", "__ret"),
        ("f-00000", "status", True),
        ("f-00000", "body", "f-00000.body"),
        ("f-00000.body", "name", "greetings"),
        ("f-00000.body", "a", "f-00000.body.a"),
        ("f-00000.body.a", "id", "1002"),
        ("f-00000.body.a", "x", "f-00000.body.a.x"),
        ("f-00000.body.a.x", "name", "boy"),
    ]
    assert rete.working_memory == {WME(*triple) for triple in expected_triples}

    # sanity check: the produced WMEs do satisfy the rule
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]

    assert rete.facts == {'f-00000': ret}
def test_i_can_add_obj_and_match_obj_when_value_is_sheerka(self):
    """
    A variable holding sheerka itself appears as "__sheerka__" in the WMEs
    and can be matched with that literal
    """
    sheerka, _context, greetings = self.init_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a")
    )
    rete = ReteNetwork()
    conditions = [
        Condition(V("ret"), "__name__", "__ret"),
        Condition(V("ret"), "body", V("body")),
        Condition(V("body"), "name", "greetings"),
        Condition(V("body"), "a", "__sheerka__"),
    ]
    rule = RuleForTestingRete(AndConditions(conditions))
    rete.add_rule(rule)

    hello_concept = sheerka.new(greetings, a=sheerka)
    ret = ReturnValueConcept("test", True, hello_concept)
    rete.add_obj("__ret", ret)

    expected_triples = [
        ("f-00000", "__name__", "__ret"),
        ("f-00000", "body", "f-00000.body"),
        ("f-00000.body", "name", "greetings"),
        ("f-00000.body", "a", "__sheerka__"),
    ]
    assert rete.working_memory == {WME(*triple) for triple in expected_triples}

    # sanity check: the produced WMEs do satisfy the rule
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]

    assert rete.facts == {'f-00000': ret}
def test_i_can_distinguish_objects_with_different_value(self):
    """
    Two variables constrained on different values bind to two different objects
    """
    rete = ReteNetwork()
    rule = RuleForTestingRete(AndConditions([
        Condition(V("1"), "status", True),
        Condition(V("1"), "value", "first one"),
        Condition(V("2"), "status", True),
        Condition(V("2"), "value", "second one")
    ]))
    rete.add_rule(rule)

    returns = [
        ReturnValueConcept("test", True, "first one"),
        ReturnValueConcept("test", False, "to discard"),
        ReturnValueConcept("test", True, "second one"),
    ]
    for ret in returns:
        rete.add_obj("__ret", ret)

    expected_triples = [
        ("f-00000", "status", True),
        ("f-00000", "value", "first one"),
        ("f-00001", "status", False),
        ("f-00001", "value", "to discard"),
        ("f-00002", "status", True),
        ("f-00002", "value", "second one"),
    ]
    assert rete.working_memory == {WME(*triple) for triple in expected_triples}

    # the rule must have fired exactly once
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]
@pytest.mark.parametrize("conditions, expected", [
    ([Condition("id", "some_weird_attribute", "value")], {'id': ['some_weird_attribute']}),
    ([Condition("id", "attr1", V('x')), Condition("id", "attr2", "*")], {'id': ['attr1', 'attr2']}),
    ([Condition(V('x'), "attr", "value")], {}),
    ([Condition("id", V('x'), "value")], {'id': ["*"]}),
])
def test_attributes_by_identifier_are_updated_upon_condition_creation(self, conditions, expected):
    """
    Adding a rule fills attributes_by_id from its conditions
    """
    rete = ReteNetwork()

    rete.add_rule(RuleForTestingRete(AndConditions(conditions)))

    assert rete.attributes_by_id == expected
def test_i_cannot_add_the_same_object_twice(self):
    """
    Adding an object that is already in the network raises ValueError
    """
    rete = ReteNetwork()
    rete.add_rule(RuleForTestingRete(AndConditions([Condition("__obj", "attr", "value")])))
    concept = Concept("foo")
    rete.add_obj("__obj", concept)

    with pytest.raises(ValueError):
        rete.add_obj("__obj", concept)
def test_i_can_remove_wme(self):
    """
    Removing the WME that satisfied a rule also removes the match
    """
    rete = ReteNetwork()
    condition = Condition("id", "attr", "value")
    fact = WME("id", "attr", "value")
    rete.add_rule(RuleForTestingRete(AndConditions([condition])))

    rete.add_wme(fact)
    assert len(list(rete.matches)) == 1

    rete.remove_wme(fact)
    assert len(list(rete.matches)) == 0
def test_i_can_remove_wme_when_multiple_p_nodes(self):
    """
    Removing a WME clears the matches of every p-node it contributed to
    """
    rete = ReteNetwork()
    rule_on_x = RuleForTestingRete(AndConditions([Condition("id", "attr", V("x"))]))
    rule_on_y = RuleForTestingRete(AndConditions([Condition("id", "attr", V("y"))]))
    for rule in (rule_on_x, rule_on_y):
        rete.add_rule(rule)
    assert len(rete.pnodes) == 2

    fact = WME("id", "attr", "value")
    rete.add_wme(fact)
    assert len(list(rete.matches)) == 2

    rete.remove_wme(fact)
    assert len(list(rete.matches)) == 0
def test_i_can_remove_and_add_again_a_wme(self):
    """
    A removed WME can be added back and matches again
    """
    rete = ReteNetwork()
    condition = Condition("id", "attr", "value")
    fact = WME("id", "attr", "value")
    rete.add_rule(RuleForTestingRete(AndConditions([condition])))

    rete.add_wme(fact)
    rete.remove_wme(fact)
    assert len(list(rete.matches)) == 0

    rete.add_wme(fact)
    assert len(list(rete.matches)) == 1
def test_i_can_remove_a_fact(self):
    """
    Removing an object empties the working memory and the matches it produced
    """
    sheerka, _context, boy, the, greetings = self.init_concepts(
        "boy",
        Concept("the x").def_var("x"),
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a")
    )
    rete = ReteNetwork()
    the_boy = sheerka.new(the, x=boy)
    hello_the_boy = sheerka.new(greetings, a=the_boy)

    # to allow all attributes for '__ret'
    catch_all = AndConditions([Condition(V("ret"), "__name__", "__ret"),
                               Condition(V("ret"), V("x"), V("y"))])
    rete.add_rule(RuleForTestingRete(catch_all))

    ret = ReturnValueConcept("test", True, hello_the_boy)
    rete.add_obj("__ret", ret)

    # sanity check
    assert len(rete.working_memory) > 0
    assert len(list(rete.matches)) > 0

    # remove the object
    rete.remove_obj(ret)
    assert len(rete.working_memory) == 0
    assert len(list(rete.matches)) == 0
def test_i_cannot_remove_an_object_with_no_fact_id(self):
    """
    Removing a plain object (one that was never added) raises ValueError
    """
    rete = ReteNetwork()
    unknown = object()

    with pytest.raises(ValueError):
        rete.remove_obj(unknown)
def test_i_cannot_remove_an_object_that_is_not_in_the_system(self):
    """
    An object carrying a fact id unknown to the network cannot be removed
    """
    rete = ReteNetwork()
    stranger = ReturnValueConcept("who", True, "value")
    setattr(stranger, FACT_ID, "xxx")  # fact id set by hand, never registered

    with pytest.raises(ValueError):
        rete.remove_obj(stranger)
def test_i_can_find_stack_of_two_blocks_to_the_left_of_the_red_block(self):
    """
    Blocks-world scenario: find x on y, y left-of z, z red, and check the
    content of every alpha memory and intermediate token memory
    """
    rete = ReteNetwork()
    c1 = Condition(V("x"), "on", V("y"))
    c2 = Condition(V("y"), "left-of", V("z"))
    c3 = Condition(V("z"), "color", "red")
    rule = RuleForTestingRete(AndConditions([c1, c2, c3]))
    rete.add_rule(rule)

    facts = [
        WME("B1", "on", "B2"),
        WME("B1", "on", "B3"),
        WME("B1", "color", "red"),
        WME("B2", "on", "table"),
        WME("B2", "left-of", "B3"),
        WME("B2", "color", "blue"),
        WME("B3", "left-of", "B4"),
        WME("B3", "on", "table"),
        WME("B3", "color", "red"),
    ]
    wme1, wme2, wme3, wme4, wme5, _, wme7, wme8, wme9 = facts
    for fact in facts:
        rete.add_wme(fact)

    # a single complete match: B1 on B2, B2 left-of B3, B3 red
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]
    assert found[0].token.wmes == [wme1, wme5, wme9]

    # alpha memories
    am1, am2, am3 = (rete.build_or_share_alpha_memory(cond) for cond in (c1, c2, c3))
    assert am1.items == [wme1, wme2, wme4, wme8]
    assert am2.items == [wme5, wme7]
    assert am3.items == [wme3, wme9]

    # first child of the first successor of each alpha memory
    match_c0 = am1.successors[0].children[0]
    match_c0c1 = am2.successors[0].children[0]
    match_c0c1c2 = am3.successors[0].children[0]
    assert [token.wmes for token in match_c0.items] == [[wme1], [wme2], [wme4], [wme8]]
    assert [token.wmes for token in match_c0c1.items] == [[wme1, wme5], [wme2, wme7]]
    assert [token.wmes for token in match_c0c1c2.items] == [[wme1, wme5, wme9]]

    # the final token equals a chain rebuilt by hand from an empty root token
    expected_token = ReteToken(None, None)
    for fact in (wme1, wme5, wme9):
        expected_token = ReteToken(expected_token, fact)
    assert match_c0c1c2.items[0] == expected_token
def test_i_can_find_stack_of_two_blocks_to_the_left_of_one_that_is_not_red(self):
    """
    Blocks-world scenario with a negated condition: find x on y, y left-of z,
    where z is NOT red, and check the alpha memories and token memories
    """
    network = ReteNetwork()
    c1 = Condition(V("x"), "on", V("y"))
    c2 = Condition(V("y"), "left-of", V("z"))
    c3 = NegatedCondition(V("z"), "color", "red")
    rule = RuleForTestingRete(AndConditions([c1, c2, c3]))
    network.add_rule(rule)

    wme1 = WME("B1", "on", "B2")
    wme2 = WME("B1", "on", "B3")
    wme3 = WME("B1", "color", "red")
    wme4 = WME("B2", "on", "table")
    wme5 = WME("B2", "left-of", "B3")
    wme6 = WME("B2", "color", "blue")
    wme7 = WME("B3", "left-of", "B4")
    wme8 = WME("B3", "on", "table")
    wme9 = WME("B3", "color", "red")
    for wme in (wme1, wme2, wme3, wme4, wme5, wme6, wme7, wme8, wme9):
        network.add_wme(wme)

    # the negated condition contributes None to the token's wmes
    matches = list(network.matches)
    assert len(matches) == 1
    assert matches[0].pnode.rules == [rule]
    assert matches[0].token.wmes == [wme2, wme7, None]

    am1 = network.build_or_share_alpha_memory(c1)
    am2 = network.build_or_share_alpha_memory(c2)
    am3 = network.build_or_share_alpha_memory(c3)
    assert am1.items == [wme1, wme2, wme4, wme8]
    assert am2.items == [wme5, wme7]
    assert am3.items == [wme3, wme9]  # same as for the positive condition

    dummy_join = am1.successors[0]
    join_on_value_y = am2.successors[0]
    join_on_value_z = am3.successors[0]
    match_c0 = dummy_join.children[0]
    match_c0c1 = join_on_value_y.children[0]
    match_c0c1c2 = join_on_value_z.children[0]
    assert [t.wmes for t in match_c0.items] == [[wme1], [wme2], [wme4], [wme8]]
    assert [t.wmes for t in match_c0c1.items] == [[wme1, wme5], [wme2, wme7]]

    # isinstance rather than an exact type() comparison
    assert isinstance(join_on_value_z, NegativeNode)
    assert [t.wmes for t in match_c0c1c2.items] == [[wme2, wme7, None]]
def test_i_can_remove_a_rule(self):
    """
    Removing the only rule leaves no p-node, no alpha memory and no rule
    """
    rete = ReteNetwork()
    only_rule = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]))
    rete.add_rule(only_rule)

    rete.remove_rule(only_rule)

    for container in (rete.pnodes, rete.alpha_hash, rete.rules):
        assert len(container) == 0
    # checks left disabled in the original:
    # assert len(network.attributes_by_id) == 0
    # assert len(network.default_attributes) == 0
def test_i_can_remove_a_rule_when_it_shares_p_node(self):
    """
    Removing one of two rules sharing a p-node keeps the p-node for the other
    """
    rete = ReteNetwork()
    removed = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]), name="rule1")
    kept = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]), name="rule2")
    rete.add_rule(removed)
    rete.add_rule(kept)

    rete.remove_rule(removed)

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules == [kept]
    assert rete.rules == {kept}

    # sanity: the remaining rule still matches
    fact = WME("id", "attr", "value")
    rete.add_wme(fact)
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [kept]
    assert found[0].token.wmes == [fact]
@pytest.mark.skip("Need a better attribute management")
def test_i_can_manage_attributes_by_key_on_rule_removal(self):
    """
    Skipped placeholder around attributes_by_id bookkeeping
    """
    # NOTE(review): despite the test name, only add_rule is called and every
    # expectation is an empty dict - presumably remove_rule and real values are
    # intended; confirm before un-skipping
    rete = ReteNetwork()
    rule1 = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]))
    rule2 = RuleForTestingRete(AndConditions([Condition("id", "attr", V("x"))]))
    rule3 = RuleForTestingRete(AndConditions([Condition("id", "attr2", V("x"))]))
    for rule in (rule1, rule2, rule3):
        rete.add_rule(rule)

    # sanity
    assert rete.attributes_by_id == {}

    for rule in (rule1, rule3, rule2):
        rete.add_rule(rule)
        assert rete.attributes_by_id == {}
@pytest.mark.skip("Need a better attribute management")
def test_i_can_manage_default_attributes_on_rule_removal(self):
    """
    Skipped placeholder around default_attributes bookkeeping
    """
    # NOTE(review): despite the test name, only add_rule is called and every
    # expectation is an empty set - presumably remove_rule and real values are
    # intended; confirm before un-skipping
    rete = ReteNetwork()
    rule1 = RuleForTestingRete(AndConditions([Condition("id", V("x"), "value")]))
    rule2 = RuleForTestingRete(AndConditions([Condition("id", V("y"), V("z"))]))
    rule3 = RuleForTestingRete(AndConditions([Condition("id2", V("x"), V("y"))]))
    for rule in (rule1, rule2, rule3):
        rete.add_rule(rule)

    # sanity
    assert rete.default_attributes == set()

    for rule in (rule1, rule3, rule2):
        rete.add_rule(rule)
        assert rete.default_attributes == set()
def test_i_can_add_an_empty_rule(self):
    """
    A rule without condition hangs one child under the beta root, removed with it
    """
    rete = ReteNetwork()
    empty_rule = RuleForTestingRete(AndConditions([]))

    rete.add_rule(empty_rule)
    assert len(rete.beta_root.children) == 1

    rete.remove_rule(empty_rule)
    assert len(rete.beta_root.children) == 0
def test_i_can_match_when_duplicate_conditions(self):
    """
    Repeating the very same condition in a rule still gives a single match
    """
    rete = ReteNetwork()
    conditions = [
        Condition(V('x'), 'self', V('y')),
        Condition(V('x'), 'color', 'red'),
        Condition(V('x'), 'color', 'red'),  # duplicate on purpose
    ]
    rete.add_rule(RuleForTestingRete(AndConditions(conditions)))

    facts = [WME('B1', 'self', 'B1'), WME('B1', 'color', 'red')]
    for fact in facts:
        rete.add_wme(fact)

    assert len(list(rete.matches)) == 1
def test_i_can_match_when_duplicate_conditions2(self):
    """
    Two variables bound to the same object, each tested with the same
    attribute constraint, still give a single match
    """
    rete = ReteNetwork()
    conditions = [
        Condition(V('x'), 'self', V('y')),
        Condition(V('x'), 'color', 'red'),
        Condition(V('y'), 'color', 'red'),  # same test, through the other variable
    ]
    rete.add_rule(RuleForTestingRete(AndConditions(conditions)))

    facts = [WME('B1', 'self', 'B1'), WME('B1', 'color', 'red')]
    for fact in facts:
        rete.add_wme(fact)

    assert len(list(rete.matches)) == 1
def test_multiple_rules(self):
    """
    Several rules sharing conditions, one of them added after the facts,
    each get their own activation; removing a rule clears its activations
    """
    rete = ReteNetwork()
    c1 = Condition(V('x'), 'on', V('y'))
    c2 = Condition(V('y'), 'left-of', V('z'))
    c3 = Condition(V('z'), 'color', 'red')
    c4 = Condition(V('z'), 'on', 'table')
    c5 = Condition(V('z'), 'left-of', 'B4')
    rule1 = RuleForTestingRete(AndConditions([c1, c2, c3]))
    rule2 = RuleForTestingRete(AndConditions([c1, c2, c4, c5]))
    rete.add_rule(rule1)
    rete.add_rule(rule2)

    facts = [
        WME('B1', 'on', 'B2'),
        WME('B1', 'on', 'B3'),
        WME('B1', 'color', 'red'),
        WME('B2', 'on', 'table'),
        WME('B2', 'left-of', 'B3'),
        WME('B2', 'color', 'blue'),
        WME('B3', 'left-of', 'B4'),
        WME('B3', 'on', 'table'),
        WME('B3', 'color', 'red'),
    ]
    for fact in facts:
        rete.add_wme(fact)

    # add rule on the fly
    rule3 = RuleForTestingRete(AndConditions([c1, c2, c4, c3]))
    rete.add_rule(rule3)

    p_node1, p_node2, p_node3 = (rule.rete_p_nodes[0] for rule in (rule1, rule2, rule3))
    for p_node in (p_node1, p_node2, p_node3):
        assert len(list(p_node.activations)) == 1

    expected_indexes = [
        (p_node1, (0, 4, 8)),
        (p_node2, (0, 4, 7, 6)),
        (p_node3, (0, 4, 7, 8)),
    ]
    for p_node, indexes in expected_indexes:
        assert list(p_node.activations)[0].wmes == [facts[i] for i in indexes]

    rete.remove_rule(rule3)
    assert len(list(p_node3.activations)) == 0
def test_exec_rule_is_added_to_rete_network_when_it_is_created(self):
    """
    Creating an exec rule registers it in the evaluation service's rete network
    """
    exec_rule_def = ("rule_name", "id.attr == 'value'", 'True')
    sheerka, _context, rule = self.init_test().with_exec_rules(exec_rule_def).unpack()

    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules[0] == rule
    assert rule.rete_net == rete
    assert len(rule.rete_p_nodes) > 0
def test_format_rule_is_not_added_to_rete_network_when_it_is_created(self):
    """
    Creating a format rule leaves the evaluation service's rete network untouched
    """
    format_rule_def = ("rule_name", "id.attr == 'value'", 'True')
    sheerka, _context, rule = self.init_test().with_format_rules(format_rule_def).unpack()

    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    assert len(rete.pnodes) == 0
    assert rule.rete_net is None
    assert len(rule.rete_p_nodes) == 0
def test_rule_is_removed_from_rete_network_when_it_is_deleted(self):
    """
    Deleting an exec rule through sheerka removes its p-node from the rete network
    """
    exec_rule_def = ("id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test().with_exec_rules(exec_rule_def).unpack()
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    sheerka.remove_rule(context, rule)

    assert len(rete.pnodes) == 0
def test_rule_is_removed_from_rete_network_when_it_is_deleted_from_another_ontology(self):
    """
    A rule deleted after a new ontology was pushed is still removed from the rete network
    """
    exec_rule_def = ("id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test().with_exec_rules(exec_rule_def).unpack()
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    sheerka.push_ontology(context, "new ontology")
    sheerka.remove_rule(context, rule)

    assert len(rete.pnodes) == 0
@pytest.mark.skip
def test_rules_are_removed_upon_ontology_removal(self):
    """
    Rules created inside an ontology leave the rete network when that ontology is popped
    """
    sheerka, context = self.init_test().unpack()
    service = sheerka.services[SheerkaRuleManager.NAME]
    evaluation_service = sheerka.services[SheerkaEvaluateRules.NAME]
    rete_network = evaluation_service.network

    # first ontology, first rule
    # predicates use '==' (comparison), like the other exec rule tests of this file
    sheerka.push_ontology(context, "Testing Rule removal 1")
    rule = Rule(ACTION_TYPE_EXEC, "name", "id.attr == 'value'", "True")
    rule = service.init_rule(context, rule)
    service.create_new_rule(context, rule)
    assert len(rete_network.pnodes) == 1
    assert rete_network.pnodes[0].rules[0] == rule

    # second ontology, second rule
    sheerka.push_ontology(context, "Testing Rule removal 2")
    rule2 = Rule(ACTION_TYPE_EXEC, "name2", "id2.attr2 == 'value2'", "True")
    rule2 = service.init_rule(context, rule2)
    service.create_new_rule(context, rule2)
    assert len(rete_network.pnodes) == 2
    assert rete_network.pnodes[0].rules[0] == rule
    assert rete_network.pnodes[1].rules[0] == rule2

    # popping the second ontology only drops the second rule
    sheerka.pop_ontology(context)
    assert len(rete_network.pnodes) == 1
    assert rete_network.pnodes[0].rules[0] == rule

    # popping the first ontology drops the remaining rule
    sheerka.pop_ontology(context)
    assert len(rete_network.pnodes) == 0