740 lines
28 KiB
Python
740 lines
28 KiB
Python
import pytest
|
|
|
|
from core.builtin_concepts import ReturnValueConcept
|
|
from core.concept import Concept, DEFINITION_TYPE_DEF
|
|
from core.rule import Rule, ACTION_TYPE_EXEC
|
|
from core.sheerka.services.SheerkaEvaluateRules import SheerkaEvaluateRules
|
|
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager
|
|
from sheerkarete.beta import BetaMemory
|
|
from sheerkarete.common import V, WME, ReteToken
|
|
from sheerkarete.conditions import Condition, NegatedCondition, AndConditions
|
|
from sheerkarete.join_node import JoinNode
|
|
from sheerkarete.negative_node import NegativeNode
|
|
from sheerkarete.network import ReteNetwork, FACT_ID
|
|
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
|
|
from tests.sheerkarete.RuleForTestingRete import RuleForTestingRete
|
|
|
|
|
|
class TestReteNetwork(TestUsingMemoryBasedSheerka):
|
|
@pytest.mark.parametrize("condition, expected_key", [
    (Condition("id", "attr", "value"), ("id", "attr", "value")),
    (Condition(V("x"), "attr", "value"), ("*", "attr", "value")),
    (Condition("id", V("x"), "value"), ("id", "*", "value")),
    (Condition("id", "attr", V("x")), ("id", "attr", "*")),
    (Condition(V("x"), V("x"), V("x")), ("*", "*", "*")),
])
def test_i_can_create_alpha_memory(self, condition, expected_key):
    """
    An alpha memory built for a condition is keyed by the condition's constants

    In every parametrized case a variable field is expected to show up as "*" in the key,
    and the key must be registered in the network's alpha hash.
    """
    rete = ReteNetwork()

    alpha_memory = rete.build_or_share_alpha_memory(condition)

    assert alpha_memory.key == expected_key
    assert alpha_memory.key in rete.alpha_hash
|
|
|
|
def test_same_amem_is_reused(self):
    """
    Asking twice for the alpha memory of the same condition yields equal memories
    and leaves a single entry in the alpha hash
    """
    rete = ReteNetwork()
    cond = Condition("id", "attr", "value")

    first = rete.build_or_share_alpha_memory(cond)
    second = rete.build_or_share_alpha_memory(cond)

    assert first == second
    assert len(rete.alpha_hash) == 1
|
|
|
|
def test_i_can_initialize_a_simple_network(self):
    """
    A single one-condition rule produces one p-node, hanging under a join node
    whose parent is a beta memory and whose alpha memory is the one indexed for the condition
    """
    rete = ReteNetwork()
    cond = Condition("id", "attr", "value")
    rete.add_rule(RuleForTestingRete(AndConditions([cond])))

    assert cond.get_key() in rete.alpha_hash
    assert len(rete.pnodes) == 1

    # walk up from the production node
    join = rete.pnodes[0].parent
    assert isinstance(join, JoinNode)
    assert isinstance(join.parent, BetaMemory)
    assert join.amem == rete.alpha_hash[cond.get_key()][0]
|
|
|
|
def test_i_can_add_multiple_rules_with_the_same_conditions(self):
    """
    Two rules built on the same condition end up on one p-node, listed in insertion order
    """
    rete = ReteNetwork()
    shared = Condition("id", "attr", "value")
    first_rule = RuleForTestingRete(AndConditions([shared]))
    second_rule = RuleForTestingRete(AndConditions([shared]))

    for candidate in (first_rule, second_rule):
        rete.add_rule(candidate)

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules == [first_rule, second_rule]
|
|
|
|
def test_i_can_update_conditions_attributes_by_id_when_constraints(self):
    """
    Conditions chained through variables are recorded in ``attributes_by_id`` under dotted paths

    The first condition ties ``x`` to the name "fact_name"; the variables bound from its
    ``body`` / ``value`` attributes are expected under "fact_name.body", "fact_name.value", ...
    """
    rete = ReteNetwork()
    constraints = AndConditions([
        Condition(V("x"), "__name__", "fact_name"),
        Condition(V("x"), "attr1", "value1"),
        Condition(V("x"), "body", V("y")),
        Condition(V("y"), "__is_concept__", True),
        Condition(V("y"), "name", "SubConcept"),
        Condition(V("x"), "value", V("z")),
        Condition(V("z"), "status", False),
        Condition(V("z"), "body", V("zz")),
        Condition(V("zz"), "sub_value", "sub_value"),
    ])

    rete.add_rule(RuleForTestingRete(constraints))

    expected_attributes = {
        "fact_name": ["__name__", "attr1", "body", "value"],
        "fact_name.body": ["__is_concept__", "name"],
        "fact_name.value": ["status", "body"],
        "fact_name.value.body": ["sub_value"],
    }
    assert rete.attributes_by_id == expected_attributes
