import pytest from core.sheerka.services.SheerkaEventManager import SheerkaEventManager from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka def example_of_function(context): print(f"example_of_class_method. event={context.event.get_digest()}") def example_of_function_with_data(context, data): print(f"example_of_class_method. event={context.event.get_digest()}, {data=}") class TestSheerkaEventManager(TestUsingMemoryBasedSheerka): def example_of_class_method(self, context): print(f"example_of_class_method. event={context.event.get_digest()}") @staticmethod def example_of_static_method(context): print(f"example_of_static_method. event={context.event.get_digest()}") def example_of_class_method_with_data(self, context, data): print(f"example_of_class_method. event={context.event.get_digest()}, {data=}") @staticmethod def example_of_static_method_with_data(context, data): print(f"example_of_static_method. event={context.event.get_digest()}, {data=}") def test_i_can_subscribe_and_publish(self, capsys): sheerka, context = self.init_concepts() topic = "my topic" sheerka.subscribe(topic, self.example_of_class_method) sheerka.subscribe(topic, self.example_of_static_method) sheerka.subscribe(topic, example_of_function) sheerka.publish(context, topic) captured = capsys.readouterr() assert captured.out == """example_of_class_method. event=xxx example_of_static_method. event=xxx example_of_class_method. event=xxx """ service = sheerka.services[SheerkaEventManager.NAME] service.test_only_reset_topic(topic) def test_i_can_subscribe_and_publish_with_data(self, capsys): sheerka, context = self.init_concepts() topic = "my topic" sheerka.subscribe(topic, self.example_of_class_method_with_data) sheerka.subscribe(topic, self.example_of_static_method_with_data) sheerka.subscribe(topic, example_of_function_with_data) sheerka.publish(context, topic, "42") captured = capsys.readouterr() assert captured.out == """example_of_class_method. event=xxx, data='42' example_of_static_method. event=xxx, data='42' example_of_class_method. event=xxx, data='42' """ service = sheerka.services[SheerkaEventManager.NAME] service.test_only_reset_topic(topic) def test_i_can_save_and_reset_state(self): sheerka, context = self.init_concepts() service = sheerka.services[SheerkaEventManager.NAME] sheerka.subscribe("my first topic", self.example_of_class_method_with_data) sheerka.push_ontology(context, "new ontology") sheerka.subscribe("my second topic", self.example_of_class_method_with_data) # I can access to the topic assert "my first topic" in service.subscribers assert "my second topic" in service.subscribers sheerka.pop_ontology(context) assert "my first topic" in service.subscribers assert "my second topic" not in service.subscribers