1365 lines
65 KiB
Python
1365 lines
65 KiB
Python
import ast
|
|
|
|
import pytest
|
|
|
|
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept
|
|
from core.concept import Concept, DEFINITION_TYPE_DEF, DoNotResolve
|
|
from core.global_symbols import RULE_COMPARISON_CONTEXT, NotFound, EVENT_RULE_DELETED
|
|
from core.rule import Rule, ACTION_TYPE_PRINT, ACTION_TYPE_EXEC
|
|
from core.sheerka.Sheerka import RECOGNIZED_BY_ID, RECOGNIZED_BY_NAME
|
|
from core.sheerka.services.SheerkaExecute import ParserInput
|
|
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager, FormatRuleActionParser, \
|
|
FormatAstRawText, FormatAstVariable, FormatAstSequence, FormatAstFunction, \
|
|
FormatRuleSyntaxError, FormatAstList, UnexpectedEof, FormatAstColor, RuleCompiledPredicate, FormatAstDict, \
|
|
FormatAstMulti, \
|
|
PythonCodeEmitter, NoConditionFound, FormatAstNode, ReteConditionExprVisitor
|
|
from core.sheerka.services.sheerka_service import FailedToCompileError
|
|
from core.tokenizer import Token, TokenKind
|
|
from parsers.BaseNodeParser import SourceCodeWithConceptNode, SourceCodeNode
|
|
from parsers.BaseParser import ErrorSink
|
|
from parsers.ExpressionParser import ExpressionParser
|
|
from parsers.PythonParser import PythonNode
|
|
from sheerkarete.common import V
|
|
from sheerkarete.conditions import Condition, AndConditions
|
|
from sheerkarete.network import ReteNetwork
|
|
from tests.TestUsingFileBasedSheerka import TestUsingFileBasedSheerka
|
|
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
|
|
from tests.parsers.parsers_utils import CMV, CC, compare_with_test_object, get_test_obj, get_rete_conditions
|
|
|
|
# Short aliases for the format-AST node classes; they keep the parametrized
# expectations of the parser tests compact.
seq, raw, var, func, lst = (
    FormatAstSequence,
    FormatAstRawText,
    FormatAstVariable,
    FormatAstFunction,
    FormatAstList,
)

# Evaluator names that the tests compare against RuleCompiledPredicate.evaluator.
PYTHON_EVALUATOR_NAME = "Python"
CONCEPT_EVALUATOR_NAME = "Concept"
|
|
|
|
|
|
class TestSheerkaRuleManager(TestUsingMemoryBasedSheerka):
|
|
|
|
@pytest.mark.parametrize("action_type, cache_entry", [
    (ACTION_TYPE_PRINT, SheerkaRuleManager.FORMAT_RULE_ENTRY),
    (ACTION_TYPE_EXEC, SheerkaRuleManager.EXEC_RULE_ENTRY),
])
def test_i_can_create_a_new_rule(self, action_type, cache_entry):
    """Create a rule and check the copy returned, cached, indexed by name and persisted."""
    sheerka, context = self.init_test(cache_only=False).unpack()
    rules_before = sheerka.om.get_all(sheerka.OBJECTS_IDS_ENTRY)[SheerkaRuleManager.RULE_IDS]
    expected_id = str(rules_before + 1)

    def assert_is_the_new_rule(candidate):
        # Every copy of the rule must expose the same metadata.
        assert candidate.metadata.id == expected_id
        assert candidate.metadata.name == "name"
        assert candidate.metadata.predicate == "True"
        assert candidate.metadata.action_type == action_type
        assert candidate.metadata.action == "Hello world"

    res = sheerka.create_new_rule(context, Rule(action_type, "name", "True", "Hello world"))

    assert res.status
    assert sheerka.isinstance(res.body, BuiltinConcepts.NEW_RULE)
    assert_is_the_new_rule(res.body.body)

    # saved in cache
    assert len(sheerka.om.current_cache_manager().caches[cache_entry].cache) > 0
    assert_is_the_new_rule(sheerka.om.get(cache_entry, expected_id))

    # the rule is also saved by name
    assert_is_the_new_rule(sheerka.get_rule_by_name("name"))

    sheerka.om.commit(context)

    # saved in sdp, both by id and by name
    assert_is_the_new_rule(sheerka.om.current_sdp().get(cache_entry, expected_id))
    assert_is_the_new_rule(sheerka.om.current_sdp().get(SheerkaRuleManager.RULES_BY_NAME_ENTRY, "name"))
|
|
|
|
def test_i_can_create_multiple_rules(self):
    """Creating two print rules adds exactly two entries to the format-rule cache."""
    sheerka, context = self.init_concepts(cache_only=False)

    def format_rules_count():
        caches = sheerka.om.current_cache_manager().caches
        return len(caches[SheerkaRuleManager.FORMAT_RULE_ENTRY].cache)

    count_before = format_rules_count()

    new_rules = (
        ("name1", "True", "Hello world"),
        ("name2", "value() is __EXPLANATION", "list(value())"),
    )
    for name, predicate, action in new_rules:
        sheerka.create_new_rule(context, Rule("print", name, predicate, action))

    assert format_rules_count() == 2 + count_before
|
|
|
|
@pytest.mark.parametrize("action_type, cache_entry", [
    (ACTION_TYPE_PRINT, SheerkaRuleManager.FORMAT_RULE_ENTRY),
    (ACTION_TYPE_EXEC, SheerkaRuleManager.EXEC_RULE_ENTRY),
])
def test_i_can_delete_a_rule(self, action_type, cache_entry):
    """Remove a rule through a stub carrying only its id and check caches, sdp and the event."""
    existing_rule = Rule(action_type, "rule_name", "id.attr == 'value'", 'True')
    sheerka, context, rule = self.init_test(cache_only=False).with_rules(action_type, existing_rule).unpack()

    # NOTE(review): the first positional argument of Rule is an action type everywhere else
    # in this file; passing rule.compiled_action here looks unintended - confirm.
    rule_to_delete = Rule(rule.compiled_action, rule_id=rule.id)

    event_sink = []

    def on_rule_deleted(c, r):
        event_sink.append(r)

    self.sheerka.subscribe(EVENT_RULE_DELETED, on_rule_deleted)

    ret = sheerka.remove_rule(context, rule_to_delete)

    assert ret.status
    # NOTE(review): the by-name entry is queried with rule.id, not the rule name, so that
    # lookup presumably never matches - verify the intended key.
    entries = (cache_entry, SheerkaRuleManager.RULES_BY_NAME_ENTRY)
    for entry in entries:
        assert sheerka.om.get(entry, rule.id) is NotFound

    sheerka.om.commit(context)
    for entry in entries:
        assert sheerka.om.current_sdp().get(entry, rule.id)

    # the subscriber received the deleted rule
    assert event_sink == [rule]
|
|
|
|
def test_i_can_init_rule_with_a_exec_rule(self):
    """init_rule compiles the predicate and the action of a valid exec rule."""
    sheerka, context = self.init_test(cache_only=False).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    initialized = rule_manager.init_rule(
        context,
        Rule(ACTION_TYPE_EXEC, "name", "__input == 'hello world'", "'Hello back at you !'"))

    metadata = initialized.metadata
    assert metadata.is_compiled
    assert metadata.is_enabled
    assert len(initialized.compiled_predicates) == 1
    assert len(initialized.rete_disjunctions) == 1

    # the compiled action of an exec rule is a successful return value
    compiled_action = initialized.compiled_action
    assert sheerka.isinstance(compiled_action, BuiltinConcepts.RETURN_VALUE)
    assert compiled_action.status
    assert initialized.error_sink is None
|
|
|
|
def test_i_can_init_rule_with_a_format_rule(self):
    """init_rule compiles the predicate and the format action of a valid print rule."""
    sheerka, context = self.init_test(cache_only=False).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    initialized = rule_manager.init_rule(
        context,
        Rule(ACTION_TYPE_PRINT, "name", "__input == 'hello world'", "Hello back at you !"))

    metadata = initialized.metadata
    assert metadata.is_compiled
    assert metadata.is_enabled
    assert len(initialized.compiled_predicates) == 1
    assert len(initialized.rete_disjunctions) == 1

    # the compiled action of a print rule is a format AST node
    assert isinstance(initialized.compiled_action, FormatAstNode)
    assert initialized.error_sink is None
