# Source: Sheerka-Old/tests/core/test_sheerkaResultManager.py (2021-01-11 15:36:03 +01:00; 371 lines, 15 KiB, Python)

import types
import pytest
from core.builtin_concepts import BuiltinConcepts
from core.sheerka.ExecutionContext import ExecutionContext
from core.sheerka.SheerkaOntologyManager import SheerkaOntologyManager
from core.sheerka.services.SheerkaResultManager import SheerkaResultConcept
from sdp.sheerkaDataProvider import Event
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
class TestSheerkaResultManager(TestUsingMemoryBasedSheerka):
    """Tests for the service registered under ``SheerkaResultConcept.NAME``.

    The tests evaluate user inputs through a Sheerka instance and then check
    that the recorded execution contexts can be retrieved by digest, by
    command, as "last results", and narrowed with keyword arguments or a
    filter expression.
    """

    # Ontology name requested in setup_class and recorded as root_ontology_name.
    _ONTOLOGY_NAME = "#TestSheerkaResultManager#"

    # Command evaluated by most tests.
    _DEF_ONE = "def concept one as 1"

    # Keyword selectors; each dict doubles as the attribute values expected
    # on the single item that is returned.
    _KWARGS_CASES = [
        {"desc": "Evaluating 'def concept one as 1'"},
        {"id": 0},
        {"desc": "Evaluating 'def concept one as 1'", "id": 0},
    ]

    # (filter expression, attribute values expected on the single item returned).
    _PREDICATE_CASES = [
        (
            'desc.startswith("Evaluating \'def concept one as 1\'")',
            {"desc": "Evaluating 'def concept one as 1'"},
        ),
        ("id == 0", {"id": 0}),
        (
            "'def concept one as 1' in desc and id == 0",
            {"desc": "Evaluating 'def concept one as 1'", "id": 0},
        ),
    ]

    @classmethod
    def setup_class(cls):
        """Get a Sheerka instance for this class's ontology and enable context saving."""
        sheerka = cls().get_sheerka(cache_only=False, ontology=cls._ONTOLOGY_NAME)
        sheerka.save_execution_context = True
        cls.root_ontology_name = cls._ONTOLOGY_NAME

    @classmethod
    def teardown_class(cls):
        """Pop the ontology and restore the default root ontology name."""
        # NOTE(review): ``cls.sheerka`` is not assigned in this class; presumably
        # it is provided by the base class / ``get_sheerka`` -- confirm.
        cls.sheerka.pop_ontology()
        cls.root_ontology_name = SheerkaOntologyManager.ROOT_ONTOLOGY_NAME

    def init_service(self):
        """Return ``(sheerka, context, service)`` for the service under test."""
        sheerka, context = self.init_test().unpack()
        service = sheerka.services[SheerkaResultConcept.NAME]
        return sheerka, context, service

    # ------------------------------------------------------------------
    # Private helpers (not collected by pytest: names do not start with test_)
    # ------------------------------------------------------------------

    @staticmethod
    def _assert_explanation(sheerka, res, command):
        """Assert that ``res`` is an EXPLANATION concept for ``command``."""
        assert sheerka.isinstance(res, BuiltinConcepts.EXPLANATION)
        assert res.command == command

    @classmethod
    def _assert_full_explanation(cls, sheerka, res, command, digest):
        """Assert EXPLANATION + command, then the digest and a generator body."""
        cls._assert_explanation(sheerka, res, command)
        assert res.digest == digest
        assert isinstance(res.body, types.GeneratorType)

    @staticmethod
    def _assert_single_item(res, expected):
        """Assert ``res.body`` yields exactly one item with the ``expected`` attributes."""
        items = list(res.body)
        assert len(items) == 1
        for attr_name, attr_value in expected.items():
            assert getattr(items[0], attr_name) == attr_value

    @staticmethod
    def _save_three_events(sheerka):
        """Save three bare events ("event 1" .. "event 3") without evaluating anything."""
        for number in (1, 2, 3):
            sheerka.om.save_event(Event(f"event {number}"))

    def _evaluate_history(self, sheerka):
        """Evaluate the definition of ``one`` followed by three ``one`` commands."""
        sheerka.evaluate_user_input(self._DEF_ONE)
        for _ in range(3):
            sheerka.evaluate_user_input("one")

    def _evaluate_definition_and_clear_ids(self):
        """Evaluate the definition of ``one``, clear ``ExecutionContext.ids``.

        Returns ``(sheerka, context)``.
        """
        sheerka, context, _ = self.init_service()
        sheerka.evaluate_user_input(self._DEF_ONE)
        ExecutionContext.ids.clear()
        return sheerka, context

    def _check_results_by_digest(self, reset_service):
        """Shared body of the two "get result by digest" tests.

        :param reset_service: when True, ``service.test_only_reset()`` is called
            after the digest has been read and before the results are requested
            (presumably forcing the lookup to go to the db -- confirm).
        """
        sheerka, context, service = self.init_service()
        sheerka.evaluate_user_input(self._DEF_ONE)
        digest = service.last_execution.event.get_digest()
        if reset_service:
            service.test_only_reset()

        res = sheerka.get_results_by_digest(context, digest)

        # we get the result
        self._assert_full_explanation(sheerka, res, self._DEF_ONE, digest)

        # the digest is correctly recorded
        assert sheerka.load_var(SheerkaResultConcept.NAME, "digest") == digest
        previous_results = list(res.body)

        # Second test,
        # I can get the result from the recorded digest
        res = sheerka.get_results(context)
        self._assert_full_explanation(sheerka, res, self._DEF_ONE, digest)
        assert list(res.body) == previous_results

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def test_i_can_record_execution_contexts(self):
        """An evaluation is visible in the cache, via ``load_result`` and as ``last_execution``."""
        sheerka, _, service = self.init_service()
        service.test_only_reset()

        sheerka.evaluate_user_input("foo")

        cached = service.executions_contexts_cache.copy()
        assert len(cached) == 1
        # Exactly one entry (asserted above): take it without building lists.
        event_id = next(iter(cached.keys()))
        execution_context = next(iter(cached.values()))
        assert execution_context.desc == "Evaluating 'foo'"

        stored = sheerka.om.current_sdp().load_result(event_id)
        assert stored is not None
        assert stored.desc == "Evaluating 'foo'"

        assert service.last_execution is not None
        assert service.last_execution.desc == "Evaluating 'foo'"

    # ------------------------------------------------------------------
    # Results by digest
    # ------------------------------------------------------------------

    def test_i_can_get_the_result_by_digest_using_cache(self):
        """Results can be fetched by digest without resetting the service."""
        self._check_results_by_digest(reset_service=False)

    def test_i_can_get_result_by_digest_using_db(self):
        """Results can be fetched by digest after the service has been reset."""
        self._check_results_by_digest(reset_service=True)

    def test_i_cannot_get_result_by_digest_if_the_digest_does_not_exist(self):
        """An unknown digest yields NOT_FOUND carrying the requested digest."""
        sheerka, context = self.init_test().unpack()

        res = sheerka.get_results_by_digest(context, "fake digest")

        assert sheerka.isinstance(res, BuiltinConcepts.NOT_FOUND)
        assert res.body == {'digest': 'fake digest'}

    def test_i_cannot_get_results_if_no_previous_digest(self):
        """``get_results`` returns None here, where no digest lookup was made first."""
        sheerka, context = self.init_test().unpack()

        assert sheerka.get_results(context) is None

    # ------------------------------------------------------------------
    # Results by command
    # ------------------------------------------------------------------

    def test_i_can_get_the_result_by_command_name(self):
        """A command prefix finds the matching execution even if it is not the latest."""
        sheerka, context, service = self.init_service()
        sheerka.evaluate_user_input(self._DEF_ONE)
        digest = service.last_execution.event.get_digest()
        sheerka.evaluate_user_input("one")  # another command

        res = sheerka.get_results_by_command(context, "def concept")

        self._assert_full_explanation(sheerka, res, self._DEF_ONE, digest)

    def test_i_can_get_the_result_by_command_name_using_db(self):
        """Same lookup by command, after the service has been reset."""
        sheerka, context, service = self.init_service()
        sheerka.evaluate_user_input(self._DEF_ONE)
        sheerka.evaluate_user_input("one")  # another command
        service.test_only_reset()

        res = sheerka.get_results_by_command(context, "def concept")

        assert res.command == self._DEF_ONE

    def test_i_can_get_the_result_by_command_when_not_in_the_same_page_size(self):
        """The command is found with a service built as ``SheerkaResultConcept(sheerka, 2)``."""
        sheerka, context, _ = self.init_service()
        self._evaluate_history(sheerka)

        # NOTE(review): the second argument is presumably the page size
        # (per the test name) -- confirm against SheerkaResultConcept.
        service = SheerkaResultConcept(sheerka, 2)
        res = service.get_results_by_command(context, "def concept")

        self._assert_explanation(sheerka, res, self._DEF_ONE)

    def test_i_cannot_get_results_from_command_if_the_command_does_not_exist(self):
        """An unknown command yields NOT_FOUND carrying the requested command."""
        sheerka, context, service = self.init_service()
        service.test_only_reset()

        res = sheerka.get_results_by_command(context, "def concept")

        assert sheerka.isinstance(res, BuiltinConcepts.NOT_FOUND)
        assert res.body == {'command': 'def concept'}

    def test_i_cannot_get_result_from_command_if_the_command_does_not_exists_multiple_pages(self):
        """NOT_FOUND is returned for an unknown command with a service built with ``2``."""
        sheerka, context, service = self.init_service()
        service.test_only_reset()
        self._evaluate_history(sheerka)

        service = SheerkaResultConcept(sheerka, 2)
        res = service.get_results_by_command(context, "fake command")

        assert sheerka.isinstance(res, BuiltinConcepts.NOT_FOUND)
        assert res.body == {'command': 'fake command'}

    # ------------------------------------------------------------------
    # Last results
    # ------------------------------------------------------------------

    def test_i_can_get_last_results(self):
        """``get_last_results`` returns the most recently evaluated command."""
        sheerka, context, _ = self.init_service()
        sheerka.evaluate_user_input(self._DEF_ONE)
        sheerka.evaluate_user_input("one")

        res = sheerka.get_last_results(context)

        self._assert_explanation(sheerka, res, "one")

    def test_i_can_get_last_results_using_db(self):
        """Same as above, after the service has been reset."""
        sheerka, context, service = self.init_service()
        sheerka.evaluate_user_input(self._DEF_ONE)
        sheerka.evaluate_user_input("one")
        service.test_only_reset()

        res = sheerka.get_last_results(context)

        self._assert_explanation(sheerka, res, "one")

    def test_i_can_get_last_results_when_event_with_no_result(self):
        """Bare events saved before the evaluation do not hide its result."""
        sheerka, context = self.init_test().unpack()
        self._save_three_events(sheerka)
        sheerka.evaluate_user_input(self._DEF_ONE)

        res = sheerka.get_last_results(context)

        self._assert_explanation(sheerka, res, self._DEF_ONE)

    def test_i_cannot_get_last_results_when_no_result(self):
        """With nothing evaluated after a reset, the answer is NOT_FOUND."""
        sheerka, context, service = self.init_service()
        service.test_only_reset()

        res = sheerka.get_last_results(context)

        assert sheerka.isinstance(res, BuiltinConcepts.NOT_FOUND)
        assert res.body == {'query': 'last'}

    def test_i_cannot_get_last_results_when_only_events(self):
        """Bare events alone do not count as results."""
        sheerka, context, service = self.init_service()
        service.test_only_reset()
        self._save_three_events(sheerka)

        res = sheerka.get_last_results(context)

        assert sheerka.isinstance(res, BuiltinConcepts.NOT_FOUND)
        assert res.body == {'query': 'last'}

    # ------------------------------------------------------------------
    # Narrowing results with kwargs / filter expressions
    # ------------------------------------------------------------------

    @pytest.mark.parametrize('kwargs', _KWARGS_CASES)
    def test_i_can_get_last_results_using_kwarg(self, kwargs):
        """``get_last_results`` accepts attribute selectors as keyword arguments."""
        sheerka, context = self._evaluate_definition_and_clear_ids()

        res = sheerka.get_last_results(context, **kwargs)

        self._assert_single_item(res, kwargs)

    @pytest.mark.parametrize('predicate, expected', _PREDICATE_CASES)
    def test_i_can_get_last_results_using_filter(self, predicate, expected):
        """``get_last_results`` accepts a filter expression via ``filter=``."""
        sheerka, context = self._evaluate_definition_and_clear_ids()

        res = sheerka.get_last_results(context, filter=predicate)

        self._assert_single_item(res, expected)

    @pytest.mark.parametrize('predicate, expected', _PREDICATE_CASES)
    def test_i_can_get_last_results_using_the_first_argument_to_filter(self, predicate, expected):
        """``get_last_results`` accepts the filter expression positionally."""
        sheerka, context = self._evaluate_definition_and_clear_ids()

        res = sheerka.get_last_results(context, predicate)

        self._assert_single_item(res, expected)

    @pytest.mark.parametrize('kwargs', _KWARGS_CASES)
    def test_i_can_get_results_using_kwarg(self, kwargs):
        """``get_results`` accepts attribute selectors as keyword arguments."""
        sheerka, context = self._evaluate_definition_and_clear_ids()
        sheerka.get_last_results(context)  # called first, before get_results

        res = sheerka.get_results(context, **kwargs)

        self._assert_single_item(res, kwargs)

    @pytest.mark.parametrize('predicate, expected', _PREDICATE_CASES)
    def test_i_can_get_results_using_filter(self, predicate, expected):
        """``get_results`` accepts a filter expression via ``filter=``."""
        sheerka, context = self._evaluate_definition_and_clear_ids()
        sheerka.get_last_results(context)  # called first, before get_results

        res = sheerka.get_results(context, filter=predicate)

        self._assert_single_item(res, expected)

    @pytest.mark.parametrize('predicate, expected', _PREDICATE_CASES)
    def test_i_can_get_results_using_the_first_argument_to_filter(self, predicate, expected):
        """``get_results`` accepts the filter expression positionally."""
        sheerka, context = self._evaluate_definition_and_clear_ids()
        sheerka.get_last_results(context)  # called first, before get_results

        res = sheerka.get_results(context, predicate)

        self._assert_single_item(res, expected)

    def test_i_can_manage_invalid_predicates(self):
        """An unparsable filter expression raises ``SyntaxError``."""
        with pytest.raises(SyntaxError):
            SheerkaResultConcept.get_predicate(filter="a b c")

    # ------------------------------------------------------------------
    # Last return value / last created concept
    # ------------------------------------------------------------------

    def test_i_can_get_last_return_value(self):
        """``get_last_return_value`` follows the most recent evaluation."""
        sheerka, context, _ = self.init_service()

        sheerka.evaluate_user_input(self._DEF_ONE)
        ret = sheerka.get_last_return_value(context)
        assert sheerka.isinstance(ret[0].body, BuiltinConcepts.NEW_CONCEPT)

        sheerka.evaluate_user_input("eval one")
        ret = sheerka.get_last_return_value(context)
        assert ret[0].body == 1

    def test_i_can_track_new_concept(self):
        """Defining a concept updates the last-created-concept tracking."""
        sheerka, context, service = self.init_service()

        res = sheerka.evaluate_user_input(self._DEF_ONE)
        new_concept = res[0].body.body

        assert sheerka.get_last_created_concept(context) == new_concept
        assert service.last_created_concept_id == new_concept.id

    def test_last_created_concept_is_recorded(self):
        """The last created concept is still known after reset + ``initialize_deferred``."""
        sheerka, context, service = self.init_service()
        res = sheerka.evaluate_user_input(self._DEF_ONE)
        new_concept = res[0].body.body

        service.test_only_reset()
        # NOTE(review): presumably reloads the persisted state -- confirm.
        service.initialize_deferred(context, False)

        assert service.last_created_concept_id == new_concept.id
        assert sheerka.get_last_created_concept(context) == new_concept