342 lines
12 KiB
Python
342 lines
12 KiB
Python
from sheerkarete.common import WME, V
|
|
from sheerkarete.conditions import NotEqualsCondition, AndConditions, Condition, NegatedCondition, \
|
|
NegatedConjunctiveConditions, FilterCondition, BindCondition
|
|
from sheerkarete.network import ReteNetwork
|
|
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
|
|
from tests.sheerkarete.RuleForTestingRete import RuleForTestingRete
|
|
|
|
|
|
class TestReteConditions(TestUsingMemoryBasedSheerka):
    """Check how a ``ReteNetwork`` handles each kind of rule condition.

    Each test creates its own network, registers rules built with
    ``RuleForTestingRete`` and inspects ``network.matches`` (or the rule's
    ``rete_p_nodes``) while working memory elements (WMEs) or rules are
    added and removed.
    """

    def test_i_can_manage_condition(self):
        """A plain condition matches once its WME is added, and no longer after removal."""
        rete = ReteNetwork()
        rule = RuleForTestingRete(AndConditions([Condition("id", "attr", "value")]))
        rete.add_rule(rule)

        # No WME has been added yet, so nothing can match.
        assert list(rete.matches) == []

        fact = WME("id", "attr", "value")
        rete.add_wme(fact)

        found = list(rete.matches)
        assert len(found) == 1
        only_match = found[0]
        assert only_match.pnode.rules == [rule]
        assert only_match.token.wmes == [fact]

        # Removing the WME retracts the match.
        rete.remove_wme(fact)
        assert list(rete.matches) == []

    def test_i_can_manage_not_equals_condition(self):
        """Not-equals conditions are stored once per distinct condition and match by value."""
        rete = ReteNetwork()
        alpha_key = ("id", "value", "*")

        # Adding the condition creates one entry under the ("id", "value", "*") key.
        not_five = NotEqualsCondition("id", "value", 5)
        first_rule = RuleForTestingRete(AndConditions([not_five]), name="first id != 5")
        rete.add_rule(first_rule)

        assert list(rete.alpha_hash.keys()) == [alpha_key]
        entries = rete.alpha_hash[alpha_key]
        assert len(entries) == 1
        assert entries[0].condition == not_five

        # A second rule using the very same condition does not add a new entry.
        second_rule = RuleForTestingRete(AndConditions([not_five]), name="second id != 5")
        rete.add_rule(second_rule)

        assert list(rete.alpha_hash.keys()) == [alpha_key]
        entries = rete.alpha_hash[alpha_key]
        assert len(entries) == 1
        assert entries[0].condition == not_five

        # A different compared value gets its own entry under the same key.
        not_six = NotEqualsCondition("id", "value", 6)
        third_rule = RuleForTestingRete(AndConditions([not_six]), name="id != 6")
        rete.add_rule(third_rule)

        assert list(rete.alpha_hash.keys()) == [alpha_key]
        entries = rete.alpha_hash[alpha_key]
        assert len(entries) == 2
        assert entries[0].condition == not_five
        assert entries[1].condition == not_six

        # A value different from both 5 and 6 satisfies every rule.
        matching_fact = WME("id", "value", "1")
        rete.add_wme(matching_fact)

        found = list(rete.matches)
        assert len(found) == 2
        assert found[0].pnode.rules == [first_rule, second_rule]
        assert found[1].pnode.rules == [third_rule]

        # Removing the WME retracts all matches.
        rete.remove_wme(matching_fact)
        assert list(rete.matches) == []

        # The value 5 is rejected by "!= 5" but still accepted by "!= 6".
        five_fact = WME("id", "value", 5)
        rete.add_wme(five_fact)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].pnode.rules == [third_rule]

    def test_i_can_manage_negative_condition(self):
        """A negated condition matches only while no corresponding WME is present."""
        rete = ReteNetwork()
        rule = RuleForTestingRete(AndConditions([NegatedCondition("id", "attr", "value")]))
        rete.add_rule(rule)

        assert len(rete.beta_root.children) == 1

        # With an empty working memory the negated condition already holds.
        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].pnode.rules == [rule]
        assert found[0].token.wmes == [None]

        # Adding the WME cancels the match.
        fact = WME("id", "attr", "value")
        rete.add_wme(fact)
        assert list(rete.matches) == []

        # Removing the WME restores the match.
        rete.remove_wme(fact)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].pnode.rules == [rule]
        assert found[0].token.wmes == [None]

    def test_i_can_manage_ncc(self):
        """A negated conjunction blocks a match only when all its conditions hold together."""
        rete = ReteNetwork()
        is_light = Condition(V("x"), "is a", "light")
        is_red = Condition(V("x"), "is", "red")
        is_blue = Condition(V("x"), "is", "blue")
        rule = RuleForTestingRete(
            AndConditions([is_light, NegatedConjunctiveConditions(is_red, is_blue)])
        )
        rete.add_rule(rule)

        # x1 is a blue light: only one half of the negated conjunction holds.
        x1_light = WME("x1", "is a", "light")
        x1_blue = WME("x1", "is", "blue")
        rete.add_wme(x1_light)
        rete.add_wme(x1_blue)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [x1_light, None]

        # x1 is now both red and blue: the match is retracted.
        x1_red = WME("x1", "is", "red")
        rete.add_wme(x1_red)
        assert list(rete.matches) == []

        # x2 is a light with no colour at all, so it matches.
        x2_light = WME("x2", "is a", "light")
        rete.add_wme(x2_light)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [x2_light, None]

        # Dropping "x1 is blue" breaks the conjunction, so x1 matches again.
        rete.remove_wme(x1_blue)

        found = list(rete.matches)
        assert len(found) == 2
        assert found[0].token.wmes == [x2_light, None]
        assert found[1].token.wmes == [x1_light, None]

    def test_i_can_manage_ncc_when_already_inserted_elements(self):
        """A negated conjunction is evaluated against WMEs added before the rule."""
        rete = ReteNetwork()

        # The working memory is populated before any rule exists.
        x1_light = WME("x1", "is a", "light")
        x1_blue = WME("x1", "is", "blue")
        rete.add_wme(x1_light)
        rete.add_wme(x1_blue)

        is_light = Condition(V("x"), "is a", "light")
        is_red = Condition(V("x"), "is", "red")
        is_blue = Condition(V("x"), "is", "blue")
        rule = RuleForTestingRete(
            AndConditions([is_light, NegatedConjunctiveConditions(is_red, is_blue)])
        )
        rete.add_rule(rule)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [x1_light, None]

        # Completing the conjunction (red and blue) retracts the match.
        x1_red = WME("x1", "is", "red")
        rete.add_wme(x1_red)
        assert list(rete.matches) == []

        # Removing "x1 is blue" breaks the conjunction and restores the match.
        rete.remove_wme(x1_blue)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [x1_light, None]

    def test_i_can_manage_ncc_when_already_inserted_elements2(self):
        """A rule added after a blocking set of WMEs starts unmatched and can recover."""
        rete = ReteNetwork()

        # x1 is a light that is already both blue and red when the rule arrives.
        x1_light = WME("x1", "is a", "light")
        x1_blue = WME("x1", "is", "blue")
        x1_red = WME("x1", "is", "red")
        rete.add_wme(x1_light)
        rete.add_wme(x1_blue)
        rete.add_wme(x1_red)

        is_light = Condition(V("x"), "is a", "light")
        is_red = Condition(V("x"), "is", "red")
        is_blue = Condition(V("x"), "is", "blue")
        rule = RuleForTestingRete(
            AndConditions([is_light, NegatedConjunctiveConditions(is_red, is_blue)])
        )
        rete.add_rule(rule)
        assert list(rete.matches) == []

        # Removing "x1 is red" breaks the conjunction, so the rule matches.
        rete.remove_wme(x1_red)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [x1_light, None]

    def test_i_can_manage_filter_condition(self):
        """A filter keeps only the variable bindings for which its callable is true."""
        rete = ReteNetwork()
        binds_x = Condition("value", "equals", V("x"))
        binds_y = Condition("value", "equals", V("y"))
        # The lambda parameters are named after the variables bound above.
        x_is_y_plus_one = FilterCondition(lambda x, y: x == y + 1)
        rule = RuleForTestingRete(AndConditions([binds_x, binds_y, x_is_y_plus_one]))
        rete.add_rule(rule)

        assert list(rete.matches) == []

        # 5 and 3 are two apart: no (x, y) pair satisfies x == y + 1.
        five = WME("value", "equals", 5)
        three = WME("value", "equals", 3)
        rete.add_wme(five)
        rete.add_wme(three)
        assert list(rete.matches) == []

        # 4 pairs with both neighbours: (5, 4) and (4, 3).
        four = WME("value", "equals", 4)
        rete.add_wme(four)

        found = list(rete.matches)
        assert len(found) == 2
        assert found[0].token.wmes == [five, four]
        assert found[1].token.wmes == [four, three]

        # Without 5 only the (4, 3) pair remains.
        rete.remove_wme(five)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [four, three]

    def test_i_can_manage_filter_when_it_is_the_first_condition(self):
        """A filter used as the only condition matches without any WME."""
        rete = ReteNetwork()
        rule = RuleForTestingRete(AndConditions([FilterCondition(lambda: True)]))
        rete.add_rule(rule)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [None]
        assert found[0].token.binding == {}

        # Removing the rule removes its match.
        rete.remove_rule(rule)
        assert list(rete.matches) == []

    def test_i_can_filter_already_inserted_elements(self):
        """A filter is applied to WMEs that were added before the rule."""
        rete = ReteNetwork()
        five = WME("value", "equals", 5)
        three = WME("value", "equals", 3)
        four = WME("value", "equals", 4)
        rete.add_wme(five)
        rete.add_wme(three)
        rete.add_wme(four)

        binds_x = Condition("value", "equals", V("x"))
        binds_y = Condition("value", "equals", V("y"))
        # The lambda parameters are named after the variables bound above.
        x_is_y_plus_one = FilterCondition(lambda x, y: x == y + 1)
        rule = RuleForTestingRete(AndConditions([binds_x, binds_y, x_is_y_plus_one]))
        rete.add_rule(rule)

        found = list(rete.matches)
        assert len(found) == 2
        assert found[0].token.wmes == [four, three]
        assert found[1].token.wmes == [five, four]

    def test_i_can_manage_binding_conditions(self):
        """A bind condition adds a computed variable to the binding of a match."""
        rete = ReteNetwork()
        binds_x = Condition("value", "equals", V("x"))
        z_is_x_plus_five = BindCondition(lambda x: x + 5, V("z"))
        rule = RuleForTestingRete(AndConditions([binds_x, z_is_x_plus_five]))
        rete.add_rule(rule)

        assert list(rete.matches) == []

        one = WME("value", "equals", 1)
        rete.add_wme(one)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [one]
        assert found[0].token.binding == {V("z"): 6, V("x"): 1}

        # Removing the WME retracts the match.
        rete.remove_wme(one)
        assert list(rete.matches) == []

    def test_i_can_manage_bind_when_it_is_the_first_condition(self):
        """A bind used as the only condition matches without any WME."""
        rete = ReteNetwork()
        rule = RuleForTestingRete(AndConditions([BindCondition(lambda: 5, V("x"))]))
        rete.add_rule(rule)

        found = list(rete.matches)
        assert len(found) == 1
        assert found[0].token.wmes == [None]
        assert found[0].token.binding == {V("x"): 5}

        # Removing the rule removes its match.
        rete.remove_rule(rule)
        assert list(rete.matches) == []

    def test_i_can_match_on_the_correct_variable_name(self):
        """A variable used twice in one condition must take the same value in both places."""
        rete = ReteNetwork()
        rule = RuleForTestingRete(AndConditions([Condition(V("x"), "attr", V("x"))]))
        rete.add_rule(rule)

        # Identifier and value differ: no match.
        mismatched = WME("a", "attr", "c")
        rete.add_wme(mismatched)
        assert list(rete.matches) == []

        # Identifier and value are equal: one match.
        consistent = WME("a", "attr", "a")
        rete.add_wme(consistent)
        assert len(list(rete.matches)) == 1

    def test_condition_test(self):
        """``Condition.test`` compares constants and enforces repeated variables."""
        red_thing = Condition(V("x"), "color", "red")
        red_block = WME("B1", "color", "red")
        blue_block = WME("B1", "color", "blue")
        assert red_thing.test(red_block)
        assert not red_thing.test(blue_block)

        # The same variable on both sides rejects a WME with two different values.
        same_on_both_sides = Condition(V("x"), "is", V("x"))
        different_sides = WME("B1", "is", "B3")
        assert not same_on_both_sides.test(different_sides)

    def test_black_white(self):
        """Combine a category white list with negated shop conditions.

        The white list is a negated conjunction of negated conditions. With
        the WMEs below, ``item:1`` is the only item that activates the rule.
        """
        # KSI 2021-02-07. This test was simply copied.

        rete = ReteNetwork()
        has_category = Condition(V("item"), "cat", V("cid"))
        has_shop = Condition(V("item"), "shop", V("sid"))
        white_list = NegatedConjunctiveConditions(
            NegatedCondition(V("item"), "cat", "100"),
            NegatedCondition(V("item"), "cat", "101"),
            NegatedCondition(V("item"), "cat", "102"),
        )
        not_shop_1 = NegatedCondition(V("item"), "shop", "1")
        not_shop_2 = NegatedCondition(V("item"), "shop", "2")
        not_shop_3 = NegatedCondition(V("item"), "shop", "3")

        rule = RuleForTestingRete(
            AndConditions(
                [has_category, has_shop, white_list, not_shop_1, not_shop_2, not_shop_3]
            )
        )
        rete.add_rule(rule)

        facts = [
            WME("item:1", "cat", "101"),
            WME("item:1", "shop", "4"),
            WME("item:2", "cat", "100"),
            WME("item:2", "shop", "1"),
        ]
        for fact in facts:
            rete.add_wme(fact)

        activations = list(rule.rete_p_nodes[0].activations)
        assert len(activations) == 1
        assert activations[0].binding[V("item")] == "item:1"