|
|
|
|
def test_i_do_not_init_rule_an_already_compiled_rule(self):
    """A rule already flagged as compiled reports no error even if its source is invalid."""
    sheerka, context = self.init_test(cache_only=False).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    # neither the predicate nor the action below can be compiled
    already_compiled = Rule(ACTION_TYPE_EXEC, "name", "cannot build", "cannot compile either")
    already_compiled.metadata.is_compiled = True

    initialized = rule_manager.init_rule(context, already_compiled)

    assert initialized.metadata.is_compiled
    assert initialized.error_sink is None  # no error detected
|
|
|
|
@pytest.mark.parametrize("action_type, action", [
    (ACTION_TYPE_EXEC, "'Hello back at you !'"),
    (ACTION_TYPE_PRINT, "Hello back at you !"),
])
def test_init_rule_returns_errors_when_cannot_build_predicate(self, action_type, action):
    """A predicate that cannot be built is reported under "when" and disables the rule.

    The rule is built from the parametrized ``action_type`` and ``action`` so that
    both the exec and the print flavours are exercised; the action itself is valid,
    hence no action error is expected.
    """
    sheerka, context = self.init_test(cache_only=False).unpack()
    service = sheerka.services[SheerkaRuleManager.NAME]

    # Use the parameters: a hard-coded exec rule would run the same case twice.
    rule = Rule(action_type, "name", "cannot build", action)
    rule.metadata.is_enabled = True  # it should be disabled
    rule = service.init_rule(context, rule)

    assert len(rule.error_sink["when"]) > 0
    assert sheerka.is_error(rule.error_sink["when"][0])
    assert "print" not in rule.error_sink
    assert "then" not in rule.error_sink
    assert rule.metadata.is_compiled
    assert not rule.metadata.is_enabled
    assert rule.compiled_predicates is None
    assert rule.rete_disjunctions is None
|
|
|
|
@pytest.mark.parametrize("action_type, action", [
    (ACTION_TYPE_EXEC, "cannot build action"),
    (ACTION_TYPE_PRINT, "list("),
])
def test_init_rule_returns_error_when_cannot_build_action(self, action_type, action):
    """An action that cannot be built is reported under its action type and disables the rule."""
    sheerka, context = self.init_test(cache_only=False).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    broken_rule = Rule(action_type, "name", "__input == 'hello world'", action)
    broken_rule.metadata.is_enabled = True  # it should be disabled
    initialized = rule_manager.init_rule(context, broken_rule)

    # the action type that is NOT under test must not appear in the error sink
    if action_type == ACTION_TYPE_EXEC:
        other_action_type = ACTION_TYPE_PRINT
    else:
        other_action_type = ACTION_TYPE_EXEC

    errors = initialized.error_sink
    assert sheerka.is_error(errors[action_type])
    assert other_action_type not in errors
    assert "when" not in errors
    assert initialized.metadata.is_compiled
    assert not initialized.metadata.is_enabled
    assert initialized.compiled_action is None
|
|
|
|
@pytest.mark.parametrize("text, expected", [
    # raw text
    ("", raw("")),
    (" ", raw(" ")),
    (" raw text ", raw(" raw text ")),
    # variables and escaping
    ("{variable}", var("variable")),
    ("{ variable }", var("variable")),
    (" xy {v} z", seq([raw(" xy "), var("v"), raw(" z")])),
    (r"\{variable}", raw("{variable}")),
    (r"\\{variable}", seq([raw("\\"), var("variable")])),
    (r"\\\{variable}", raw(r"\{variable}")),
    (r"{var1}{var2}", seq([var("var1"), var("var2")])),
    # generic functions
    ("func()", func("func", [], {})),
    ("func(a, 'string value', c)", func("func", ["a", "'string value'", "c"], {})),
    ("func(a=10, b='string value')", func("func", [], {"a": "10", "b": "'string value'"})),
    ("func('string value'='another string value')", func("func", [], {"'string value'": "'another string value'"})),
    # colors
    ("red(' xy {v}')", FormatAstColor("red", seq([raw(" xy "), var("v")]))),
    ('blue(" xy {v}")', FormatAstColor("blue", seq([raw(" xy "), var("v")]))),
    ('green( xy )', FormatAstColor("green", var("xy"))),
    ('green()', FormatAstColor("green", raw(""))),
    ('green("")', FormatAstColor("green", raw(""))),
    # lists
    ("list(var_name, 2, 'children')", lst("var_name", recurse_on="children", recursion_depth=2)),
    ("list(var_name, recursion_depth=2, recurse_on='children')",
     lst("var_name", recurse_on="children", recursion_depth=2)),
    ("list(var_name, recursion_depth=2, 'children')", lst("var_name", recursion_depth=2)),
    ("list(var_name, 'children', recursion_depth=2)", lst("var_name", recursion_depth=2)),
    ("list(var_name)", lst("var_name")),
    # variable paths and format specifications
    ("{obj.prop1.prop2[0].prop3['value']}", var("obj.prop1.prop2[0].prop3['value']")),
    ("[{id}]", seq([raw("["), var("id"), raw("]")])),
    ("{variable:format}", var("variable", "format")),
    ("{variable:3}", var("variable", "3")),
    (r"\not_a_function(a={var})", seq([raw("not_a_function(a="), var("var"), raw(")")])),
    # dict and multi
    ("dict(var_name)", FormatAstDict("var_name")),
    ("dict(var_name, items_prop='props')", FormatAstDict("var_name", items_prop='props')),
    ("dict(var_name, debug=True)", FormatAstDict("var_name", debug=True, prefix="{", suffix="}")),
    ("multi(var_name)", FormatAstMulti("var_name")),
])
def test_i_can_parse_format_rule(self, text, expected):
    """FormatRuleActionParser turns each action text into the expected format AST."""
    parsed = FormatRuleActionParser(text).parse()
    assert parsed == expected
|
|
|
|
@pytest.mark.parametrize("text, expected_error", [
    # unterminated or empty variables
    ("{", UnexpectedEof("while parsing variable", Token(TokenKind.LBRACE, "{", 0, 1, 1))),
    ("{var_name", UnexpectedEof("while parsing variable", Token(TokenKind.LBRACE, "{", 0, 1, 1))),
    ("{}", FormatRuleSyntaxError("variable name not found", None)),
    # malformed generic functions
    ("func(", UnexpectedEof("while parsing function", Token(TokenKind.IDENTIFIER, "func", 0, 1, 1))),
    ("func(a,b,c", UnexpectedEof("while parsing function", Token(TokenKind.IDENTIFIER, "func", 0, 1, 1))),
    ("func(a,,c", FormatRuleSyntaxError("no parameter found", Token(TokenKind.COMMA, ",", 7, 1, 8))),
    ("func(a,,c)", FormatRuleSyntaxError("no parameter found", Token(TokenKind.COMMA, ",", 7, 1, 8))),
    # malformed colors
    ("red(a,b)", FormatRuleSyntaxError("only one parameter supported", Token(TokenKind.IDENTIFIER, "b", 6, 1, 7))),
    ("red(a=b)", FormatRuleSyntaxError("keyword arguments are not supported", None)),
    ("red(xy {v})", FormatRuleSyntaxError("Invalid identifier", None)),
    # malformed lists
    ("list()", FormatRuleSyntaxError("variable name not found", None)),
    ("list(recursion_depth=2)", FormatRuleSyntaxError("variable name not found", None)),
    ("list(a,b,c,d,e)",
     FormatRuleSyntaxError("too many positional arguments", Token(TokenKind.IDENTIFIER, "e", 13, 1, 14))),
    ("list(a, recursion_depth=hello)", FormatRuleSyntaxError("'hello' is not numeric", None)),
    ("list(a, recursion_depth='hello')", FormatRuleSyntaxError("'recursion_depth' must be an integer", None)),
    # malformed dicts
    ("dict()", FormatRuleSyntaxError("variable name not found", None)),
])
def test_i_cannot_parse_invalid_format(self, text, expected_error):
    """Parsing an invalid action text leaves the expected error in the parser's error sink."""
    action_parser = FormatRuleActionParser(text)
    action_parser.parse()  # the return value is ignored; only the recorded error is checked

    assert action_parser.error_sink == expected_error