|
|
|
|
def test_adding_obj_when_no_rule_has_no_effect(self):
    """
    With no rule registered, adding an object leaves the working memory empty
    """
    rete = ReteNetwork()

    rete.add_obj("__ret", ReturnValueConcept("test", True, "value"))

    assert rete.working_memory == set()
|
|
|
|
def test_adding_obj_when_rule_with_no_attribute_constraint(self):
    """
    A rule that does not name a specific attribute forces every known property of the object
    to be published as a WME
    """
    rete = ReteNetwork()
    ret = ReturnValueConcept("test", True, "value")
    # test only, this rule cannot be matched
    wildcard_rule = RuleForTestingRete(AndConditions([Condition("__ret", "*", True)]))
    rete.add_rule(wildcard_rule)

    rete.add_obj("__ret", ret)

    expected_properties = [
        ("status", True),
        ("value", "value"),
        ("body", "value"),
        ("parents", None),
        ("id", None),
        ("name", "__RETURN_VALUE"),
        ("key", "__RETURN_VALUE"),
        ("self", ret),
        ("who", "test"),
    ]
    assert rete.working_memory == {WME("f-00000", attr, val) for attr, val in expected_properties}
|
|
|
|
def test_adding_obj_when_rule_with_attributes_constraints(self):
    """
    When the rule names the attributes it needs, only those attributes are added as WMEs
    """
    rete = ReteNetwork()
    ret = ReturnValueConcept("test", True, "value")
    wanted = AndConditions([Condition("__ret", "status", True), Condition("__ret", "body", "value")])
    rete.add_rule(RuleForTestingRete(wanted))

    rete.add_obj("__ret", ret)

    expected = {WME("f-00000", "status", True), WME("f-00000", "body", "value")}
    assert rete.working_memory == expected
|
|
|
|
def test_adding_obj_when_requested_attribute_is_not_found(self):
    """
    No error is raised when a requested attribute does not exist on the object;
    only the attributes that are found produce WMEs
    """
    rete = ReteNetwork()
    ret = ReturnValueConcept("test", True, "value")
    requested = AndConditions([
        Condition("__ret", "not found", True),
        Condition("__ret", "body", "value"),
    ])
    rete.add_rule(RuleForTestingRete(requested))

    rete.add_obj("__ret", ret)

    assert rete.working_memory == {WME("f-00000", "body", "value")}
|
|
|
|
def test_adding_object_when_id_is_not_defined_but_attribute_is_known(self):
    """
    A condition with a variable identifier but a known attribute makes every added object
    publish that attribute, whatever its value
    """
    rete = ReteNetwork()
    status_rule = RuleForTestingRete(AndConditions([Condition(V("1"), "status", True)]))
    rete.add_rule(status_rule)

    accepted = ReturnValueConcept("test", True, "first one")
    rejected = ReturnValueConcept("test", False, "to discard")
    for obj in (accepted, rejected):
        rete.add_obj("__ret", obj)

    assert rete.working_memory == {
        WME("f-00000", "status", True),
        WME("f-00001", "status", False),
    }
