67 lines
2.2 KiB
Python
67 lines
2.2 KiB
Python
import pytest
|
|
|
|
from base import BaseTest
|
|
from services.SheerkaDummyEventManager import SheerkaDummyEventManager
|
|
|
|
|
|
def example_of_function(context):
|
|
print(f"example_of_class_method. event={context.event.get_digest()}")
|
|
|
|
|
|
def example_of_function_with_data(context, data):
|
|
print(f"example_of_class_method. event={context.event.get_digest()}, {data=}")
|
|
|
|
|
|
class TestSheerkaEventManager(BaseTest):
|
|
|
|
@pytest.fixture()
|
|
def service(self, sheerka):
|
|
service = sheerka.services[SheerkaDummyEventManager.NAME]
|
|
yield service
|
|
|
|
service.test_only_reset_service()
|
|
|
|
def example_of_class_method(self, context):
|
|
print(f"example_of_class_method. event={context.event.get_digest()}")
|
|
|
|
@staticmethod
|
|
def example_of_static_method(context):
|
|
print(f"example_of_static_method. event={context.event.get_digest()}")
|
|
|
|
def example_of_class_method_with_data(self, context, data):
|
|
print(f"example_of_class_method. event={context.event.get_digest()}, {data=}")
|
|
|
|
@staticmethod
|
|
def example_of_static_method_with_data(context, data):
|
|
print(f"example_of_static_method. event={context.event.get_digest()}, {data=}")
|
|
|
|
def test_i_can_subscribe_and_publish(self, context, service, capsys):
|
|
topic = "my topic"
|
|
|
|
service.subscribe(topic, self.example_of_class_method)
|
|
service.subscribe(topic, self.example_of_static_method)
|
|
service.subscribe(topic, example_of_function)
|
|
|
|
service.publish(context, topic)
|
|
|
|
captured = capsys.readouterr()
|
|
assert captured.out == """example_of_class_method. event=xxx
|
|
example_of_static_method. event=xxx
|
|
example_of_class_method. event=xxx
|
|
"""
|
|
|
|
def test_i_can_subscribe_and_publish_with_data(self, context, service, capsys):
|
|
topic = "my topic"
|
|
|
|
service.subscribe(topic, self.example_of_class_method_with_data)
|
|
service.subscribe(topic, self.example_of_static_method_with_data)
|
|
service.subscribe(topic, example_of_function_with_data)
|
|
|
|
service.publish(context, topic, "42")
|
|
|
|
captured = capsys.readouterr()
|
|
assert captured.out == """example_of_class_method. event=xxx, data='42'
|
|
example_of_static_method. event=xxx, data='42'
|
|
example_of_class_method. event=xxx, data='42'
|
|
"""
|