import ast

import pytest

from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, CMV
from core.global_symbols import RULE_COMPARISON_CONTEXT
from core.rule import Rule, ACTION_TYPE_PRINT, ACTION_TYPE_EXEC
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager, FormatRuleParser, \
    FormatAstRawText, FormatAstVariable, FormatAstSequence, FormatAstFunction, \
    FormatRuleSyntaxError, FormatAstList, UnexpectedEof, FormatAstColor, RulePredicate, FormatAstDict, FormatAstMulti
from core.tokenizer import Token, TokenKind
from parsers.BaseNodeParser import SourceCodeWithConceptNode, SourceCodeNode
from parsers.PythonParser import PythonNode
from tests.TestUsingFileBasedSheerka import TestUsingFileBasedSheerka
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka

# Short aliases that keep the parametrized expectations readable.
seq = FormatAstSequence
raw = FormatAstRawText
var = FormatAstVariable
func = FormatAstFunction
lst = FormatAstList

PYTHON_EVALUATOR_NAME = "Python"
CONCEPT_EVALUATOR_NAME = "Concept"


def _assert_rule_metadata(rule, rule_id, name, predicate, action_type, action):
    """Assert that ``rule.metadata`` carries the given id, name, predicate, action type and action."""
    metadata = rule.metadata
    assert metadata.id == rule_id
    assert metadata.name == name
    assert metadata.predicate == predicate
    assert metadata.action_type == action_type
    assert metadata.action == action


def _assert_same_rules(rules_by_id, expected_rules_by_id):
    """Assert that both ``{rule_id: rule}`` mappings hold equivalent rules.

    The sizes are compared first, then every rule of ``rules_by_id`` is compared
    field by field with the entry of ``expected_rules_by_id`` that has the same id.
    """
    assert len(rules_by_id) == len(expected_rules_by_id)

    for rule_id, rule in rules_by_id.items():
        expected = expected_rules_by_id[rule_id]

        assert rule.metadata.action_type == expected.metadata.action_type
        assert rule.metadata.name == expected.metadata.name
        assert rule.metadata.predicate == expected.metadata.predicate
        assert rule.metadata.action == expected.metadata.action
        assert rule.metadata.id == expected.metadata.id
        assert rule.metadata.is_compiled == expected.metadata.is_compiled
        assert rule.metadata.is_enabled == expected.metadata.is_enabled
        assert rule.compiled_action == expected.compiled_action
        assert rule.compiled_predicate == expected.compiled_predicate
        assert rule.priority is not None
        assert rule.priority == expected.priority