|
|
|
|
def test_adding_obj_full_test_obj_is_a_concept(self):
    """
    When an attribute value is a concept, a sub object is created for it so that
    conditions can be expressed on the nested attributes
    """
    sheerka, context, boy, the, greetings = self.init_concepts(
        "boy",
        Concept("the x").def_var("x"),
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a")
    )
    rete = ReteNetwork()

    the_boy = sheerka.new(the, x=boy)
    hello_the_boy = sheerka.new(greetings, a=the_boy)

    # The original condition may look like:
    #   __ret.status
    #   and __ret.body == $body
    #   and isinstance($body, Concept)
    #   and $body.name == "greetings"
    #   and $body.get_value("a") == $the
    #   and isinstance($the, Concept)
    #   and $the.id == 1002
    #   and $the.get_value("x") == $boy
    #   and isinstance($boy, Concept)
    #   and $boy.name == "boy"
    ret = ReturnValueConcept("test", True, hello_the_boy)
    nested_conditions = AndConditions([
        Condition(V("ret"), "__name__", "__ret"),
        Condition(V("ret"), "status", True),
        Condition(V("ret"), "body", V("body")),
        Condition(V("body"), "name", "greetings"),
        Condition(V("body"), "a", V("the")),
        Condition(V("the"), "id", '1002'),
        Condition(V("the"), "x", V("boy")),
        Condition(V("boy"), "name", "boy"),
    ])
    rule = RuleForTestingRete(nested_conditions)
    rete.add_rule(rule)

    rete.add_obj("__ret", ret)

    # nested concepts are referenced through dotted fact identifiers
    expected_memory = {
        WME("f-00000", "__name__", "__ret"),
        WME("f-00000", "status", True),
        WME("f-00000", "body", "f-00000.body"),
        WME("f-00000.body", "name", "greetings"),
        WME("f-00000.body", "a", "f-00000.body.a"),
        WME("f-00000.body.a", "id", "1002"),
        WME("f-00000.body.a", "x", "f-00000.body.a.x"),
        WME("f-00000.body.a.x", "name", "boy"),
    }
    assert rete.working_memory == expected_memory

    # sanity check: the produced WMEs do match the conditions
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]
    assert rete.facts == {'f-00000': ret}
|
|
|
|
def test_i_can_add_obj_and_match_obj_when_value_is_sheerka(self):
    """
    A concept variable holding the sheerka instance itself is published as "__sheerka__"
    and can be matched with that constant
    """
    sheerka, context, greetings = self.init_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a")
    )
    rete = ReteNetwork()
    sheerka_conditions = AndConditions([
        Condition(V("ret"), "__name__", "__ret"),
        Condition(V("ret"), "body", V("body")),
        Condition(V("body"), "name", "greetings"),
        Condition(V("body"), "a", "__sheerka__"),
    ])
    rule = RuleForTestingRete(sheerka_conditions)
    rete.add_rule(rule)

    hello_concept = sheerka.new(greetings, a=sheerka)
    ret = ReturnValueConcept("test", True, hello_concept)
    rete.add_obj("__ret", ret)

    expected_memory = {
        WME("f-00000", "__name__", "__ret"),
        WME("f-00000", "body", "f-00000.body"),
        WME("f-00000.body", "name", "greetings"),
        WME("f-00000.body", "a", "__sheerka__"),
    }
    assert rete.working_memory == expected_memory

    # sanity check: the produced WMEs do match the conditions
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]
    assert rete.facts == {'f-00000': ret}
|
|
|
|
def test_i_can_distinguish_objects_with_different_value(self):
    """
    Two variables constrained on the same attributes with different values
    are bound to two distinct objects, and the rule matches exactly once
    """
    rete = ReteNetwork()
    two_objects = AndConditions([
        Condition(V("1"), "status", True),
        Condition(V("1"), "value", "first one"),
        Condition(V("2"), "status", True),
        Condition(V("2"), "value", "second one")
    ])
    rule = RuleForTestingRete(two_objects)
    rete.add_rule(rule)

    first = ReturnValueConcept("test", True, "first one")
    discarded = ReturnValueConcept("test", False, "to discard")
    second = ReturnValueConcept("test", True, "second one")
    for obj in (first, discarded, second):
        rete.add_obj("__ret", obj)

    # one (status, value) pair of WMEs per object, in insertion order of the facts
    expected_rows = [
        ("f-00000", True, "first one"),
        ("f-00001", False, "to discard"),
        ("f-00002", True, "second one"),
    ]
    expected_memory = set()
    for fact_id, status, value in expected_rows:
        expected_memory.add(WME(fact_id, "status", status))
        expected_memory.add(WME(fact_id, "value", value))
    assert rete.working_memory == expected_memory

    # the rule must have fired
    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]