|
|
|
|
@pytest.mark.parametrize("text", [
    "__action == 'some_action' and True",
    "__action == 'some_action' and not a question",
    "__action == 'some_action' and is a question",
])
def test_i_can_compile_predicate_when_action_is_provided(self, text):
    """The value compared with __action is exposed as the compiled predicate's action."""
    sheerka, context, *concepts = self.init_test().with_concepts(
        "a question",
        Concept("is a question", pre='is_question()'),
        create_new=True,
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.action == 'some_action'
|
|
|
|
def test_i_can_compile___action_is_not_part_of_the_predicates(self):
    """The __action clause is left out of the python source of the compiled predicate."""
    sheerka, context, *concepts = self.init_concepts("foo")
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    # only the 'a == 5' part is expected in the compiled python node
    remaining_source = "a == 5"
    expected_python_node = PythonNode('a == 5', ast.parse(remaining_source, "<source>", 'eval'))

    predicates = rule_manager.compile_when(
        context, "test", "__action == 'some action' and a == 5").compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(only_predicate.predicate) == expected_python_node
    assert only_predicate.concept is None
|
|
|
|
@pytest.mark.parametrize("text", [
    "a == 5",
    "foo > 5",
    "func() == 5",
    "not a == 5",
    "not foo > 5",
    "not func() == 5",
])
def test_i_can_compile_predicate_when_pure_python(self, text):
    """A pure python expression compiles to one python predicate holding the same source."""
    sheerka, context, *concepts = self.init_concepts("foo")
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    expected_python_node = PythonNode(text, ast.parse(text, "<source>", 'eval'))

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(only_predicate.predicate) == expected_python_node
    assert only_predicate.concept is None
|
|
|
|
@pytest.mark.parametrize("text, expected_type", [
    ("isinstance(a, int)", SourceCodeWithConceptNode),
    ("func()", SourceCodeNode),
    ("not isinstance(a, int)", PythonNode),
    ("not func()", PythonNode),
])
def test_i_can_compile_predicates_that_resolve_to_python(self, text, expected_type):
    """The predicate value has the expected node type and yields the expected python node."""
    sheerka, context, *concepts = self.init_concepts()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    expected_python_node = PythonNode(text, ast.parse(text, "<source>", 'eval'))

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert isinstance(sheerka.objvalue(only_predicate.predicate), expected_type)
    assert sheerka.objvalue(only_predicate.predicate).get_python_node() == expected_python_node
    assert only_predicate.concept is None
|
|
|
|
@pytest.mark.parametrize("text, expected_variables", [
    ("cat is an animal", ["cat", "animal"]),
    ("a is an animal", ["a", "animal"]),
    ("cat is an b", ["cat", "b"]),
])
def test_i_can_compile_predicate_when_exact_concept(self, text, expected_variables):
    """A text matching the 'x is an y' concept compiles to a single concept predicate."""
    is_an = Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y")
    sheerka, context, *concepts = self.init_test().with_concepts(
        is_an, Concept("cat"), Concept("animal"), create_new=True).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    # the expected concept is the registered one, with x and y bound to the parsed values
    x_value, y_value = expected_variables
    expected = concepts[0]
    expected.get_metadata().variables = [('x', x_value), ('y', y_value)]

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == CONCEPT_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(only_predicate.predicate) == expected
    assert only_predicate.concept == expected
|
|
|
|
@pytest.mark.parametrize("text, text_to_compile, expected_variables", [
    ("not cat is an animal", "not __C__00var0000is0an000var001__1001__C__", ["cat", "animal"]),
    ("not a is an animal", "not __C__00var0000is0an000var001__1001__C__", ["a", "animal"]),
    ("not cat is an b", "not __C__00var0000is0an000var001__1001__C__", ["cat", "b"]),
])
def test_i_can_compile_negative_predicate_when_exact_concept(self, text, text_to_compile, expected_variables):
    """A negated concept compiles to a python predicate that negates the concept placeholder.

    The compiled python node is expected to reference the concept through the
    ``__C__...__C__`` identifier and to keep the original text as its source.
    """
    sheerka, context, *concepts = self.init_test().with_concepts(
        Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
        Concept("cat"),
        Concept("animal"),
        create_new=True
    ).unpack()
    service = sheerka.services[SheerkaRuleManager.NAME]

    expected_concept = concepts[0]
    expected_concept.get_metadata().variables = [('x', expected_variables[0]), ('y', expected_variables[1])]
    ast_ = ast.parse(text_to_compile, "<source>", 'eval')
    expected_python_node = PythonNode(text_to_compile, ast_)
    expected_python_node.original_source = text
    expected_python_node.objects = {"__C__00var0000is0an000var001__1001__C__": expected_concept}

    # Read the predicates from the compile result, as every other compile_when test does.
    compiled_result = service.compile_when(context, "test", text)
    res = compiled_result.compiled_predicates

    assert len(res) == 1
    assert isinstance(res[0], RuleCompiledPredicate)
    assert res[0].evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
    assert isinstance(sheerka.objvalue(res[0].predicate), PythonNode)

    python_node = sheerka.objvalue(res[0].predicate).get_python_node()
    assert python_node == expected_python_node
    assert len(python_node.objects) == 1
    assert python_node.objects["__C__00var0000is0an000var001__1001__C__"] == expected_concept
    assert res[0].concept is None

@pytest.mark.skip("Not managed yet")
@pytest.mark.parametrize("text, text_to_compile, expected_variables", [
    ("not cat is an animal", "__C__00var0000is0an000var001__1001__C__", ["not cat", "animal"])
])
def test_i_can_compile_exact_concept_when_variable_is_negated(self, text, text_to_compile, expected_variables):
    """Skipped: the negation is expected to end up in the concept variable ('not cat').

    This test used to share its name with the test above and therefore replaced it
    in the class namespace, so the test above was never collected. It is renamed
    so that both definitions coexist.
    """
    sheerka, context, *concepts = self.init_test().with_concepts(
        Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
        Concept("cat"),
        Concept("animal"),
        create_new=True
    ).unpack()
    service = sheerka.services[SheerkaRuleManager.NAME]

    expected_concept = concepts[0]
    expected_concept.get_metadata().variables = [('x', expected_variables[0]), ('y', expected_variables[1])]
    ast_ = ast.parse(text_to_compile, "<source>", 'eval')
    expected_python_node = PythonNode(text_to_compile, ast_)
    expected_python_node.original_source = text
    expected_python_node.objects = {"__C__00var0000is0an000var001__1001__C__": expected_concept}

    # Read the predicates from the compile result, as every other compile_when test does.
    compiled_result = service.compile_when(context, "test", text)
    res = compiled_result.compiled_predicates

    assert len(res) == 1
    assert isinstance(res[0], RuleCompiledPredicate)
    assert res[0].evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
    assert isinstance(sheerka.objvalue(res[0].predicate), PythonNode)

    python_node = sheerka.objvalue(res[0].predicate).get_python_node()
    assert python_node == expected_python_node
    assert len(python_node.objects) == 1
    assert python_node.objects["__C__00var0000is0an000var001__1001__C__"] == expected_concept
    assert res[0].concept is None
|
|
|
|
@pytest.mark.parametrize("text, text_to_compile", [
    ("foo bar == 5", "__C__foo0bar__1001__C__ == 5"),
    ("not foo bar == 5", "not __C__foo0bar__1001__C__ == 5"),
])
def test_i_can_compile_predicate_when_python_and_concept(self, text, text_to_compile):
    """A concept used inside a python expression is replaced by a placeholder identifier."""
    sheerka, context, *concepts = self.init_test().with_concepts(Concept("foo bar"), create_new=True).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    resolved_expected = PythonNode(text_to_compile, ast.parse(text_to_compile, "<source>", 'eval'), text)

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)

    # the placeholder identifier maps back to the 'foo bar' concept
    python_node = sheerka.objvalue(only_predicate.predicate).get_python_node()
    assert python_node == resolved_expected
    assert python_node.objects == {'__C__foo0bar__1001__C__': concepts[0]}
    assert only_predicate.concept is None
