import pytest from base import BaseTest from services.SheerkaDummyEventManager import SheerkaDummyEventManager def example_of_function(context): print(f"example_of_class_method. event={context.event.get_digest()}") def example_of_function_with_data(context, data): print(f"example_of_class_method. event={context.event.get_digest()}, {data=}") class TestSheerkaEventManager(BaseTest): @pytest.fixture() def service(self, sheerka): service = sheerka.services[SheerkaDummyEventManager.NAME] yield service service.test_only_reset_service() def example_of_class_method(self, context): print(f"example_of_class_method. event={context.event.get_digest()}") @staticmethod def example_of_static_method(context): print(f"example_of_static_method. event={context.event.get_digest()}") def example_of_class_method_with_data(self, context, data): print(f"example_of_class_method. event={context.event.get_digest()}, {data=}") @staticmethod def example_of_static_method_with_data(context, data): print(f"example_of_static_method. event={context.event.get_digest()}, {data=}") def test_i_can_subscribe_and_publish(self, context, service, capsys): topic = "my topic" service.subscribe(topic, self.example_of_class_method) service.subscribe(topic, self.example_of_static_method) service.subscribe(topic, example_of_function) service.publish(context, topic) captured = capsys.readouterr() assert captured.out == """example_of_class_method. event=xxx example_of_static_method. event=xxx example_of_class_method. event=xxx """ def test_i_can_subscribe_and_publish_with_data(self, context, service, capsys): topic = "my topic" service.subscribe(topic, self.example_of_class_method_with_data) service.subscribe(topic, self.example_of_static_method_with_data) service.subscribe(topic, example_of_function_with_data) service.publish(context, topic, "42") captured = capsys.readouterr() assert captured.out == """example_of_class_method. event=xxx, data='42' example_of_static_method. event=xxx, data='42' example_of_class_method. event=xxx, data='42' """