|
|
|
|
@pytest.mark.parametrize("conditions, expected", [
    ([Condition("id", "some_weird_attribute", "value")], {'id': ['some_weird_attribute']}),
    ([Condition("id", "attr1", V('x')), Condition("id", "attr2", "*")], {'id': ['attr1', 'attr2']}),
    ([Condition(V('x'), "attr", "value")], {}),
    ([Condition("id", V('x'), "value")], {'id': ["*"]}),
])
def test_attributes_by_identifier_are_updated_upon_condition_creation(self, conditions, expected):
    """
    Adding a rule fills ``attributes_by_id`` from its conditions

    The cases cover a constant identifier with constant attributes, a variable identifier
    (nothing recorded) and a variable attribute (recorded as "*").
    """
    rete = ReteNetwork()

    rete.add_rule(RuleForTestingRete(AndConditions(conditions)))

    assert rete.attributes_by_id == expected
|
|
|
|
def test_i_cannot_add_the_same_object_twice(self):
    """
    Adding an object that was already added raises ValueError
    """
    rete = ReteNetwork()
    rete.add_rule(RuleForTestingRete(AndConditions([Condition("__obj", "attr", "value")])))

    concept = Concept("foo")
    rete.add_obj("__obj", concept)

    # second insertion of the very same instance
    with pytest.raises(ValueError):
        rete.add_obj("__obj", concept)
|
|
|
|
def test_i_can_remove_wme(self):
    """
    Removing the only matching WME removes the match
    """
    rete = ReteNetwork()

    cond = Condition("id", "attr", "value")
    fact = WME("id", "attr", "value")
    rete.add_rule(RuleForTestingRete(AndConditions([cond])))

    rete.add_wme(fact)
    assert len(list(rete.matches)) == 1

    rete.remove_wme(fact)
    assert len(list(rete.matches)) == 0
|
|
|
|
def test_i_can_remove_wme_when_multiple_p_nodes(self):
    """
    A WME feeding two distinct p-nodes is retracted from both when it is removed
    """
    rete = ReteNetwork()

    # same pattern, different variable names: the test expects two p-nodes
    rule_on_x = RuleForTestingRete(AndConditions([Condition("id", "attr", V("x"))]))
    rule_on_y = RuleForTestingRete(AndConditions([Condition("id", "attr", V("y"))]))
    rete.add_rule(rule_on_x)
    rete.add_rule(rule_on_y)

    assert len(rete.pnodes) == 2

    fact = WME("id", "attr", "value")
    rete.add_wme(fact)
    assert len(list(rete.matches)) == 2

    rete.remove_wme(fact)
    assert len(list(rete.matches)) == 0
|
|
|
|
def test_i_can_remove_and_add_again_a_wme(self):
    """
    A WME that was removed can be added back and matches again
    """
    rete = ReteNetwork()

    cond = Condition("id", "attr", "value")
    fact = WME("id", "attr", "value")
    rete.add_rule(RuleForTestingRete(AndConditions([cond])))

    rete.add_wme(fact)
    rete.remove_wme(fact)
    assert len(list(rete.matches)) == 0

    # same WME instance, second life
    rete.add_wme(fact)
    assert len(list(rete.matches)) == 1
|
|
|
|
def test_i_can_remove_a_fact(self):
    """
    Removing an object clears the working memory and the matches it produced
    """
    sheerka, context, boy, the, greetings = self.init_concepts(
        "boy",
        Concept("the x").def_var("x"),
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a")
    )
    rete = ReteNetwork()

    the_boy = sheerka.new(the, x=boy)
    hello_the_boy = sheerka.new(greetings, a=the_boy)

    # the fully variable second condition allows all attributes for '__ret'
    catch_all = AndConditions([
        Condition(V("ret"), "__name__", "__ret"),
        Condition(V("ret"), V("x"), V("y")),
    ])
    rete.add_rule(RuleForTestingRete(catch_all))

    ret = ReturnValueConcept("test", True, hello_the_boy)
    rete.add_obj("__ret", ret)

    # sanity check: something was stored and matched
    assert len(rete.working_memory) > 0
    assert len(list(rete.matches)) > 0

    rete.remove_obj(ret)

    assert len(rete.working_memory) == 0
    assert len(list(rete.matches)) == 0