|
|
|
|
@pytest.mark.parametrize("text, expected_variables", [
    ("a cat is an animal", ["a cat", "animal"]),
    ("a cat is an b", ["a cat", "b"]),
])
def test_i_can_compile_predicate_when_sya_node_parser(self, text, expected_variables):
    """A multi-word concept ('a cat') used as a variable still yields a concept predicate."""
    is_an = Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y")
    sheerka, context, *concepts = self.init_test().with_concepts(
        is_an, Concept("a cat"), Concept("animal"), create_new=True).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    x_value, y_value = expected_variables
    expected = CMV(concepts[0], x=x_value, y=y_value)

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == CONCEPT_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    compare_with_test_object(sheerka.objvalue(only_predicate.predicate)[0].concept, expected)
    compare_with_test_object(only_predicate.concept, expected)
|
|
|
|
def test_i_can_compile_predicate_when_bnf_node_parser(self):
    """A concept declared through a definition compiles to a single concept predicate."""
    bnf_concept = Concept(
        "x is an y", pre="is_question()", definition="('cat'|'bird')=x 'is an' animal").def_var("x")
    sheerka, context, *concepts = self.init_test().with_concepts(
        Concept("animal"), bnf_concept, create_new=True).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    expected = concepts[1]

    predicates = rule_manager.compile_when(context, "test", "cat is an animal").compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == CONCEPT_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(only_predicate.predicate)[0].concept == expected
    assert only_predicate.concept == expected