class TestSheerkaRuleManager(TestUsingMemoryBasedSheerka):
    """Tests of SheerkaRuleManager and FormatRuleParser run through the memory based fixture."""

    @pytest.mark.parametrize("action_type, cache_entry", [
        (ACTION_TYPE_PRINT, SheerkaRuleManager.FORMAT_RULE_ENTRY),
        (ACTION_TYPE_EXEC, SheerkaRuleManager.EXEC_RULE_ENTRY),
    ])
    def test_i_can_create_a_new_rule(self, action_type, cache_entry):
        """A created rule is returned, then readable from the cache, by name and, after commit, from the sdp."""
        sheerka, context = self.init_concepts(cache_only=False)
        previous_rules_number = sheerka.om.get_all(sheerka.OBJECTS_IDS_ENTRY)[SheerkaRuleManager.RULE_IDS]

        rule = Rule(action_type, "name", "True", "Hello world")

        res = sheerka.create_new_rule(context, rule)
        expected_id = str(previous_rules_number + 1)
        expected_metadata = (expected_id, "name", "True", action_type, "Hello world")

        assert res.status
        assert sheerka.isinstance(res.body, BuiltinConcepts.NEW_RULE)

        created_rule = res.body.body
        _assert_rule_metadata(created_rule, *expected_metadata)

        # saved in cache
        assert len(sheerka.om.current_cache_manager().caches[cache_entry].cache) > 0
        from_cache = sheerka.om.get(cache_entry, expected_id)
        _assert_rule_metadata(from_cache, *expected_metadata)

        # the rule is also saved by name
        by_name = sheerka.get_rule_by_name("name")
        _assert_rule_metadata(by_name, *expected_metadata)

        sheerka.om.commit(context)

        # saved in sdp
        from_sdp = sheerka.om.current_sdp().get(cache_entry, expected_id)
        _assert_rule_metadata(from_sdp, *expected_metadata)

        by_name = sheerka.om.current_sdp().get(SheerkaRuleManager.RULES_BY_NAME_ENTRY, "name")
        _assert_rule_metadata(by_name, *expected_metadata)

    def test_i_can_create_multiple_rules(self):
        """Creating two print rules adds two entries to the format rule cache."""
        sheerka, context = self.init_concepts(cache_only=False)
        previous_rules_number = len(
            sheerka.om.current_cache_manager().caches[SheerkaRuleManager.FORMAT_RULE_ENTRY].cache)

        sheerka.create_new_rule(context, Rule("print", "name1", "True", "Hello world"))
        sheerka.create_new_rule(context, Rule("print", "name2", "value() is __EXPLANATION", "list(value())"))

        assert len(
            sheerka.om.current_cache_manager().caches[
                SheerkaRuleManager.FORMAT_RULE_ENTRY].cache) == 2 + previous_rules_number

    @pytest.mark.parametrize("text, expected", [
        ("", FormatAstRawText("")),
        (" ", FormatAstRawText(" ")),
        (" raw text ", FormatAstRawText(" raw text ")),
        ("{variable}", FormatAstVariable("variable")),
        ("{ variable }", FormatAstVariable("variable")),
        (" xy {v} z", seq([raw(" xy "), var("v"), raw(" z")])),
        (r"\{variable}", FormatAstRawText("{variable}")),
        (r"\\{variable}", seq([raw("\\"), var("variable")])),
        (r"\\\{variable}", FormatAstRawText(r"\{variable}")),
        (r"{var1}{var2}", seq([var("var1"), var("var2")])),
        ("func()", FormatAstFunction("func", [], {})),
        ("func(a, 'string value', c)", FormatAstFunction("func", ["a", "'string value'", "c"], {})),
        ("func(a=10, b='string value')", FormatAstFunction("func", [], {"a": "10", "b": "'string value'"})),
        ("func('string value'='another string value')",
         func("func", [], {"'string value'": "'another string value'"})),
        ("red(' xy {v}')", FormatAstColor("red", seq([raw(" xy "), var("v")]))),
        ('blue(" xy {v}")', FormatAstColor("blue", seq([raw(" xy "), var("v")]))),
        ('green( xy )', FormatAstColor("green", var("xy"))),
        ('green()', FormatAstColor("green", raw(""))),
        ('green("")', FormatAstColor("green", raw(""))),
        ("list(var_name, 2, 'children')", FormatAstList("var_name", recurse_on="children", recursion_depth=2)),
        ("list(var_name, recursion_depth=2, recurse_on='children')",
         FormatAstList("var_name", recurse_on="children", recursion_depth=2)),
        ("list(var_name, recursion_depth=2, 'children')", FormatAstList("var_name", recursion_depth=2)),
        ("list(var_name, 'children', recursion_depth=2)", FormatAstList("var_name", recursion_depth=2)),
        ("list(var_name)", FormatAstList("var_name")),
        ("{obj.prop1.prop2[0].prop3['value']}", FormatAstVariable("obj.prop1.prop2[0].prop3['value']")),
        ("[{id}]", seq([raw("["), var("id"), raw("]")])),
        ("{variable:format}", FormatAstVariable("variable", "format")),
        ("{variable:3}", FormatAstVariable("variable", "3")),
        (r"\not_a_function(a={var})", seq([raw("not_a_function(a="), var("var"), raw(")")])),
        ("dict(var_name)", FormatAstDict("var_name")),
        ("dict(var_name, items_prop='props')", FormatAstDict("var_name", items_prop='props')),
        ("dict(var_name, debug=True)", FormatAstDict("var_name", debug=True, prefix="{", suffix="}")),
        ("multi(var_name)", FormatAstMulti("var_name")),
    ])
    def test_i_can_parse_format_rule(self, text, expected):
        """FormatRuleParser.parse() returns the expected format AST for each input."""
        assert FormatRuleParser(text).parse() == expected

    @pytest.mark.parametrize("text, expected_error", [
        ("{", UnexpectedEof("while parsing variable", Token(TokenKind.LBRACE, "{", 0, 1, 1))),
        ("{var_name", UnexpectedEof("while parsing variable", Token(TokenKind.LBRACE, "{", 0, 1, 1))),
        ("{}", FormatRuleSyntaxError("variable name not found", None)),
        ("func(", UnexpectedEof("while parsing function", Token(TokenKind.IDENTIFIER, "func", 0, 1, 1))),
        ("func(a,b,c", UnexpectedEof("while parsing function", Token(TokenKind.IDENTIFIER, "func", 0, 1, 1))),
        ("func(a,,c", FormatRuleSyntaxError("no parameter found", Token(TokenKind.COMMA, ",", 7, 1, 8))),
        ("func(a,,c)", FormatRuleSyntaxError("no parameter found", Token(TokenKind.COMMA, ",", 7, 1, 8))),
        ("red(a,b)",
         FormatRuleSyntaxError("only one parameter supported", Token(TokenKind.IDENTIFIER, "b", 6, 1, 7))),
        ("red(a=b)", FormatRuleSyntaxError("keyword arguments are not supported", None)),
        ("red(xy {v})", FormatRuleSyntaxError("Invalid identifier", None)),
        ("list()", FormatRuleSyntaxError("variable name not found", None)),
        ("list(recursion_depth=2)", FormatRuleSyntaxError("variable name not found", None)),
        ("list(a,b,c,d,e)",
         FormatRuleSyntaxError("too many positional arguments", Token(TokenKind.IDENTIFIER, "e", 13, 1, 14))),
        ("list(a, recursion_depth=hello)", FormatRuleSyntaxError("'hello' is not numeric", None)),
        ("list(a, recursion_depth='hello')", FormatRuleSyntaxError("'recursion_depth' must be an integer", None)),
        ("dict()", FormatRuleSyntaxError("variable name not found", None)),
    ])
    def test_i_cannot_parse_invalid_format(self, text, expected_error):
        """After parsing an invalid input, the parser's error_sink equals the expected error."""
        parser = FormatRuleParser(text)
        parser.parse()

        assert parser.error_sink == expected_error

    @pytest.mark.parametrize("text", [
        "a == 5",
        "foo == 5",
        "func() == 5",
    ])
    def test_i_can_compile_predicate_when_pure_python(self, text):
        """A pure python predicate compiles to a single RulePredicate wrapping a PythonNode."""
        sheerka, context, *concepts = self.init_concepts("foo")
        service = sheerka.services[SheerkaRuleManager.NAME]

        ast_ = ast.parse(text, "", 'eval')
        expected_python_node = PythonNode(text, ast_)

        res = service.compile_when(context, "test", text)

        assert len(res) == 1
        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == PYTHON_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[0].predicate) == expected_python_node
        assert res[0].concept is None

    @pytest.mark.parametrize("text, expected_type", [
        ("isinstance(a, int)", SourceCodeWithConceptNode),
        ("func()", SourceCodeNode),
    ])
    def test_i_can_compile_predicates_that_resolve_to_python(self, text, expected_type):
        """The compiled predicate is a source code node of the expected type holding the PythonNode."""
        sheerka, context, *concepts = self.init_concepts()
        service = sheerka.services[SheerkaRuleManager.NAME]

        ast_ = ast.parse(text, "", 'eval')
        expected_python_node = PythonNode(text, ast_)

        res = service.compile_when(context, "test", text)

        assert len(res) == 1
        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == PYTHON_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert isinstance(sheerka.objvalue(res[0].predicate), expected_type)
        assert sheerka.objvalue(res[0].predicate).python_node == expected_python_node
        assert res[0].concept is None

    def test_i_can_compile_predicate_when_python_and_concept(self):
        """A predicate mixing the concept 'foo bar' with python compiles to a python predicate."""
        sheerka, context, *concepts = self.init_test().with_concepts(Concept("foo bar"), create_new=True).unpack()
        service = sheerka.services[SheerkaRuleManager.NAME]

        text = "foo bar == 5"
        # the expected ast is built from the source where the concept is replaced by an identifier
        ast_ = ast.parse("__C__foo0bar__1001__C__ == 5", "", 'eval')
        resolved_expected = PythonNode(text, ast_)

        res = service.compile_when(context, "test", text)

        assert len(res) == 1
        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == PYTHON_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[0].predicate) == resolved_expected
        assert res[0].concept is None

    @pytest.mark.parametrize("text, expected_variables", [
        ("cat is an animal", ["cat", "animal"]),
        ("a is an animal", ["a", "animal"]),
        ("cat is an b", ["cat", "b"]),
    ])
    def test_i_can_compile_predicate_when_exact_concept(self, text, expected_variables):
        """A predicate matching the 'x is an y' concept compiles to a concept predicate."""
        sheerka, context, *concepts = self.init_test().with_concepts(
            Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
            Concept("cat"),
            Concept("animal"),
            create_new=True
        ).unpack()
        service = sheerka.services[SheerkaRuleManager.NAME]

        # the expected concept is the first created concept, with its variables set from the text
        expected = concepts[0]
        expected.get_metadata().variables = [('x', expected_variables[0]), ('y', expected_variables[1])]

        res = service.compile_when(context, "test", text)

        assert len(res) == 1
        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == CONCEPT_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[0].predicate) == expected
        assert res[0].concept == expected

    @pytest.mark.parametrize("text, expected_variables", [
        ("a cat is an animal", ["a cat", "animal"]),
        ("a cat is an b", ["a cat", "b"]),
    ])
    def test_i_can_compile_predicate_when_sya_node_parser(self, text, expected_variables):
        """Same as the exact concept case, but 'x' is bound to the multi-word concept 'a cat'."""
        sheerka, context, *concepts = self.init_test().with_concepts(
            Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
            Concept("a cat"),
            Concept("animal"),
            create_new=True
        ).unpack()
        service = sheerka.services[SheerkaRuleManager.NAME]

        expected = CMV(concepts[0], x=expected_variables[0], y=expected_variables[1])

        res = service.compile_when(context, "test", text)

        assert len(res) == 1
        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == CONCEPT_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[0].predicate)[0].concept == expected
        assert res[0].concept == expected

    def test_i_can_compile_predicate_when_bnf_node_parser(self):
        """A predicate matching a concept declared with a ``definition`` compiles to a concept predicate."""
        sheerka, context, *concepts = self.init_test().with_concepts(
            Concept("animal"),
            Concept("x is an y", pre="is_question()", definition="('cat'|'bird')=x 'is an' animal").def_var("x"),
            create_new=True
        ).unpack()
        service = sheerka.services[SheerkaRuleManager.NAME]

        expected = concepts[1]

        res = service.compile_when(context, "test", "cat is an animal")

        assert len(res) == 1
        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == CONCEPT_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[0].predicate)[0].concept == expected
        assert res[0].concept == expected

    def test_i_can_compile_predicate_when_multiple_choices(self):
        """When two concepts share the name 'x is a y', one predicate is compiled per concept."""
        sheerka, context, *concepts = self.init_test().with_concepts(
            Concept("x is a y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
            Concept("x is a y", pre="is_question()", body="isa(x, y)").def_var("x").def_var("y"),
            create_new=True
        ).unpack()
        service = sheerka.services[SheerkaRuleManager.NAME]

        res = service.compile_when(context, "test", "a is a b")

        assert len(res) == 2

        assert isinstance(res[0], RulePredicate)
        assert res[0].evaluator == CONCEPT_EVALUATOR_NAME
        assert sheerka.isinstance(res[0].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[0].predicate)[0].concept == CMV(concepts[0], x="a", y="b")
        assert res[0].concept == CMV(concepts[0], x="a", y="b")

        assert isinstance(res[1], RulePredicate)
        assert res[1].evaluator == CONCEPT_EVALUATOR_NAME
        assert sheerka.isinstance(res[1].predicate, BuiltinConcepts.RETURN_VALUE)
        assert sheerka.objvalue(res[1].predicate)[0].concept == CMV(concepts[1], x="a", y="b")
        assert res[1].concept == CMV(concepts[1], x="a", y="b")

    def test_i_can_get_rule_priorities(self):
        """After declaring rule_true greater than rule_false, their priorities are 2 and 1."""
        sheerka, context, rule_true, rule_false = self.init_test().with_rules(
            ("True", "True"),
            ("False", "False")
        ).unpack()

        sheerka.set_is_greater_than(context,
                                    BuiltinConcepts.PRECEDENCE,
                                    rule_true,
                                    rule_false,
                                    RULE_COMPARISON_CONTEXT)

        rules_from_cache = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        assert rules_from_cache[rule_true.id].priority == 2
        assert rules_from_cache[rule_false.id].priority == 1

    def test_i_can_get_and_retrieve_rules_when_multiple_ontology_layers(self):
        """A rule created after push_ontology is no longer known once the ontology is popped."""
        sheerka, context, rule_true = self.init_test().with_rules(("true", "True", "True")).unpack()

        sheerka.push_ontology(context, "new ontology")
        rule_false = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "false", "False", "False")).body.body

        # All rules are visible
        assert sheerka.get_rule_by_id(rule_true.id) == rule_true
        assert sheerka.get_rule_by_id(rule_false.id) == rule_false

        sheerka.pop_ontology()

        # only the rule created before the push is still known
        assert sheerka.get_rule_by_id(rule_true.id) == rule_true
        assert not sheerka.is_known(sheerka.get_rule_by_id(rule_false.id))

    def test_i_can_resolve_rule(self):
        """resolve_rule accepts an id, a RULE token, a resolved rule or a rule whose id is unresolved."""
        sheerka, context, rule = self.init_test().with_rules(("my rule", "True", "True")).unpack()
        context.add_to_short_term_memory("x", rule.id)

        # direct access by id
        assert sheerka.resolve_rule(context, rule.id) == rule

        # direct access by token
        assert sheerka.resolve_rule(context, Token(TokenKind.RULE, ("i_do_not_care", rule.id), -1, -1, -1)) == rule

        # indirect access by token
        assert sheerka.resolve_rule(context, Token(TokenKind.RULE, ("i_do_not_care", "x"), -1, -1, -1)) == rule

        # simply returns the rule if id is resolved
        assert sheerka.resolve_rule(context, rule) == rule

        # look for the correct rule when id is unresolved
        unresolved = Rule(rule_id="x")
        unresolved.metadata.id_is_unresolved = True
        assert sheerka.resolve_rule(context, unresolved) == rule

        # returns None when the unresolved id ("y" was never put in short term memory) cannot be found
        unresolved = Rule(rule_id="y")
        unresolved.metadata.id_is_unresolved = True
        assert sheerka.resolve_rule(context, unresolved) is None  # no error raised

        # I still can get the value when the indirection has the wrong type
        context.add_to_short_term_memory("z", int(rule.id))
        assert sheerka.resolve_rule(context, Token(TokenKind.RULE, ("i_do_not_care", "z"), -1, -1, -1)) == rule

        unresolved = Rule(rule_id="z")
        unresolved.metadata.id_is_unresolved = True
        assert sheerka.resolve_rule(context, unresolved) == rule

    # NOTE(review): the disabled test below calls compile_when with the hard-coded predicate "a is a b"
    # instead of the parametrized `text`; fix that before re-enabling it.
    # @pytest.mark.skip
    # @pytest.mark.parametrize("text, expected", [
    #     ("cat is an animal", set()),
    #     ("a is an animal", {"a"}),
    #     ("a is an b", {"a", "b"}),
    #     ("cat is an b", {"b"}),
    #     ("a cat is an b", {"b"}),
    #
    #     ("cat is a animal", set()),
    #     ("a is a animal", {"a"}),
    #     ("a is a b", {"a", "b"}),
    #     ("cat is a b", {"b"}),
    #     ("a cat is an b", {"b"}),
    #
    #     ("a == 5", {"a"}),
    #     ("isinstance(a, int)", {"a"}),
    #     ("a cat == b", {"b"})
    # ])
    # def test_i_can_get_rules_variables(self, text, expected):
    #     sheerka, context, *concepts = self.init_concepts(
    #         Concept("x is a y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
    #         Concept("x is a y", pre="is_question()", body="isa(x, y)").def_var("x").def_var("y"),
    #         Concept("x is an y", pre="is_question()", body="isinstance(x, y)").def_var("x").def_var("y"),
    #         Concept("cat"),
    #         Concept("animal"),
    #         Concept("a cat"),
    #         create_new=True
    #     )
    #     service = sheerka.services[SheerkaRuleManager.NAME]
    #
    #     compiled = service.compile_when(context, "test", "a is a b")
    #
    #     assert service.get_unknown_variables(compiled) == expected


