from core.sheerka.services.SheerkaEventManager import SheerkaEventManager from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka def example_of_function(context): print(f"example_of_class_method. event={context.event.get_digest()}") def example_of_function_with_data(context, data): print(f"example_of_class_method. event={context.event.get_digest()}, {data=}") class TestSheerkaEventManager(TestUsingMemoryBasedSheerka): def example_of_class_method(self, context): print(f"example_of_class_method. event={context.event.get_digest()}") @staticmethod def example_of_static_method(context): print(f"example_of_static_method. event={context.event.get_digest()}") def example_of_class_method_with_data(self, context, data): print(f"example_of_class_method. event={context.event.get_digest()}, {data=}") @staticmethod def example_of_static_method_with_data(context, data): print(f"example_of_static_method. event={context.event.get_digest()}, {data=}") def test_i_can_subscribe_and_publish(self, capsys): sheerka, context = self.init_concepts() topic = "my topic" sheerka.subscribe(topic, self.example_of_class_method) sheerka.subscribe(topic, self.example_of_static_method) sheerka.subscribe(topic, example_of_function) sheerka.publish(context, topic) captured = capsys.readouterr() assert captured.out == """example_of_class_method. event=xxx example_of_static_method. event=xxx example_of_class_method. event=xxx """ service = sheerka.services[SheerkaEventManager.NAME] service.reset_topic(topic) def test_i_can_subscribe_and_publish_with_data(self, capsys): sheerka, context = self.init_concepts() topic = "my topic" sheerka.subscribe(topic, self.example_of_class_method_with_data) sheerka.subscribe(topic, self.example_of_static_method_with_data) sheerka.subscribe(topic, example_of_function_with_data) sheerka.publish(context, topic, "42") captured = capsys.readouterr() assert captured.out == """example_of_class_method. event=xxx, data='42' example_of_static_method. event=xxx, data='42' example_of_class_method. event=xxx, data='42' """ service = sheerka.services[SheerkaEventManager.NAME] service.reset_topic(topic)