Files
Sheerka-Old/tests/core/test_SheerkaRuleManager.py
T
kodjo 1059ce25c5 Fixed #68: Implement SheerkaQL
Fixed #70: SheerkaFilterManager : Pipe functions
Fixed #71: SheerkaFilterManager : filter_objects
Fixed #75: SheerkaMemory: Enhance memory() to use the filtering capabilities
Fixed #76: SheerkaEvaluateConcept: Concepts that modify the state of the system must not be evaluated during question
2021-04-26 19:13:47 +02:00

677 lines
34 KiB
Python

import pytest
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept
from core.concept import Concept, DEFINITION_TYPE_DEF
from core.global_symbols import RULE_COMPARISON_CONTEXT, NotFound, EVENT_RULE_DELETED
from core.rule import Rule, ACTION_TYPE_PRINT, ACTION_TYPE_EXEC
from core.sheerka.Sheerka import RECOGNIZED_BY_ID, RECOGNIZED_BY_NAME
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager, FormatRuleActionParser, \
FormatAstRawText, FormatAstVariable, FormatAstSequence, FormatAstFunction, \
FormatRuleSyntaxError, FormatAstList, UnexpectedEof, FormatAstColor, FormatAstDict, \
FormatAstMulti, \
PythonCodeEmitter, FormatAstNode, ReteConditionExprVisitor
from core.tokenizer import Token, TokenKind
from parsers.BaseParser import ErrorSink
from parsers.ExpressionParser import ExpressionParser
from sheerkarete.conditions import FilterCondition
from sheerkarete.network import ReteNetwork
from tests.TestUsingFileBasedSheerka import TestUsingFileBasedSheerka
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
from tests.parsers.parsers_utils import get_rete_conditions, NEGCOND, NCCOND
seq = FormatAstSequence
raw = FormatAstRawText
var = FormatAstVariable
func = FormatAstFunction
lst = FormatAstList
PYTHON_EVALUATOR_NAME = "Python"
CONCEPT_EVALUATOR_NAME = "Concept"
class TestSheerkaRuleManager(TestUsingMemoryBasedSheerka):
@pytest.mark.parametrize("action_type, cache_entry", [
(ACTION_TYPE_PRINT, SheerkaRuleManager.FORMAT_RULE_ENTRY),
(ACTION_TYPE_EXEC, SheerkaRuleManager.EXEC_RULE_ENTRY),
])
def test_i_can_create_a_new_rule(self, action_type, cache_entry):
sheerka, context = self.init_test(cache_only=False).unpack()
previous_rules_number = sheerka.om.get_all(sheerka.OBJECTS_IDS_ENTRY)[SheerkaRuleManager.RULE_IDS]
rule = Rule(action_type, "name", "True", "Hello world")
res = sheerka.create_new_rule(context, rule)
expected_id = str(previous_rules_number + 1)
assert res.status
assert sheerka.isinstance(res.body, BuiltinConcepts.NEW_RULE)
created_rule = res.body.body
assert created_rule.metadata.id == expected_id
assert created_rule.metadata.name == "name"
assert created_rule.metadata.predicate == "True"
assert created_rule.metadata.action_type == action_type
assert created_rule.metadata.action == "Hello world"
# saved in cache
assert len(sheerka.om.current_cache_manager().caches[cache_entry].cache) > 0
from_cache = sheerka.om.get(cache_entry, expected_id)
assert from_cache.metadata.id == expected_id
assert from_cache.metadata.name == "name"
assert from_cache.metadata.predicate == "True"
assert from_cache.metadata.action_type == action_type
assert from_cache.metadata.action == "Hello world"
# the rule is also saved by name
by_name = sheerka.get_rule_by_name("name")
assert by_name.metadata.id == expected_id
assert by_name.metadata.name == "name"
assert by_name.metadata.predicate == "True"
assert by_name.metadata.action_type == action_type
assert by_name.metadata.action == "Hello world"
sheerka.om.commit(context)
# saved in sdp
from_sdp = sheerka.om.current_sdp().get(cache_entry, expected_id)
assert from_sdp.metadata.id == expected_id
assert from_sdp.metadata.name == "name"
assert from_sdp.metadata.predicate == "True"
assert from_sdp.metadata.action_type == action_type
assert from_sdp.metadata.action == "Hello world"
by_name = sheerka.om.current_sdp().get(SheerkaRuleManager.RULES_BY_NAME_ENTRY, "name")
assert by_name.metadata.id == expected_id
assert by_name.metadata.name == "name"
assert by_name.metadata.predicate == "True"
assert by_name.metadata.action_type == action_type
assert by_name.metadata.action == "Hello world"
def test_i_can_create_multiple_rules(self):
sheerka, context = self.init_concepts(cache_only=False)
previous_rules_number = len(
sheerka.om.current_cache_manager().caches[SheerkaRuleManager.FORMAT_RULE_ENTRY].cache)
sheerka.create_new_rule(context, Rule("print", "name1", "True", "Hello world"))
sheerka.create_new_rule(context, Rule("print", "name2", "value() is __EXPLANATION", "list(value())"))
assert len(
sheerka.om.current_cache_manager().caches[
SheerkaRuleManager.FORMAT_RULE_ENTRY].cache) == 2 + previous_rules_number
@pytest.mark.parametrize("action_type, cache_entry", [
(ACTION_TYPE_PRINT, SheerkaRuleManager.FORMAT_RULE_ENTRY),
(ACTION_TYPE_EXEC, SheerkaRuleManager.EXEC_RULE_ENTRY),
])
def test_i_can_delete_a_rule(self, action_type, cache_entry):
sheerka, context, rule = self.init_test(cache_only=False).with_rules(
action_type,
Rule(action_type, "rule_name", "id.attr == 'value'", 'True')).unpack()
rule_to_delete = Rule(rule.compiled_action, rule_id=rule.id)
event_sink = []
self.sheerka.subscribe(EVENT_RULE_DELETED, lambda c, r: event_sink.append(r))
ret = sheerka.remove_rule(context, rule_to_delete)
assert ret.status
assert sheerka.om.get(cache_entry, rule.id) is NotFound
assert sheerka.om.get(SheerkaRuleManager.RULES_BY_NAME_ENTRY, rule.id) is NotFound
sheerka.om.commit(context)
assert sheerka.om.current_sdp().get(cache_entry, rule.id)
assert sheerka.om.current_sdp().get(SheerkaRuleManager.RULES_BY_NAME_ENTRY, rule.id)
assert event_sink == [rule]
def test_i_can_init_rule_with_a_exec_rule(self):
sheerka, context = self.init_test(cache_only=False).unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
rule = Rule(ACTION_TYPE_EXEC, "name", "__input == 'hello world'", "'Hello back at you !'")
rule = service.init_rule(context, rule)
assert rule.metadata.is_compiled
assert rule.metadata.is_enabled
assert len(rule.compiled_conditions) == 1
assert len(rule.rete_disjunctions) == 1
assert sheerka.isinstance(rule.compiled_action, BuiltinConcepts.RETURN_VALUE)
assert rule.compiled_action.status
assert rule.error_sink is None
def test_i_can_init_rule_with_a_format_rule(self):
sheerka, context = self.init_test(cache_only=False).unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
rule = Rule(ACTION_TYPE_PRINT, "name", "__input == 'hello world'", "Hello back at you !")
rule = service.init_rule(context, rule)
assert rule.metadata.is_compiled
assert rule.metadata.is_enabled
assert len(rule.compiled_conditions) == 1
assert len(rule.rete_disjunctions) == 1
assert isinstance(rule.compiled_action, FormatAstNode)
assert rule.error_sink is None
def test_i_do_not_init_rule_an_already_compiled_rule(self):
sheerka, context = self.init_test(cache_only=False).unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
rule = Rule(ACTION_TYPE_EXEC, "name", "cannot build", "cannot compile either")
rule.metadata.is_compiled = True
rule = service.init_rule(context, rule)
assert rule.metadata.is_compiled
assert rule.error_sink is None # no error detected
@pytest.mark.parametrize("action_type, action", [
(ACTION_TYPE_EXEC, "'Hello back at you !'"),
(ACTION_TYPE_PRINT, "Hello back at you !"),
])
def test_init_rule_returns_errors_when_cannot_build_predicate(self, action_type, action):
sheerka, context = self.init_test(cache_only=False).unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
rule = Rule(ACTION_TYPE_EXEC, "name", "cannot build = False", "'Hello back at you !'")
rule.metadata.is_enabled = True # it should be disabled
rule = service.init_rule(context, rule)
assert len(rule.error_sink["when"]) > 0
assert sheerka.has_error(context, rule.error_sink["when"][0])
assert "print" not in rule.error_sink
assert "then" not in rule.error_sink
assert rule.metadata.is_compiled
assert not rule.metadata.is_enabled
assert rule.compiled_conditions is None
assert rule.rete_disjunctions is None
@pytest.mark.parametrize("action_type, action", [
(ACTION_TYPE_EXEC, "cannot build action"),
(ACTION_TYPE_PRINT, "list("),
])
def test_init_rule_returns_error_when_cannot_build_action(self, action_type, action):
sheerka, context = self.init_test(cache_only=False).unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
rule = Rule(action_type, "name", "__input == 'hello world'", action)
rule.metadata.is_enabled = True # it should be disabled
rule = service.init_rule(context, rule)
other_action_type = ACTION_TYPE_PRINT if action_type == ACTION_TYPE_EXEC else ACTION_TYPE_EXEC
assert sheerka.has_error(context, rule.error_sink[action_type])
assert other_action_type not in rule.error_sink
assert "when" not in rule.error_sink
assert rule.metadata.is_compiled
assert not rule.metadata.is_enabled
assert rule.compiled_action is None
@pytest.mark.parametrize("text, expected", [
("", FormatAstRawText("")),
(" ", FormatAstRawText(" ")),
(" raw text ", FormatAstRawText(" raw text ")),
("{variable}", FormatAstVariable("variable")),
("{ variable }", FormatAstVariable("variable")),
(" xy {v} z", seq([raw(" xy "), var("v"), raw(" z")])),
(r"\{variable}", FormatAstRawText("{variable}")),
(r"\\{variable}", seq([raw("\\"), var("variable")])),
(r"\\\{variable}", FormatAstRawText(r"\{variable}")),
(r"{var1}{var2}", seq([var("var1"), var("var2")])),
("func()", FormatAstFunction("func", [], {})),
("func(a, 'string value', c)", FormatAstFunction("func", ["a", "'string value'", "c"], {})),
("func(a=10, b='string value')", FormatAstFunction("func", [], {"a": "10", "b": "'string value'"})),
("func('string value'='another string value')", func("func", [], {"'string value'": "'another string value'"})),
("red(' xy {v}')", FormatAstColor("red", seq([raw(" xy "), var("v")]))),
('blue(" xy {v}")', FormatAstColor("blue", seq([raw(" xy "), var("v")]))),
('green( xy )', FormatAstColor("green", var("xy"))),
('green()', FormatAstColor("green", raw(""))),
('green("")', FormatAstColor("green", raw(""))),
("list(var_name, 2, 'children')", FormatAstList("var_name", recurse_on="children", recursion_depth=2)),
("list(var_name, recursion_depth=2, recurse_on='children')", FormatAstList("var_name",
recurse_on="children",
recursion_depth=2)),
("list(var_name, recursion_depth=2, 'children')", FormatAstList("var_name", recursion_depth=2)),
("list(var_name, 'children', recursion_depth=2)", FormatAstList("var_name", recursion_depth=2)),
("list(var_name)", FormatAstList("var_name")),
("{obj.prop1.prop2[0].prop3['value']}", FormatAstVariable("obj.prop1.prop2[0].prop3['value']")),
("[{id}]", seq([raw("["), var("id"), raw("]")])),
("{variable:format}", FormatAstVariable("variable", "format")),
("{variable:3}", FormatAstVariable("variable", "3")),
(r"\not_a_function(a={var})", seq([raw("not_a_function(a="), var("var"), raw(")")])),
("dict(var_name)", FormatAstDict("var_name")),
("dict(var_name, items_prop='props')", FormatAstDict("var_name", items_prop='props')),
("dict(var_name, debug=True)", FormatAstDict("var_name", debug=True, prefix="{", suffix="}")),
("multi(var_name)", FormatAstMulti("var_name")),
])
def test_i_can_parse_format_rule(self, text, expected):
assert FormatRuleActionParser(text).parse() == expected
@pytest.mark.parametrize("text, expected_error", [
("{", UnexpectedEof("while parsing variable", Token(TokenKind.LBRACE, "{", 0, 1, 1))),
("{var_name", UnexpectedEof("while parsing variable", Token(TokenKind.LBRACE, "{", 0, 1, 1))),
("{}", FormatRuleSyntaxError("variable name not found", None)),
("func(", UnexpectedEof("while parsing function", Token(TokenKind.IDENTIFIER, "func", 0, 1, 1))),
("func(a,b,c", UnexpectedEof("while parsing function", Token(TokenKind.IDENTIFIER, "func", 0, 1, 1))),
("func(a,,c", FormatRuleSyntaxError("no parameter found", Token(TokenKind.COMMA, ",", 7, 1, 8))),
("func(a,,c)", FormatRuleSyntaxError("no parameter found", Token(TokenKind.COMMA, ",", 7, 1, 8))),
("red(a,b)", FormatRuleSyntaxError("only one parameter supported", Token(TokenKind.IDENTIFIER, "b", 6, 1, 7))),
("red(a=b)", FormatRuleSyntaxError("keyword arguments are not supported", None)),
("red(xy {v})", FormatRuleSyntaxError("Invalid identifier", None)),
("list()", FormatRuleSyntaxError("variable name not found", None)),
("list(recursion_depth=2)", FormatRuleSyntaxError("variable name not found", None)),
("list(a,b,c,d,e)", FormatRuleSyntaxError("too many positional arguments",
Token(TokenKind.IDENTIFIER, "e", 13, 1, 14))),
("list(a, recursion_depth=hello)", FormatRuleSyntaxError("'hello' is not numeric", None)),
("list(a, recursion_depth='hello')", FormatRuleSyntaxError("'recursion_depth' must be an integer", None)),
("dict()", FormatRuleSyntaxError("variable name not found", None)),
])
def test_i_cannot_parse_invalid_format(self, text, expected_error):
parser = FormatRuleActionParser(text)
parser.parse()
assert parser.error_sink == expected_error
def test_i_can_get_rule_priorities(self):
sheerka, context, rule_true, rule_false = self.init_test().with_format_rules(("True", "True"),
("False", "False")).unpack()
sheerka.set_is_greater_than(context, BuiltinConcepts.PRECEDENCE,
rule_true,
rule_false,
RULE_COMPARISON_CONTEXT)
rules_from_cache = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
assert rules_from_cache[rule_true.id].priority == 2
assert rules_from_cache[rule_false.id].priority == 1
def test_i_can_get_and_retrieve_rules_when_multiple_ontology_layers(self):
sheerka, context, rule_true = self.init_test().with_format_rules(("true", "True", "True")).unpack()
sheerka.push_ontology(context, "new ontology")
rule_false = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "false", "False", "False")).body.body
# All rules are visible
assert sheerka.get_rule_by_id(rule_true.id) == rule_true
assert sheerka.get_rule_by_id(rule_false.id) == rule_false
sheerka.pop_ontology(context)
assert sheerka.get_rule_by_id(rule_true.id) == rule_true
assert not sheerka.is_known(sheerka.get_rule_by_id(rule_false.id))
def test_i_can_resolve_rule(self):
sheerka, context, rule = self.init_test().with_format_rules(("my rule", "True", "True")).unpack()
context.add_to_short_term_memory("x", rule.id)
# direct access by id
assert sheerka.resolve_rule(context, rule.id) == rule
# direct access by token
assert sheerka.resolve_rule(context, Token(TokenKind.RULE, ("i_do_not_care", rule.id), -1, -1, -1)) == rule
# indirect access by token
assert sheerka.resolve_rule(context, Token(TokenKind.RULE, ("i_do_not_care", "x"), -1, -1, -1)) == rule
# Simple returns the rule if id is resolved
assert sheerka.resolve_rule(context, rule) == rule
# look for the correct rule when id is unresolved
unresolved = Rule(rule_id="x")
unresolved.metadata.id_is_unresolved = True
assert sheerka.resolve_rule(context, unresolved) == rule
# look for the correct rule when id is unresolved
unresolved = Rule(rule_id="y")
unresolved.metadata.id_is_unresolved = True
assert sheerka.resolve_rule(context, unresolved) is None # no error raised
# I still can get the value when the indirection has the wrong type
context.add_to_short_term_memory("z", int(rule.id))
assert sheerka.resolve_rule(context, Token(TokenKind.RULE, ("i_do_not_care", "z"), -1, -1, -1)) == rule
unresolved = Rule(rule_id="z")
unresolved.metadata.id_is_unresolved = True
assert sheerka.resolve_rule(context, unresolved) == rule
@pytest.mark.parametrize("obj, expected", [
("text value", "var == 'text value'"),
("text 'value'", '''var == "text 'value'"'''),
('text "value"', """var == 'text "value"'"""),
(10, "var == 10"),
(10.01, "var == 10.01"),
])
def test_i_can_test_python_code_emitter_for_basic_types(self, obj, expected):
sheerka, context = self.init_test().unpack()
assert PythonCodeEmitter(context).recognize(obj, "var").get_text() == expected
assert PythonCodeEmitter(context, "status").recognize(obj, "var").get_text() == "status and " + expected
@pytest.mark.parametrize("recognized_by, expected", [
(RECOGNIZED_BY_ID, "isinstance(var, Concept) and var.id == '1001'"),
(RECOGNIZED_BY_NAME, "isinstance(var, Concept) and var.name == 'greetings'"),
(None, "isinstance(var, Concept) and var.key == 'hello'"),
])
def test_i_can_test_python_code_emitter_for_concepts(self, recognized_by, expected):
sheerka, context, foo = self.init_concepts(
Concept("greetings", definition="hello", definition_type=DEFINITION_TYPE_DEF))
instance = sheerka.new_from_template(foo, foo.key)
if recognized_by:
instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, recognized_by)
assert PythonCodeEmitter(context).recognize(instance, "var").get_text() == expected
assert PythonCodeEmitter(context, "status").recognize(instance, "var").get_text() == "status and " + expected
def test_i_can_test_python_code_emitter_for_concepts_with_variable(self):
sheerka, context, greetings, little, foo, bar, and_concept = self.init_concepts(
Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
Concept("little x").def_var("x"),
"foo",
"bar",
Concept("a and b").def_var("a").def_var("b")
)
# variable is a string
greetings_instance = sheerka.new_from_template(greetings, greetings.key, a='sheerka')
expected = "isinstance(var, Concept) and var.key == 'hello __var__0' and var.get_value('a') == 'sheerka'"
text = PythonCodeEmitter(context).recognize(greetings_instance, "var").get_text()
assert text == expected
# variable is a concept recognized by id
foo_instance = sheerka.new_from_template(foo, foo.key)
foo_instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, RECOGNIZED_BY_ID)
greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=foo_instance)
expected = """__x_00__ = var.get_value('a')
isinstance(var, Concept) and var.key == 'hello __var__0' and isinstance(__x_00__, Concept) and __x_00__.id == '1003'"""
text = PythonCodeEmitter(context).recognize(greetings_instance, "var").get_text()
assert text == expected
# variable is a concept recognized by name
foo_instance = sheerka.new_from_template(foo, foo.key)
foo_instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, RECOGNIZED_BY_NAME)
greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=foo_instance)
expected = """__x_00__ = var.get_value('a')
isinstance(var, Concept) and var.key == 'hello __var__0' and isinstance(__x_00__, Concept) and __x_00__.name == 'foo'"""
text = PythonCodeEmitter(context).recognize(greetings_instance, "var").get_text()
assert text == expected
# variable is a concept recognized by value
foo_instance = sheerka.new_from_template(foo, foo.key)
greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=foo_instance)
expected = """__x_00__ = var.get_value('a')
isinstance(var, Concept) and var.key == 'hello __var__0' and isinstance(__x_00__, Concept) and __x_00__.key == 'foo'"""
text = PythonCodeEmitter(context).recognize(greetings_instance, "var").get_text()
assert text == expected
# variable is a concept witch has itself some variable
foo_instance = sheerka.new_from_template(foo, foo.key)
little_instance = sheerka.new_from_template(little, little.key, x=foo_instance)
greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=little_instance)
expected = """__x_00__ = var.get_value('a')
__x_01__ = __x_00__.get_value('x')
isinstance(var, Concept) and var.key == 'hello __var__0'""" + \
" and isinstance(__x_00__, Concept) and __x_00__.key == 'little __var__0'" + \
" and isinstance(__x_01__, Concept) and __x_01__.key == 'foo'"""
text = PythonCodeEmitter(context).recognize(greetings_instance, "var").get_text()
assert text == expected
# concept with multiple variables (which are themselves concepts)
foo_instance = sheerka.new_from_template(foo, foo.key)
bar_instance = sheerka.new_from_template(bar, bar.key)
little_instance = sheerka.new_from_template(little, little.key, x=foo_instance)
and_instance = sheerka.new_from_template(and_concept, and_concept.key, a=bar_instance, b=little_instance)
greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=and_instance)
expected = """__x_00__ = var.get_value('a')
__x_01__ = __x_00__.get_value('a')
__x_02__ = __x_00__.get_value('b')
__x_03__ = __x_02__.get_value('x')
isinstance(var, Concept) and var.key == 'hello __var__0'""" + \
" and isinstance(__x_00__, Concept) and __x_00__.key == '__var__0 and __var__1'" + \
" and isinstance(__x_01__, Concept) and __x_01__.key == 'bar'" + \
" and isinstance(__x_02__, Concept) and __x_02__.key == 'little __var__0'" + \
" and isinstance(__x_03__, Concept) and __x_03__.key == 'foo'"
text = PythonCodeEmitter(context).recognize(greetings_instance, "var").get_text()
assert text == expected
def test_i_can_get_format_rules(self):
sheerka, context = self.init_test().unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
sheerka.om.current_cache_manager().clear(service.FORMAT_RULE_ENTRY)
r1 = Rule(ACTION_TYPE_PRINT, "name 1", "True", "Hello world 1", priority=1)
r2 = Rule(ACTION_TYPE_PRINT, "name 2", "False", "Hello world 2", priority=3)
r3 = Rule(ACTION_TYPE_PRINT, "name 2", "None", "Hello world 3", priority=2)
sheerka.create_new_rule(context, r1)
sheerka.create_new_rule(context, r2)
sheerka.create_new_rule(context, r3)
res = sheerka.get_format_rules()
assert res == [r2, r3, r1]
def test_i_can_get_exec_rules(self):
sheerka, context = self.init_test().unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
sheerka.om.current_cache_manager().clear(service.EXEC_RULE_ENTRY)
r1 = Rule(ACTION_TYPE_EXEC, "name 1", "True", "'Hello world 1'", priority=1)
r2 = Rule(ACTION_TYPE_EXEC, "name 2", "False", "'Hello world 2'", priority=3)
r3 = Rule(ACTION_TYPE_EXEC, "name 2", "None", "'Hello world 3'", priority=2)
sheerka.create_new_rule(context, r1)
sheerka.create_new_rule(context, r2)
sheerka.create_new_rule(context, r3)
res = sheerka.get_exec_rules()
assert res == [r2, r3, r1]
def test_i_can_properly_copy_a_rule(self):
sheerka, context = self.init_test().unpack()
service = sheerka.services[SheerkaRuleManager.NAME]
rule = Rule(ACTION_TYPE_EXEC, "name", "id.attr == 'value'", 'test()')
service.init_rule(context, rule)
service.create_new_rule(context, rule)
clone = rule.__deepcopy__({})
for k, v in vars(rule).items():
assert getattr(clone, k) == getattr(rule, k)
@pytest.mark.skip("No ready yet for SheerkaFilterCondition")
def test_i_can_get_rete_conditions_when_function(self):
sheerka, context, greetings = self.init_test().with_concepts(
Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
).unpack()
expression = "isinstance(a, greetings)"
expected = get_rete_conditions(["#__x_00__|__name__|'a'",
FilterCondition(lambda a: isinstance(a, greetings))])
parser = ExpressionParser()
error_sink = ErrorSink()
parser_input = ParserInput(expression)
parser.reset_parser_input(parser_input, error_sink)
parsed = parser.parse_input(context, parser_input, error_sink)
visitor = ReteConditionExprVisitor(context)
conditions = visitor.get_conditions(parsed)
assert conditions == [expected]
# check against a Rete network
network = ReteNetwork()
rule = Rule("test", expression, None)
rule.metadata.id = 9999
rule.metadata.is_compiled = True
rule.metadata.is_enabled = True
rule.rete_disjunctions = conditions
network.add_rule(rule)
# variable = foo if variable_name == "foo" else sheerka if variable_name == "sheerka" else variable_name
# to_recognize = sheerka.new_from_template(greetings, greetings.key, a=variable)
# network.add_obj("__ret", ReturnValueConcept("Test", True, to_recognize))
# matches = list(network.matches)
# assert len(matches) == 1
@pytest.mark.parametrize("expression, bag_key, expected", [
("not __ret", "__other", [NEGCOND("#__x_00__|__name__|'__ret'")]),
("not not __ret", "__ret", ["#__x_00__|__name__|'__ret'"]),
("not __ret.status == True", "__ret", ["#__x_00__|__name__|'__ret'", NEGCOND("#__x_00__|status|True")]),
("not __ret.status", "__ret", [NEGCOND("#__x_00__|__name__|'__ret.status'")]),
("__ret and not __ret.status", "__ret",
["#__x_00__|__name__|'__ret'", NEGCOND("#__x_01__|__name__|'__ret.status'")]),
("not recognize(__ret.body, hello sheerka)", "__ret", ["#__x_00__|__name__|'__ret'",
NCCOND(["#__x_00__|body|#__x_01__",
"#__x_01__|__is_concept__|True",
"#__x_01__|key|'hello __var__0'",
"#__x_01__|a|'__sheerka__'"])]),
("__ret and not __error", "__ret", ["#__x_00__|__name__|'__ret'", NEGCOND("#__x_01__|__name__|'__error'")]),
("not recognize(self, hello sheerka)", "__ret", ["#__x_00__|__name__|'self'",
NCCOND(["#__x_00__|__is_concept__|True",
"#__x_00__|key|'hello __var__0'",
"#__x_00__|a|'__sheerka__'"])]),
])
def test_i_can_get_rete_using_not(self, expression, bag_key, expected):
sheerka, context, greetings, foo = self.init_test().with_concepts(
Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
Concept("foo"),
).unpack()
parser = ExpressionParser()
expected_conditions = get_rete_conditions(*expected)
error_sink = ErrorSink()
parser_input = ParserInput(expression)
parser.reset_parser_input(parser_input, error_sink)
parsed = parser.parse_input(context, parser_input, error_sink)
visitor = ReteConditionExprVisitor(context)
conditions = visitor.get_conditions(parsed)
assert conditions == [expected_conditions]
@pytest.mark.skip("I am not sure yet of what I want to get")
@pytest.mark.parametrize("expression, expected_as_list_of_str", [
(
"eval(__ret.body, 'foo' starts with 'f')",
["#__x_00__|__name__|'__ret'",
"#__x_00__|body|#__x_01__",
"#__x_01__|__is_concept__|True",
"#__x_01__|key|'__var__0 starts with __var__1'",
"#__x_01__|x|'foo'",
"#__x_01__|y|'f'",
"$eval(__x_01__, a, b, c)"]
),
])
def test_i_can_get_rete_conditions_using_eval_function(self, expression, expected_as_list_of_str):
sheerka, context, start_with = self.init_test().with_concepts(
Concept("x starts with y",
pre="is_question",
body="x.startswith(y)",
where="isinstance(x, str)").def_var("x").def_var("y"),
).unpack()
parser = ExpressionParser()
expected = get_rete_conditions(*expected_as_list_of_str)
error_sink = ErrorSink()
parser_input = ParserInput(expression)
parser.reset_parser_input(parser_input, error_sink)
parsed = parser.parse_input(context, parser_input, error_sink)
visitor = ReteConditionExprVisitor(context)
conditions = visitor.get_conditions(parsed)
assert conditions == [expected]
# check against a Rete network
network = ReteNetwork()
rule = Rule("test", expression, None)
rule.metadata.id = 9999
rule.metadata.is_compiled = True
rule.metadata.is_enabled = True
rule.rete_disjunctions = conditions
network.add_rule(rule)
to_recognize = sheerka.new_from_template(start_with, start_with.key, x="foo", y="f")
network.add_obj("__ret", ReturnValueConcept("Test", True, to_recognize))
matches = list(network.matches)
assert len(matches) == 1
class TestSheerkaRuleManagerUsingFileBasedSheerka(TestUsingFileBasedSheerka):
def test_rules_are_initialized_at_startup(self):
sheerka, context, *rules = self.init_test().with_rules(
None,
Rule("print", "name1", "True", "Hello world"),
Rule("print", "name2", "__rets", "list(rets)"),
Rule("exec", "name3", "True", "'Hello world'"),
Rule("exec", "name4", "__ret.body", "list(__ret.body)"),
).unpack()
sheerka.set_is_greater_than(context, BuiltinConcepts.PRECEDENCE,
rules[0],
rules[1],
RULE_COMPARISON_CONTEXT)
sheerka.set_is_less_than(context, BuiltinConcepts.PRECEDENCE,
rules[2],
rules[3],
RULE_COMPARISON_CONTEXT)
sheerka.om.commit(context)
expected_rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
expected_rules_by_id.update(sheerka.om.get_all(SheerkaRuleManager.EXEC_RULE_ENTRY))
sheerka = self.new_sheerka_instance(False) # new instance
rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
rules_by_id.update(sheerka.om.get_all(SheerkaRuleManager.EXEC_RULE_ENTRY))
assert len(rules_by_id) == len(expected_rules_by_id)
for rule_id, rule in rules_by_id.items():
expected = expected_rules_by_id[rule_id]
assert rule.metadata.action_type == expected.metadata.action_type
assert rule.metadata.name == expected.metadata.name
assert rule.metadata.predicate == expected.metadata.predicate
assert rule.metadata.action == expected.metadata.action
assert rule.metadata.id == expected.metadata.id
assert rule.metadata.is_compiled
assert rule.metadata.is_enabled
assert rule.compiled_action == expected.compiled_action
assert rule.compiled_conditions == expected.compiled_conditions
assert rule.priority is not None
assert rule.priority == expected.priority
def test_rules_are_still_accessible_after_a_new_ontology_layer(self):
sheerka, context, *rules = self.init_format_rules(
Rule("print", "name1", "True", "Hello world"),
Rule("print", "name2", "value() is __EXPLANATION", "list(value())")
)
sheerka.set_is_greater_than(context, BuiltinConcepts.PRECEDENCE,
rules[0],
rules[1],
RULE_COMPARISON_CONTEXT)
sheerka.om.commit(context)
expected_rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
sheerka.push_ontology(context, "new ontology")
rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
assert len(rules_by_id) == len(expected_rules_by_id)
for rule_id, rule in rules_by_id.items():
expected = expected_rules_by_id[rule_id]
assert rule.metadata.action_type == expected.metadata.action_type
assert rule.metadata.name == expected.metadata.name
assert rule.metadata.predicate == expected.metadata.predicate
assert rule.metadata.action == expected.metadata.action
assert rule.metadata.id == expected.metadata.id
assert rule.metadata.is_compiled == expected.metadata.is_compiled
assert rule.metadata.is_enabled == expected.metadata.is_enabled
assert rule.compiled_action == expected.compiled_action
assert rule.compiled_conditions == expected.compiled_conditions
assert rule.priority is not None
assert rule.priority == expected.priority