|
|
|
|
def test_i_cannot_remove_an_object_with_no_fact_id(self):
    """
    Removing an object that was never given a fact id raises ValueError
    """
    rete = ReteNetwork()
    unknown = object()

    with pytest.raises(ValueError):
        rete.remove_obj(unknown)
|
|
|
|
def test_i_cannot_remove_an_object_that_is_not_in_the_system(self):
    """
    An object carrying a fact id that the network does not know cannot be removed
    """
    rete = ReteNetwork()

    stranger = ReturnValueConcept("who", True, "value")
    # forge a fact id the network never handed out
    setattr(stranger, FACT_ID, "xxx")

    with pytest.raises(ValueError):
        rete.remove_obj(stranger)
|
|
|
|
def test_i_can_find_stack_of_two_blocks_to_the_left_of_the_red_block(self):
    """
    Blocks-world scenario: find x on y, y left-of z, z red

    Besides the single expected match, the content of the three alpha memories and of
    the beta memories below each join is checked, as well as token equality.
    """
    rete = ReteNetwork()
    c1 = Condition(V("x"), "on", V("y"))
    c2 = Condition(V("y"), "left-of", V("z"))
    c3 = Condition(V("z"), "color", "red")
    rule = RuleForTestingRete(AndConditions([c1, c2, c3]))
    rete.add_rule(rule)

    facts = [
        WME("B1", "on", "B2"),
        WME("B1", "on", "B3"),
        WME("B1", "color", "red"),
        WME("B2", "on", "table"),
        WME("B2", "left-of", "B3"),
        WME("B2", "color", "blue"),
        WME("B3", "left-of", "B4"),
        WME("B3", "on", "table"),
        WME("B3", "color", "red"),
    ]
    wme1, wme2, wme3, wme4, wme5, wme6, wme7, wme8, wme9 = facts
    for fact in facts:
        rete.add_wme(fact)

    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [rule]
    assert found[0].token.wmes == [wme1, wme5, wme9]

    # alpha memories hold the WMEs matching each pattern, in insertion order
    am1 = rete.build_or_share_alpha_memory(c1)
    am2 = rete.build_or_share_alpha_memory(c2)
    am3 = rete.build_or_share_alpha_memory(c3)
    assert am1.items == [wme1, wme2, wme4, wme8]
    assert am2.items == [wme5, wme7]
    assert am3.items == [wme3, wme9]

    # first child of the first successor of each alpha memory holds the partial matches
    after_c1 = am1.successors[0].children[0]
    after_c2 = am2.successors[0].children[0]
    after_c3 = am3.successors[0].children[0]
    assert [t.wmes for t in after_c1.items] == [[wme1], [wme2], [wme4], [wme8]]
    assert [t.wmes for t in after_c2.items] == [[wme1, wme5], [wme2, wme7]]
    assert [t.wmes for t in after_c3.items] == [[wme1, wme5, wme9]]

    # a token chain rebuilt by hand compares equal to the stored one
    expected_token = ReteToken(ReteToken(ReteToken(ReteToken(None, None), wme1), wme5), wme9)
    assert after_c3.items[0] == expected_token