|
|
|
|
def test_i_can_compile_predicate_when_mix_of_concepts_and_python(self):
    """Concepts and-ed with python compile to one python node with one placeholder per concept."""
    sheerka, context, animal, cat, dog, pet, is_a, is_an = self.init_test().with_concepts(
        Concept("animal"),
        Concept("a cat"),
        Concept("dog"),
        Concept("pet"),
        Concept("x is a y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
        Concept("x is an y", pre="is_question()", definition="('cat'|'bird')=x 'is an' animal").def_var("x"),
        create_new=True,
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    text = "a cat is a pet and bird is an animal and x > 5 and dog is a pet"
    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    # placeholder identifiers expected for the three concept occurrences
    cat_is_a_pet = "__C__00var0000is0a000var001__1005__C__"
    bird_is_an_animal = "__C__00var0000is0an0y__1006__C__"
    dog_is_a_pet = "__C__00var0000is0a000var001__1005_1__C__"

    to_compile = " and ".join([cat_is_a_pet, bird_is_an_animal, "x > 5", dog_is_a_pet])
    expected_python_node = PythonNode(to_compile, ast.parse(to_compile, "<source>", 'eval'), text)

    assert len(predicates) == 1
    assert isinstance(predicates[0], RuleCompiledPredicate)
    python_node = predicates[0].predicate.body.body
    assert python_node == expected_python_node

    expected = {
        cat_is_a_pet: CC(is_a, x=cat, y=pet),
        bird_is_an_animal: CC(is_an, exclude_body=True, x=DoNotResolve("bird"), animal=animal),
        dog_is_a_pet: CMV(is_a, x="dog", y="pet"),
    }
    transformed = get_test_obj(python_node.objects, expected)
    assert transformed == expected
|
|
|
|
@pytest.mark.parametrize("text", [
    "a and not b",
    "not b and a",
    "__ret and not __ret.status",
])
def test_i_can_compile_negative_conjunctions_when_pure_python(self, text):
    """A python conjunction containing a negation compiles to one unchanged python predicate."""
    sheerka, context, *concepts = self.init_concepts("foo")
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    expected_python_node = PythonNode(text, ast.parse(text, "<source>", 'eval'))

    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 1
    only_predicate = predicates[0]
    assert isinstance(only_predicate, RuleCompiledPredicate)
    assert only_predicate.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(only_predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(only_predicate.predicate) == expected_python_node
    assert only_predicate.concept is None
|
|
|
|
def test_i_can_compile_negative_conjunction_of_mix_of_concepts_and_python(self):
    """Negated concepts and python and-ed together compile to one python node of negated terms."""
    sheerka, context, animal, cat, dog, pet, is_a, is_an = self.init_test().with_concepts(
        Concept("animal"),
        Concept("a cat"),
        Concept("dog"),
        Concept("pet"),
        Concept("x is a y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
        Concept("x is an y", pre="is_question()", definition="('cat'|'bird')=x 'is an' animal").def_var("x"),
        create_new=True,
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    text = "not a cat is a pet and not bird is an animal and not x > 5 and not dog is a pet"
    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    # placeholder identifiers expected for the three concept occurrences
    cat_is_a_pet = "__C__00var0000is0a000var001__1005__C__"
    bird_is_an_animal = "__C__00var0000is0an0y__1006__C__"
    dog_is_a_pet = "__C__00var0000is0a000var001__1005_1__C__"

    terms = [cat_is_a_pet, bird_is_an_animal, "x > 5", dog_is_a_pet]
    to_compile = " and ".join("not " + term for term in terms)
    expected_python_node = PythonNode(to_compile, ast.parse(to_compile, "<source>", 'eval'), text)

    assert len(predicates) == 1
    assert isinstance(predicates[0], RuleCompiledPredicate)
    python_node = predicates[0].predicate.body.body
    assert python_node == expected_python_node

    expected_objects = {
        cat_is_a_pet: CC(is_a, x=cat, y=pet),
        bird_is_an_animal: CC(is_an, exclude_body=True, x=DoNotResolve("bird"), animal=animal),
        dog_is_a_pet: CMV(is_a, x="dog", y="pet"),
    }
    compare_with_test_object(python_node.objects, expected_objects)
|
|
|
|
def test_i_can_compile_predicate_when_multiple_choices(self):
    """Two concepts sharing the same name yield one concept predicate each, in creation order."""
    sheerka, context, *concepts = self.init_test().with_concepts(
        Concept("x is a y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
        Concept("x is a y", pre="is_question()", body="isa(x, y)").def_var("x").def_var("y"),
        create_new=True,
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    predicates = rule_manager.compile_when(context, "test", "a is a b").compiled_predicates

    assert len(predicates) == 2
    # the i-th predicate must refer to the i-th concept
    for compiled, concept in zip(predicates, concepts):
        assert isinstance(compiled, RuleCompiledPredicate)
        assert compiled.evaluator == CONCEPT_EVALUATOR_NAME
        assert sheerka.isinstance(compiled.predicate, BuiltinConcepts.RETURN_VALUE)
        compare_with_test_object(sheerka.objvalue(compiled.predicate)[0].concept, CMV(concept, x="a", y="b"))
        compare_with_test_object(compiled.concept, CMV(concept, x="a", y="b"))
|
|
|
|
def test_i_can_compile_predicate_when_mix_and_multiple_choices(self):
    """Two ambiguous 'x is a y' occurrences produce one python predicate per combination."""
    sheerka, context, *concepts = self.init_test().with_concepts(
        Concept("animal"),
        Concept("a cat"),
        Concept("dog"),
        Concept("pet"),
        Concept("x is a y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
        Concept("x is a y", pre="is_question()", body="isa(x, y)").def_var("x").def_var("y"),
        Concept("x is an y", pre="is_question()", definition="('cat'|'bird')=x 'is an' animal").def_var("x"),
        create_new=True,
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    text = "__action == 'value' and a cat is a pet and bird is an animal and x > 5 and dog is a pet"
    predicates = rule_manager.compile_when(context, "test", text).compiled_predicates

    assert len(predicates) == 4
    # the __action clause is not part of the source kept by the python nodes
    trimmed_source = "a cat is a pet and bird is an animal and x > 5 and dog is a pet"

    # expected python source of each predicate, in the order they are returned
    expected_sources = (
        "__C__00var0000is0a000var001__1005__C__ and __C__00var0000is0an0y__1007__C__ and x > 5 and __C__00var0000is0a000var001__1005_1__C__",
        "__C__00var0000is0a000var001__1005__C__ and __C__00var0000is0an0y__1007__C__ and x > 5 and __C__00var0000is0a000var001__1006__C__",
        "__C__00var0000is0a000var001__1006__C__ and __C__00var0000is0an0y__1007__C__ and x > 5 and __C__00var0000is0a000var001__1005__C__",
        "__C__00var0000is0a000var001__1006__C__ and __C__00var0000is0an0y__1007__C__ and x > 5 and __C__00var0000is0a000var001__1006_1__C__",
    )

    for current_res, python_source in zip(predicates, expected_sources):
        assert isinstance(current_res, RuleCompiledPredicate)
        assert current_res.evaluator == PYTHON_EVALUATOR_NAME
        assert sheerka.isinstance(current_res.predicate, BuiltinConcepts.RETURN_VALUE)
        ast_ = ast.parse(python_source, "<source>", 'eval')
        resolved_expected = PythonNode(python_source, ast_, trimmed_source)
        assert sheerka.objvalue(current_res.predicate) == resolved_expected
        assert current_res.concept is None
|
|
|
|
@pytest.mark.parametrize("text, mode, compiled_text", [
    (
        "greetings",
        "eval",
        "__ret.status and isinstance(__ret.body, Concept)"
        " and __ret.body.name == 'greetings'",
    ),
    (
        "c:|1001:",
        "eval",
        "__ret.status and isinstance(__ret.body, Concept)"
        " and __ret.body.id == '1001'",
    ),
    (
        "hello 'there'",
        "eval",
        "__ret.status and isinstance(__ret.body, Concept)"
        " and __ret.body.key == 'hello __var__0'"
        " and __ret.body.get_value('a') == 'there'",
    ),
    (
        "hello there",
        "exec",
        "__x_00__ = __ret.body.get_value('a')\n"
        "__ret.status and isinstance(__ret.body, Concept)"
        " and __ret.body.key == 'hello __var__0'"
        " and isinstance(__x_00__, Concept) and __x_00__.key == 'there'",
    ),
    (
        "hello my friend",
        "exec",
        "__x_00__ = __ret.body.get_value('a')\n"
        "__ret.status and isinstance(__ret.body, Concept)"
        " and __ret.body.key == 'hello __var__0'"
        " and isinstance(__x_00__, Concept) and __x_00__.key == 'my friend'",
    ),
])
def test_i_can_compile_predicate_when_concept_is_not_a_question(self, text, mode, compiled_text):
    """
    Compile a 'when' clause made of a concept that is not a question.

    :param text: the 'when' clause handed to compile_when()
    :param mode: ast.parse() mode ('eval' or 'exec') used to build the expected node
    :param compiled_text: Python source the predicate is expected to be compiled into
    """
    # The three concepts only need to exist in sheerka; the names keep the unpack arity explicit.
    sheerka, context, _greetings, _there, _my_friend = self.init_test().with_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
        Concept("there"),
        Concept("my friend"),
        create_new=True
    ).unpack()
    service = sheerka.services[SheerkaRuleManager.NAME]

    ast_ = ast.parse(compiled_text, "<source>", mode)
    expected_python_node = PythonNode(compiled_text, ast_)

    compiled_result = service.compile_when(context, "test", text)
    res = compiled_result.compiled_predicates

    # exactly one predicate, evaluated by the Python evaluator, not bound to a concept
    assert len(res) == 1
    predicate = res[0]
    assert isinstance(predicate, RuleCompiledPredicate)
    assert predicate.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(predicate.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(predicate.predicate) == expected_python_node
    assert predicate.concept is None
|
|
|
|
def test_i_can_compile_predicate_when_concept_is_not_a_question_and_involves_sheerka(self):
    """
    Compile 'hello sheerka': the variable of the concept is expected to be
    checked as an Expando whose get_name() is 'sheerka'.
    """
    sheerka, context, _greetings = self.init_test().with_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
        create_new=True
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    # expected source: one assignment line, then the boolean expression
    expected_source = "".join([
        "__x_00__ = __ret.body.get_value('a')\n",
        "__ret.status",
        " and isinstance(__ret.body, Concept) and __ret.body.key == 'hello __var__0'",
        " and isinstance(__x_00__, Expando) and __x_00__.get_name() == 'sheerka'",
    ])
    expected_node = PythonNode(expected_source, ast.parse(expected_source, "<source>", "exec"))

    predicates = rule_manager.compile_when(context, "test", "hello sheerka").compiled_predicates

    assert len(predicates) == 1
    compiled = predicates[0]
    assert isinstance(compiled, RuleCompiledPredicate)
    assert compiled.evaluator == PYTHON_EVALUATOR_NAME
    assert sheerka.isinstance(compiled.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(compiled.predicate) == expected_node
    assert compiled.concept is None
|
|
|
|
@pytest.mark.skip("Not managed yet")
def test_i_can_compile_when_concept_starts_with_not(self):
    """
    A question concept whose name starts with 'not' should compile to a
    single predicate handled by the Concept evaluator (currently skipped).
    """
    sheerka, context, *created = self.init_test().with_concepts(
        Concept("not a cheesecake", pre="is_question()"),
        create_new=True).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]
    cheesecake = created[0]

    predicates = rule_manager.compile_when(context, "test", "not a cheesecake").compiled_predicates

    assert len(predicates) == 1
    compiled = predicates[0]
    assert isinstance(compiled, RuleCompiledPredicate)
    assert compiled.evaluator == CONCEPT_EVALUATOR_NAME
    assert sheerka.isinstance(compiled.predicate, BuiltinConcepts.RETURN_VALUE)
    assert sheerka.objvalue(compiled.predicate) == cheesecake
    assert compiled.concept == cheesecake
|
|
|
|
def test_i_cannot_compile_when_concept_is_not_a_question_and_has_unknown_variable(self):
    """
    'hello there' is compiled while only 'greetings' is defined: compile_when()
    must raise FailedToCompileError whose first cause is a CONCEPT_EVAL_ERROR.
    """
    sheerka, context, _greetings = self.init_test().with_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
        create_new=True
    ).unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    with pytest.raises(FailedToCompileError) as raised:
        rule_manager.compile_when(context, "test", "hello there")

    first_cause = raised.value.cause[0]
    assert sheerka.isinstance(first_cause, BuiltinConcepts.CONCEPT_EVAL_ERROR)
|
|
|
|
@pytest.mark.parametrize("text, expected_error", [
    ("__action == 'some_action'", NoConditionFound())
])
def test_i_cannot_compile_when_error(self, text, expected_error):
    """
    compile_when() must raise FailedToCompileError and expose the expected
    error as the only element of its cause.
    """
    sheerka, context = self.init_test().unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    with pytest.raises(FailedToCompileError) as raised:
        rule_manager.compile_when(context, "test", text)

    causes = raised.value.cause
    assert causes == [expected_error]
|
|
|
|
def test_i_can_get_rule_priorities(self):
    """
    After declaring rule_true greater than rule_false for PRECEDENCE, the
    cached format rules carry priorities 2 and 1 respectively.
    """
    builder = self.init_test().with_format_rules(("True", "True"), ("False", "False"))
    sheerka, context, rule_true, rule_false = builder.unpack()

    sheerka.set_is_greater_than(
        context,
        BuiltinConcepts.PRECEDENCE,
        rule_true,
        rule_false,
        RULE_COMPARISON_CONTEXT,
    )

    cached_rules = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

    for rule, expected_priority in ((rule_true, 2), (rule_false, 1)):
        assert cached_rules[rule.id].priority == expected_priority
|
|
|
|
def test_i_can_get_and_retrieve_rules_when_multiple_ontology_layers(self):
    """
    A rule created after push_ontology() is retrievable while that layer is
    active and is no longer known once the layer is popped; the rule created
    before the push stays retrievable throughout.
    """
    sheerka, context, rule_true = self.init_test().with_format_rules(("true", "True", "True")).unpack()

    sheerka.push_ontology(context, "new ontology")
    creation = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "false", "False", "False"))
    rule_false = creation.body.body

    # with the new layer on top, both rules can be fetched
    for rule in (rule_true, rule_false):
        assert sheerka.get_rule_by_id(rule.id) == rule

    sheerka.pop_ontology(context)

    # only the rule from the base layer remains
    assert sheerka.get_rule_by_id(rule_true.id) == rule_true
    popped = sheerka.get_rule_by_id(rule_false.id)
    assert not sheerka.is_known(popped)
|
|
|
|
def test_i_can_resolve_rule(self):
    """
    resolve_rule() accepts a rule id, a RULE token (holding the id or the name
    of a short term memory entry), or a Rule whose id may be unresolved.
    """
    sheerka, context, rule = self.init_test().with_format_rules(("my rule", "True", "True")).unpack()
    context.add_to_short_term_memory("x", rule.id)

    def rule_token(reference):
        # only the second element of the token value is relevant here
        return Token(TokenKind.RULE, ("i_do_not_care", reference), -1, -1, -1)

    def unresolved_rule(reference):
        placeholder = Rule(rule_id=reference)
        placeholder.metadata.id_is_unresolved = True
        return placeholder

    # the id itself
    assert sheerka.resolve_rule(context, rule.id) == rule

    # a token carrying the id
    assert sheerka.resolve_rule(context, rule_token(rule.id)) == rule

    # a token carrying the name of a short term memory entry
    assert sheerka.resolve_rule(context, rule_token("x")) == rule

    # a rule whose id is already resolved is simply returned
    assert sheerka.resolve_rule(context, rule) == rule

    # an unresolved id is looked up through the short term memory
    assert sheerka.resolve_rule(context, unresolved_rule("x")) == rule

    # an unresolved id with no matching entry gives None, no error raised
    assert sheerka.resolve_rule(context, unresolved_rule("y")) is None

    # the indirection still works when the stored id is an int
    context.add_to_short_term_memory("z", int(rule.id))

    assert sheerka.resolve_rule(context, rule_token("z")) == rule
    assert sheerka.resolve_rule(context, unresolved_rule("z")) == rule
|
|
|
|
@pytest.mark.parametrize("obj, expected", [
    ("text value", "var == 'text value'"),
    ("text 'value'", '''var == "text 'value'"'''),
    ('text "value"', """var == 'text "value"'"""),
    (10, "var == 10"),
    (10.01, "var == 10.01"),
])
def test_i_can_test_python_code_emitter_for_basic_types(self, obj, expected):
    """
    Strings and numbers are emitted as a plain equality test on the variable;
    an emitter built with "status" prefixes the text with 'status and '.
    """
    sheerka, context = self.init_test().unpack()

    plain_text = PythonCodeEmitter(context).recognize(obj, "var").get_text()
    assert plain_text == expected

    prefixed_text = PythonCodeEmitter(context, "status").recognize(obj, "var").get_text()
    assert prefixed_text == "status and " + expected
|
|
|
|
@pytest.mark.parametrize("recognized_by, expected", [
    (RECOGNIZED_BY_ID, "isinstance(var, Concept) and var.id == '1001'"),
    (RECOGNIZED_BY_NAME, "isinstance(var, Concept) and var.name == 'greetings'"),
    (None, "isinstance(var, Concept) and var.key == 'hello'"),
])
def test_i_can_test_python_code_emitter_for_concepts(self, recognized_by, expected):
    """
    The attribute tested by the emitted code (id, name or key) follows the
    RECOGNIZED_BY hint of the concept instance; without hint the key is used.
    """
    sheerka, context, template = self.init_concepts(
        Concept("greetings", definition="hello", definition_type=DEFINITION_TYPE_DEF))

    concept_instance = sheerka.new_from_template(template, template.key)
    if recognized_by:
        concept_instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, recognized_by)

    plain_text = PythonCodeEmitter(context).recognize(concept_instance, "var").get_text()
    assert plain_text == expected

    prefixed_text = PythonCodeEmitter(context, "status").recognize(concept_instance, "var").get_text()
    assert prefixed_text == "status and " + expected
|
|
|
|
def test_i_can_test_python_code_emitter_for_concepts_with_variable(self):
    """
    Check the text emitted for a concept whose variable is a string, a concept
    (recognized by id, name or key), a concept with its own variable, and a
    concept with several variables which are themselves concepts.
    """
    sheerka, context, greetings, little, foo, bar, and_concept = self.init_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
        Concept("little x").def_var("x"),
        "foo",
        "bar",
        Concept("a and b").def_var("a").def_var("b")
    )

    def emitted_text(concept_instance):
        return PythonCodeEmitter(context).recognize(concept_instance, "var").get_text()

    # every expectation starts its boolean expression with the check of 'greetings'
    greetings_check = "isinstance(var, Concept) and var.key == 'hello __var__0'"

    # the variable is a string
    greetings_instance = sheerka.new_from_template(greetings, greetings.key, a='sheerka')
    expected = greetings_check + " and var.get_value('a') == 'sheerka'"
    assert emitted_text(greetings_instance) == expected

    # the variable is a concept recognized by id
    foo_instance = sheerka.new_from_template(foo, foo.key)
    foo_instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, RECOGNIZED_BY_ID)
    greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=foo_instance)
    expected = (
        "__x_00__ = var.get_value('a')\n"
        + greetings_check
        + " and isinstance(__x_00__, Concept) and __x_00__.id == '1003'"
    )
    assert emitted_text(greetings_instance) == expected

    # the variable is a concept recognized by name
    foo_instance = sheerka.new_from_template(foo, foo.key)
    foo_instance.set_hint(BuiltinConcepts.RECOGNIZED_BY, RECOGNIZED_BY_NAME)
    greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=foo_instance)
    expected = (
        "__x_00__ = var.get_value('a')\n"
        + greetings_check
        + " and isinstance(__x_00__, Concept) and __x_00__.name == 'foo'"
    )
    assert emitted_text(greetings_instance) == expected

    # the variable is a concept without hint: its key is tested
    foo_instance = sheerka.new_from_template(foo, foo.key)
    greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=foo_instance)
    expected = (
        "__x_00__ = var.get_value('a')\n"
        + greetings_check
        + " and isinstance(__x_00__, Concept) and __x_00__.key == 'foo'"
    )
    assert emitted_text(greetings_instance) == expected

    # the variable is a concept which itself has a variable
    foo_instance = sheerka.new_from_template(foo, foo.key)
    little_instance = sheerka.new_from_template(little, little.key, x=foo_instance)
    greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=little_instance)
    expected = (
        "__x_00__ = var.get_value('a')\n"
        "__x_01__ = __x_00__.get_value('x')\n"
        + greetings_check
        + " and isinstance(__x_00__, Concept) and __x_00__.key == 'little __var__0'"
        + " and isinstance(__x_01__, Concept) and __x_01__.key == 'foo'"
    )
    assert emitted_text(greetings_instance) == expected

    # the variable is a concept with several variables, themselves concepts
    foo_instance = sheerka.new_from_template(foo, foo.key)
    bar_instance = sheerka.new_from_template(bar, bar.key)
    little_instance = sheerka.new_from_template(little, little.key, x=foo_instance)
    and_instance = sheerka.new_from_template(and_concept, and_concept.key, a=bar_instance, b=little_instance)
    greetings_instance = sheerka.new_from_template(greetings, greetings.key, a=and_instance)
    expected = (
        "__x_00__ = var.get_value('a')\n"
        "__x_01__ = __x_00__.get_value('a')\n"
        "__x_02__ = __x_00__.get_value('b')\n"
        "__x_03__ = __x_02__.get_value('x')\n"
        + greetings_check
        + " and isinstance(__x_00__, Concept) and __x_00__.key == '__var__0 and __var__1'"
        + " and isinstance(__x_01__, Concept) and __x_01__.key == 'bar'"
        + " and isinstance(__x_02__, Concept) and __x_02__.key == 'little __var__0'"
        + " and isinstance(__x_03__, Concept) and __x_03__.key == 'foo'"
    )
    assert emitted_text(greetings_instance) == expected