class TestSheerkaRuleManagerUsingFileBasedSheerka(TestUsingFileBasedSheerka):
    """Tests of SheerkaRuleManager run through the file based fixture."""

    def test_rules_are_initialized_at_startup(self):
        """A new sheerka instance exposes the same format rules as the one that committed them."""
        sheerka, context, *rules = self.init_format_rules(
            Rule("print", "name1", "True", "Hello world"),
            Rule("print", "name2", "value() is __EXPLANATION", "list(value())")
        )
        sheerka.set_is_greater_than(context,
                                    BuiltinConcepts.PRECEDENCE,
                                    rules[0],
                                    rules[1],
                                    RULE_COMPARISON_CONTEXT)
        sheerka.om.commit(context)
        expected_rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        sheerka = self.new_sheerka_instance(False)  # new instance
        rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        _assert_same_rules(rules_by_id, expected_rules_by_id)

    def test_rules_are_still_accessible_after_a_new_ontology_layer(self):
        """Pushing a new ontology layer leaves the committed format rules unchanged."""
        sheerka, context, *rules = self.init_format_rules(
            Rule("print", "name1", "True", "Hello world"),
            Rule("print", "name2", "value() is __EXPLANATION", "list(value())")
        )
        sheerka.set_is_greater_than(context,
                                    BuiltinConcepts.PRECEDENCE,
                                    rules[0],
                                    rules[1],
                                    RULE_COMPARISON_CONTEXT)
        sheerka.om.commit(context)
        expected_rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        sheerka.push_ontology(context, "new ontology")
        rules_by_id = sheerka.om.get_all(SheerkaRuleManager.FORMAT_RULE_ENTRY)

        _assert_same_rules(rules_by_id, expected_rules_by_id)