|
|
|
|
def test_i_can_find_stack_of_two_blocks_to_the_left_of_one_that_is_not_red(self):
    """
    Blocks-world scenario with a negated condition: find x on y, y left-of z, z NOT red

    B1 is on B2 and B2 is left of B3, but B3 is red, so that chain is rejected.
    B1 is also on B3 and B3 is left of B4, for which no color fact exists: this is the
    single expected match, whose token carries None in the slot of the negated condition.
    """
    network = ReteNetwork()
    c1 = Condition(V("x"), "on", V("y"))
    c2 = Condition(V("y"), "left-of", V("z"))
    c3 = NegatedCondition(V("z"), "color", "red")
    rule = RuleForTestingRete(AndConditions([c1, c2, c3]))
    network.add_rule(rule)

    wme1 = WME("B1", "on", "B2")
    wme2 = WME("B1", "on", "B3")
    wme3 = WME("B1", "color", "red")
    wme4 = WME("B2", "on", "table")
    wme5 = WME("B2", "left-of", "B3")
    wme6 = WME("B2", "color", "blue")
    wme7 = WME("B3", "left-of", "B4")
    wme8 = WME("B3", "on", "table")
    wme9 = WME("B3", "color", "red")

    # insertion order matters: the memories below are compared as ordered lists
    for wme in (wme1, wme2, wme3, wme4, wme5, wme6, wme7, wme8, wme9):
        network.add_wme(wme)

    matches = list(network.matches)
    assert len(matches) == 1
    assert matches[0].pnode.rules == [rule]
    assert matches[0].token.wmes == [wme2, wme7, None]

    am1 = network.build_or_share_alpha_memory(c1)
    am2 = network.build_or_share_alpha_memory(c2)
    am3 = network.build_or_share_alpha_memory(c3)
    assert am1.items == [wme1, wme2, wme4, wme8]
    assert am2.items == [wme5, wme7]
    assert am3.items == [wme3, wme9]  # same content as for the positive condition

    dummy_join = am1.successors[0]
    join_on_value_y = am2.successors[0]
    join_on_value_z = am3.successors[0]
    match_c0 = dummy_join.children[0]
    match_c0c1 = join_on_value_y.children[0]
    match_c0c1c2 = join_on_value_z.children[0]

    assert [t.wmes for t in match_c0.items] == [[wme1], [wme2], [wme4], [wme8]]
    assert [t.wmes for t in match_c0c1.items] == [[wme1, wme5], [wme2, wme7]]
    # isinstance rather than an exact type comparison, like the other node checks in this file
    assert isinstance(join_on_value_z, NegativeNode)
    assert [t.wmes for t in match_c0c1c2.items] == [[wme2, wme7, None]]
|
|
|
|
def test_i_can_remove_a_rule(self):
    """
    Removing the only rule leaves no p-node, no alpha memory and no registered rule
    """
    rete = ReteNetwork()
    only_rule = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]))

    rete.add_rule(only_rule)
    rete.remove_rule(only_rule)

    for container in (rete.pnodes, rete.alpha_hash, rete.rules):
        assert len(container) == 0
    # Attribute bookkeeping is deliberately not asserted here (checks left disabled):
    # assert len(network.attributes_by_id) == 0
    # assert len(network.default_attributes) == 0
|
|
|
|
def test_i_can_remove_a_rule_when_it_shares_p_node(self):
    """
    Removing one of two rules sharing a p-node keeps the p-node alive for the other rule
    """
    rete = ReteNetwork()
    removed = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]), name="rule1")
    kept = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]), name="rule2")

    rete.add_rule(removed)
    rete.add_rule(kept)

    rete.remove_rule(removed)

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules == [kept]
    assert rete.rules == {kept}

    # sanity: the surviving rule still matches
    fact = WME("id", "attr", "value")
    rete.add_wme(fact)

    found = list(rete.matches)
    assert len(found) == 1
    assert found[0].pnode.rules == [kept]
    assert found[0].token.wmes == [fact]
|
|
|
|
@pytest.mark.skip("Need a better attribute management")
def test_i_can_manage_attributes_by_key_on_rule_removal(self):
    """
    Skipped placeholder about ``attributes_by_id`` bookkeeping around rule removal

    NOTE(review): despite its name, this test never calls ``remove_rule``; the second
    series of calls re-adds the same rules. It looks like ``remove_rule`` was intended
    there - confirm before enabling the test.
    """
    network = ReteNetwork()
    rule1 = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]))
    rule2 = RuleForTestingRete(AndConditions([Condition("id", "attr", V("x"))]))
    rule3 = RuleForTestingRete(AndConditions([Condition("id", "attr2", V("x"))]))

    network.add_rule(rule1)
    network.add_rule(rule2)
    network.add_rule(rule3)

    # sanity
    # NOTE(review): the ``{}`` expectations in this test look like placeholders;
    # the real expected values have to be written when the test is enabled.
    assert network.attributes_by_id == {}

    network.add_rule(rule1)
    assert network.attributes_by_id == {}

    network.add_rule(rule3)
    assert network.attributes_by_id == {}

    network.add_rule(rule2)
    assert network.attributes_by_id == {}