|
|
|
|
def test_i_can_get_format_rules(self):
    """
    get_format_rules() returns the print rules ordered from the highest
    priority (3) down to the lowest (1).
    """
    sheerka, context = self.init_test().unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    # clear the format-rule entry of the current cache manager before adding rules
    sheerka.om.current_cache_manager().clear(rule_manager.FORMAT_RULE_ENTRY)

    low = Rule(ACTION_TYPE_PRINT, "name 1", "True", "Hello world 1", priority=1)
    high = Rule(ACTION_TYPE_PRINT, "name 2", "False", "Hello world 2", priority=3)
    middle = Rule(ACTION_TYPE_PRINT, "name 2", "None", "Hello world 3", priority=2)
    for new_rule in (low, high, middle):
        sheerka.create_new_rule(context, new_rule)

    format_rules = sheerka.get_format_rules()
    assert format_rules == [high, middle, low]
|
|
|
|
def test_i_can_get_exec_rules(self):
    """
    get_exec_rules() returns the exec rules ordered from the highest
    priority (3) down to the lowest (1).
    """
    sheerka, context = self.init_test().unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    # clear the exec-rule entry of the current cache manager before adding rules
    sheerka.om.current_cache_manager().clear(rule_manager.EXEC_RULE_ENTRY)

    low = Rule(ACTION_TYPE_EXEC, "name 1", "True", "'Hello world 1'", priority=1)
    high = Rule(ACTION_TYPE_EXEC, "name 2", "False", "'Hello world 2'", priority=3)
    middle = Rule(ACTION_TYPE_EXEC, "name 2", "None", "'Hello world 3'", priority=2)
    for new_rule in (low, high, middle):
        sheerka.create_new_rule(context, new_rule)

    exec_rules = sheerka.get_exec_rules()
    assert exec_rules == [high, middle, low]