|
|
|
|
@pytest.mark.skip("Need a better attribute management")
def test_i_can_manage_default_attributes_on_rule_removal(self):
    """
    Skipped placeholder about ``default_attributes`` bookkeeping around rule removal

    NOTE(review): despite its name, this test never calls ``remove_rule``; the second
    series of calls re-adds the same rules. It looks like ``remove_rule`` was intended
    there - confirm before enabling the test.
    """
    network = ReteNetwork()
    rule1 = RuleForTestingRete(AndConditions([Condition("id", V("x"), "value")]))
    rule2 = RuleForTestingRete(AndConditions([Condition("id", V("y"), V("z"))]))
    rule3 = RuleForTestingRete(AndConditions([Condition("id2", V("x"), V("y"))]))

    network.add_rule(rule1)
    network.add_rule(rule2)
    network.add_rule(rule3)

    # sanity
    # NOTE(review): the ``set()`` expectations in this test look like placeholders;
    # the real expected values have to be written when the test is enabled.
    assert network.default_attributes == set()

    network.add_rule(rule1)
    assert network.default_attributes == set()

    network.add_rule(rule3)
    assert network.default_attributes == set()

    network.add_rule(rule2)
    assert network.default_attributes == set()
|
|
|
|
def test_i_can_add_an_empty_rule(self):
    """
    A rule without any condition is attached directly under the beta root
    and is detached again when the rule is removed
    """
    rete = ReteNetwork()
    empty_rule = RuleForTestingRete(AndConditions([]))

    rete.add_rule(empty_rule)
    assert len(rete.beta_root.children) == 1

    rete.remove_rule(empty_rule)
    assert len(rete.beta_root.children) == 0
|
|
|
|
def test_i_can_match_when_duplicate_conditions(self):
    """
    Stating the very same condition twice in a rule still gives exactly one match
    """
    rete = ReteNetwork()
    conditions = [
        Condition(V('x'), 'self', V('y')),
        Condition(V('x'), 'color', 'red'),
        Condition(V('x'), 'color', 'red'),  # duplicate on purpose
    ]
    rete.add_rule(RuleForTestingRete(AndConditions(conditions)))

    for fact in (WME('B1', 'self', 'B1'), WME('B1', 'color', 'red')):
        rete.add_wme(fact)

    assert len(list(rete.matches)) == 1
|
|
|
|
def test_i_can_match_when_duplicate_conditions2(self):
    """
    Two conditions that differ only by their variable match the same WME
    when both variables are bound to the same identifier
    """
    rete = ReteNetwork()
    conditions = [
        Condition(V('x'), 'self', V('y')),
        Condition(V('x'), 'color', 'red'),
        Condition(V('y'), 'color', 'red'),  # y is bound to the same block as x by the facts below
    ]
    rete.add_rule(RuleForTestingRete(AndConditions(conditions)))

    for fact in (WME('B1', 'self', 'B1'), WME('B1', 'color', 'red')):
        rete.add_wme(fact)

    assert len(list(rete.matches)) == 1
|
|
|
|
def test_multiple_rules(self):
    """
    Several rules sharing leading conditions each get their own activation,
    including a rule added after the facts, and a removed rule loses its activations
    """
    rete = ReteNetwork()
    c1 = Condition(V('x'), 'on', V('y'))
    c2 = Condition(V('y'), 'left-of', V('z'))
    c3 = Condition(V('z'), 'color', 'red')
    c4 = Condition(V('z'), 'on', 'table')
    c5 = Condition(V('z'), 'left-of', 'B4')

    rule1 = RuleForTestingRete(AndConditions([c1, c2, c3]))
    rule2 = RuleForTestingRete(AndConditions([c1, c2, c4, c5]))

    rete.add_rule(rule1)
    rete.add_rule(rule2)

    wmes = [
        WME('B1', 'on', 'B2'),
        WME('B1', 'on', 'B3'),
        WME('B1', 'color', 'red'),
        WME('B2', 'on', 'table'),
        WME('B2', 'left-of', 'B3'),
        WME('B2', 'color', 'blue'),
        WME('B3', 'left-of', 'B4'),
        WME('B3', 'on', 'table'),
        WME('B3', 'color', 'red'),
    ]
    for fact in wmes:
        rete.add_wme(fact)

    # this rule is added on the fly, once the facts are already in the network
    rule3 = RuleForTestingRete(AndConditions([c1, c2, c4, c3]))
    rete.add_rule(rule3)

    p_node1 = rule1.rete_p_nodes[0]
    p_node2 = rule2.rete_p_nodes[0]
    p_node3 = rule3.rete_p_nodes[0]

    for p_node in (p_node1, p_node2, p_node3):
        assert len(list(p_node.activations)) == 1

    # indexes into ``wmes`` of the facts expected in each activation
    expected_indexes = [
        (p_node1, (0, 4, 8)),
        (p_node2, (0, 4, 7, 6)),
        (p_node3, (0, 4, 7, 8)),
    ]
    for p_node, indexes in expected_indexes:
        assert list(p_node.activations)[0].wmes == [wmes[i] for i in indexes]

    rete.remove_rule(rule3)
    assert len(list(p_node3.activations)) == 0
|
|
|
|
def test_exec_rule_is_added_to_rete_network_when_it_is_created(self):
    """
    An exec rule created through the test fixture is present in the evaluation service's network
    """
    exec_rule_def = ("rule_name", "id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test().with_exec_rules(exec_rule_def).unpack()
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules[0] == rule
    # the rule keeps a back reference to the network and to its p-nodes
    assert rule.rete_net == rete
    assert len(rule.rete_p_nodes) > 0
|
|
|
|
def test_format_rule_is_not_added_to_rete_network_when_it_is_created(self):
    """
    A format rule created through the test fixture is not put in the evaluation service's network
    """
    format_rule_def = ("rule_name", "id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test().with_format_rules(format_rule_def).unpack()
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    assert len(rete.pnodes) == 0
    assert rule.rete_net is None
    assert len(rule.rete_p_nodes) == 0
|
|
|
|
def test_rule_is_removed_from_rete_network_when_it_is_deleted(self):
    """
    Removing a rule through sheerka also removes its p-node from the network
    """
    exec_rule_def = ("id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test().with_exec_rules(exec_rule_def).unpack()
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    sheerka.remove_rule(context, rule)

    assert len(rete.pnodes) == 0
|
|
|
|
def test_rule_is_removed_from_rete_network_when_it_is_deleted_from_another_ontology(self):
    """
    A rule removed after a new ontology was pushed is still removed from the network
    """
    exec_rule_def = ("id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test().with_exec_rules(exec_rule_def).unpack()
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    # the removal happens from an ontology pushed after the rule was created
    sheerka.push_ontology(context, "new ontology")
    sheerka.remove_rule(context, rule)

    assert len(rete.pnodes) == 0
|
|
|
|
@pytest.mark.skip
def test_rules_are_removed_upon_ontology_removal(self):
    """
    Rules created inside an ontology are expected to leave the network when that ontology is popped

    NOTE(review): this test is skipped without a stated reason. The predicates below use a
    single ``=`` whereas the other rule predicates in this file use ``==`` - confirm which
    one is intended before enabling the test.
    """
    sheerka, context = self.init_test().unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    rete = sheerka.services[SheerkaEvaluateRules.NAME].network

    # first ontology, first rule
    sheerka.push_ontology(context, "Testing Rule removal 1")
    rule = rule_manager.init_rule(context, Rule(ACTION_TYPE_EXEC, "name", "id.attr = 'value'", "True"))
    rule_manager.create_new_rule(context, rule)

    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules[0] == rule

    # second ontology, second rule
    sheerka.push_ontology(context, "Testing Rule removal 2")
    rule2 = rule_manager.init_rule(context, Rule(ACTION_TYPE_EXEC, "name2", "id2.attr2 = 'value2'", "True"))
    rule_manager.create_new_rule(context, rule2)

    assert len(rete.pnodes) == 2
    assert rete.pnodes[0].rules[0] == rule
    assert rete.pnodes[1].rules[0] == rule2

    # popping the second ontology drops the second rule only
    sheerka.pop_ontology(context)
    assert len(rete.pnodes) == 1
    assert rete.pnodes[0].rules[0] == rule

    # popping the first ontology drops the remaining rule
    sheerka.pop_ontology(context)
    assert len(rete.pnodes) == 0
|