|
|
|
|
@pytest.mark.skip
def test_i_can_compile_rete_using_name(self):
    """
    Compiling the bare name '__ret' should give one AndConditions holding a
    single condition on __name__ (currently skipped).
    """
    sheerka, context, *_concepts = self.init_test().unpack()
    rule_manager = sheerka.services[SheerkaRuleManager.NAME]

    disjunctions = rule_manager.compile_when(context, "test", "__ret").rete_disjunctions

    assert len(disjunctions) == 1
    conjunction = disjunctions[0]
    assert isinstance(conjunction, AndConditions)
    assert conjunction.conditions == [Condition(V("__x_00__"), "__name__", "__ret")]
|
|
|
|
def test_i_can_properly_copy_a_rule(self):
    """
    A rule that was initialised and created through the rule manager must be
    deep-copied attribute by attribute: every instance attribute of the clone
    compares equal to the one of the original.
    """
    sheerka, context = self.init_test().unpack()
    service = sheerka.services[SheerkaRuleManager.NAME]

    rule = Rule(ACTION_TYPE_EXEC, "name", "id.attr == 'value'", 'test()')
    service.init_rule(context, rule)
    service.create_new_rule(context, rule)

    clone = rule.__deepcopy__({})
    # Only the attribute names are needed; the message names the offending attribute on failure.
    for attr in vars(rule):
        assert getattr(clone, attr) == getattr(rule, attr), f"attribute {attr!r} differs after deep copy"
|
|
|
|
@pytest.mark.parametrize("expression, expected_as_str", [
    (
        "__ret",
        ["#__x_00__|__name__|'__ret'"],
    ),
    (
        "__ret.status == True",
        ["#__x_00__|__name__|'__ret'", "#__x_00__|status|True"],
    ),
    (
        "__ret.status",
        ["#__x_00__|__name__|'__ret'", "#__x_00__|status|True"],
    ),
    (
        "__ret and __ret.status",
        ["#__x_00__|__name__|'__ret'", "#__x_00__|status|True"],
    ),
])
def test_i_can_get_rete_conditions(self, expression, expected_as_str):
    """
    Parse the expression, turn it into Rete conditions and check them, then
    check that a network holding the rule matches a successful return value.
    """
    sheerka, context = self.init_test().unpack()
    expression_parser = ExpressionParser()
    expected_conditions = get_rete_conditions(*expected_as_str)

    sink = ErrorSink()
    source = ParserInput(expression)
    expression_parser.reset_parser_input(source, sink)
    parsed_expression = expression_parser.parse_input(context, source, sink)

    conditions = ReteConditionExprVisitor(context).get_conditions(parsed_expression)

    assert conditions == [expected_conditions]

    # the conditions must also work once loaded in a Rete network
    network = ReteNetwork()
    rule = Rule("test", expression, None)
    for field, value in (("id", 9999), ("is_compiled", True), ("is_enabled", True)):
        setattr(rule.metadata, field, value)
    rule.rete_disjunctions = conditions
    network.add_rule(rule)

    network.add_obj("__ret", ReturnValueConcept("Test", True, None))
    found = list(network.matches)
    assert len(found) == 1
|
|
|
|
@pytest.mark.parametrize("test_name, expression, variable_name, expected_as_str", [
    (
        "recognize by name",
        "recognize(__ret.body, greetings)",
        None,
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|name|'greetings'",
        ],
    ),
    (
        "recognize by id",
        "recognize(__ret.body, c:|1001:)",
        None,
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|id|'1001'",
        ],
    ),
    (
        "recognize by name using c_str",
        "recognize(__ret.body, c:greetings:)",
        None,
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|name|'greetings'",
        ],
    ),
    (
        "recognize by name and add other conditions (str)",
        "recognize(__ret.body, greetings) and __ret.body.a == 'my friend'",
        "my friend",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|name|'greetings'",
            "#__x_01__|a|'my friend'",
        ],
    ),
    (
        "recognize by name and add other conditions (sheerka)",
        "recognize(__ret.body, greetings) and __ret.body.a == sheerka",
        "sheerka",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|name|'greetings'",
            "#__x_01__|a|'__sheerka__'",
        ],
    ),
    (
        "recognize by name and add other conditions (concept)",
        "recognize(__ret.body, greetings) and __ret.body.a == foo",
        "foo",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|name|'greetings'",
            "#__x_01__|a|#__x_02__",
            "#__x_02__|__is_concept__|True",
            "#__x_02__|key|'foo'",
        ],
    ),
    (
        "recognize by instance",
        "recognize(__ret.body, hello sheerka)",
        "sheerka",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|key|'hello __var__0'",
            "#__x_01__|a|'__sheerka__'",
        ],
    ),
    (
        "recognize by instance",
        "recognize(__ret.body, hello 'my friend')",
        "my friend",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|key|'hello __var__0'",
            "#__x_01__|a|'my friend'",
        ],
    ),
    (
        "recognize by instance",
        "recognize(__ret.body, hello foo)",
        "foo",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|key|'hello __var__0'",
            "#__x_01__|a|#__x_02__",
            "#__x_02__|__is_concept__|True",
            "#__x_02__|key|'foo'",
        ],
    ),
])
def test_i_can_get_rete_using_recognized_function(self, test_name, expression, variable_name, expected_as_str):
    """
    Turn a recognize(...) expression into Rete conditions, compare them with
    the expected ones, then check that a network holding the rule matches a
    'greetings' instance built with the requested variable.

    :param test_name: label of the case (not used by the body)
    :param expression: expression to parse
    :param variable_name: 'foo', 'sheerka', a plain string or None; selects the value given to variable 'a'
    :param expected_as_str: textual form of the expected conditions
    """
    sheerka, context, greetings, foo = self.init_test().with_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
        Concept("foo"),
    ).unpack()
    expression_parser = ExpressionParser()
    expected_conditions = get_rete_conditions(*expected_as_str)

    sink = ErrorSink()
    source = ParserInput(expression)
    expression_parser.reset_parser_input(source, sink)
    parsed_expression = expression_parser.parse_input(context, source, sink)

    conditions = ReteConditionExprVisitor(context).get_conditions(parsed_expression)

    assert conditions == [expected_conditions]

    # the conditions must also work once loaded in a Rete network
    network = ReteNetwork()
    rule = Rule("test", expression, None)
    for field, value in (("id", 9999), ("is_compiled", True), ("is_enabled", True)):
        setattr(rule.metadata, field, value)
    rule.rete_disjunctions = conditions
    network.add_rule(rule)

    # 'foo' and 'sheerka' stand for the objects, anything else is used as is
    if variable_name == "foo":
        variable = foo
    elif variable_name == "sheerka":
        variable = sheerka
    else:
        variable = variable_name
    to_recognize = sheerka.new_from_template(greetings, greetings.key, a=variable)
    network.add_obj("__ret", ReturnValueConcept("Test", True, to_recognize))
    found = list(network.matches)
    assert len(found) == 1
|
|
|
|
@pytest.mark.parametrize("expression, variable_name, expected_as_str", [
    (
        "greetings",
        None,
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|name|'greetings'",
        ],
    ),
    (
        "hello foo",
        "foo",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|key|'hello __var__0'",
            "#__x_01__|a|#__x_02__",
            "#__x_02__|__is_concept__|True",
            "#__x_02__|key|'foo'",
        ],
    ),
])
def test_i_can_get_rete_when_a_concept_is_recognized(self, expression, variable_name, expected_as_str):
    """
    An expression that is just a concept is turned into Rete conditions on
    __ret.body; the conditions are compared with the expected ones and then
    exercised through a Rete network fed with a 'greetings' instance.
    """
    sheerka, context, greetings, foo = self.init_test().with_concepts(
        Concept("greetings", definition="hello a", definition_type=DEFINITION_TYPE_DEF).def_var("a"),
        Concept("foo"),
    ).unpack()
    expression_parser = ExpressionParser()
    expected_conditions = get_rete_conditions(*expected_as_str)

    sink = ErrorSink()
    source = ParserInput(expression)
    expression_parser.reset_parser_input(source, sink)
    parsed_expression = expression_parser.parse_input(context, source, sink)

    conditions = ReteConditionExprVisitor(context).get_conditions(parsed_expression)

    assert conditions == [expected_conditions]

    # the conditions must also work once loaded in a Rete network
    network = ReteNetwork()
    rule = Rule("test", expression, None)
    for field, value in (("id", 9999), ("is_compiled", True), ("is_enabled", True)):
        setattr(rule.metadata, field, value)
    rule.rete_disjunctions = conditions
    network.add_rule(rule)

    # 'foo' and 'sheerka' stand for the objects, anything else is used as is
    if variable_name == "foo":
        variable = foo
    elif variable_name == "sheerka":
        variable = sheerka
    else:
        variable = variable_name
    to_recognize = sheerka.new_from_template(greetings, greetings.key, a=variable)
    network.add_obj("__ret", ReturnValueConcept("Test", True, to_recognize))
    found = list(network.matches)
    assert len(found) == 1
|
|
|
|
@pytest.mark.parametrize("expression, expected_as_str", [
    (
        "eval(__ret.body, 'foo' starts with 'f')",
        [
            "#__x_00__|__name__|'__ret'",
            "#__x_00__|body|#__x_01__",
            "#__x_01__|__is_concept__|True",
            "#__x_01__|key|'__var__0 starts with __var__1'",
            "#__x_01__|x|'foo'",
            "#__x_01__|y|'f'",
            "$eval(__x_01__, a, b, c)",
        ],
    ),
])
def test_i_can_get_rete_conditions_using_eval_function(self, expression, expected_as_str):
    """
    An eval(...) expression is turned into Rete conditions on the concept plus
    an eval condition; the result is compared with the expected conditions and
    then exercised through a Rete network fed with a matching instance.
    """
    sheerka, context, start_with = self.init_test().with_concepts(
        Concept(
            "x starts with y",
            pre="is_question",
            body="x.startswith(y)",
            where="isinstance(x, str)",
        ).def_var("x").def_var("y"),
    ).unpack()

    expression_parser = ExpressionParser()
    expected_conditions = get_rete_conditions(*expected_as_str)

    sink = ErrorSink()
    source = ParserInput(expression)
    expression_parser.reset_parser_input(source, sink)
    parsed_expression = expression_parser.parse_input(context, source, sink)

    conditions = ReteConditionExprVisitor(context).get_conditions(parsed_expression)

    assert conditions == [expected_conditions]

    # the conditions must also work once loaded in a Rete network
    network = ReteNetwork()
    rule = Rule("test", expression, None)
    for field, value in (("id", 9999), ("is_compiled", True), ("is_enabled", True)):
        setattr(rule.metadata, field, value)
    rule.rete_disjunctions = conditions
    network.add_rule(rule)

    to_recognize = sheerka.new_from_template(start_with, start_with.key, x="foo", y="f")
    network.add_obj("__ret", ReturnValueConcept("Test", True, to_recognize))
    found = list(network.matches)
    assert len(found) == 1
|
|
|
|
class TestSheerkaRuleManagerUsingFileBasedSheerka(TestUsingFileBasedSheerka):
    """Rule manager tests run against the file based sheerka fixture."""

    def test_rules_are_initialized_at_startup(self):
        """
        Rules committed by one sheerka instance are found, compiled, enabled
        and with the same priorities, in a newly created instance.
        """
        sheerka, context, *rules = self.init_test().with_rules(
            None,
            Rule("print", "name1", "True", "Hello world"),
            Rule("print", "name2", "value() is __EXPLANATION", "list(value())"),
            Rule("exec", "name3", "True", "'Hello world'"),
            Rule("exec", "name4", "value() is __EXPLANATION", "list(value())"),
        ).unpack()
        first_print, second_print, first_exec, second_exec = rules[0], rules[1], rules[2], rules[3]
        sheerka.set_is_greater_than(
            context, BuiltinConcepts.PRECEDENCE, first_print, second_print, RULE_COMPARISON_CONTEXT)
        sheerka.set_is_less_than(
            context, BuiltinConcepts.PRECEDENCE, first_exec, second_exec, RULE_COMPARISON_CONTEXT)

        sheerka.om.commit(context)
        expected_rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
        expected_rules_by_id.update(sheerka.om.get_all(SheerkaRuleManager.EXEC_RULE_ENTRY))

        sheerka = self.new_sheerka_instance(False)  # new instance
        rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)
        rules_by_id.update(sheerka.om.get_all(SheerkaRuleManager.EXEC_RULE_ENTRY))

        assert len(rules_by_id) == len(expected_rules_by_id)

        for rule_id in rules_by_id:
            loaded = rules_by_id[rule_id]
            reference = expected_rules_by_id[rule_id]

            assert loaded.metadata.action_type == reference.metadata.action_type
            assert loaded.metadata.name == reference.metadata.name
            assert loaded.metadata.predicate == reference.metadata.predicate
            assert loaded.metadata.action == reference.metadata.action
            assert loaded.metadata.id == reference.metadata.id
            assert loaded.metadata.is_compiled
            assert loaded.metadata.is_enabled
            assert loaded.compiled_action == reference.compiled_action
            assert loaded.compiled_predicates == reference.compiled_predicates
            assert loaded.priority is not None
            assert loaded.priority == reference.priority

    def test_rules_are_still_accessible_after_a_new_ontology_layer(self):
        """
        Format rules committed before push_ontology() are read back unchanged
        from the new ontology layer.
        """
        sheerka, context, *rules = self.init_format_rules(
            Rule("print", "name1", "True", "Hello world"),
            Rule("print", "name2", "value() is __EXPLANATION", "list(value())")
        )
        sheerka.set_is_greater_than(
            context, BuiltinConcepts.PRECEDENCE, rules[0], rules[1], RULE_COMPARISON_CONTEXT)

        sheerka.om.commit(context)
        expected_rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        sheerka.push_ontology(context, "new ontology")
        rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        assert len(rules_by_id) == len(expected_rules_by_id)

        for rule_id in rules_by_id:
            loaded = rules_by_id[rule_id]
            reference = expected_rules_by_id[rule_id]

            assert loaded.metadata.action_type == reference.metadata.action_type
            assert loaded.metadata.name == reference.metadata.name
            assert loaded.metadata.predicate == reference.metadata.predicate
            assert loaded.metadata.action == reference.metadata.action
            assert loaded.metadata.id == reference.metadata.id
            assert loaded.metadata.is_compiled == reference.metadata.is_compiled
            assert loaded.metadata.is_enabled == reference.metadata.is_enabled
            assert loaded.compiled_action == reference.compiled_action
            assert loaded.compiled_predicates == reference.compiled_predicates
            assert loaded.priority is not None
            assert loaded.priority == reference.priority
|