Files
Sheerka-Old/tests/core/test_SheerkaEventManager.py

86 lines
3.0 KiB
Python

import pytest
from core.sheerka.services.SheerkaEventManager import SheerkaEventManager
from tests.TestUsingMemoryBasedSheerka import TestUsingMemoryBasedSheerka
def example_of_function(context):
    """Module-level subscriber callback: print the digest of the event.

    The digest is read from ``context.event.get_digest()``. The printed label
    reads ``example_of_class_method``; the expected output asserted by the
    tests in this file matches that exact text, so it must stay as is.
    """
    digest = context.event.get_digest()
    print(f"example_of_class_method. event={digest}")
def example_of_function_with_data(context, data):
    """Module-level subscriber callback that also receives a payload.

    Prints the digest obtained from ``context.event.get_digest()`` followed by
    ``data`` in ``name=repr`` form (the ``{data=}`` f-string specifier). The
    label reads ``example_of_class_method``; the expected output asserted by
    the tests in this file matches that exact text.
    """
    digest = context.event.get_digest()
    print(f"example_of_class_method. event={digest}, {data=}")
class TestSheerkaEventManager(TestUsingMemoryBasedSheerka):
    """Tests for subscribe/publish on the object returned by ``init_concepts()``.

    The ``example_of_*`` members are subscriber callbacks of different kinds
    (bound method, static method) that print what they receive, so the tests
    can check delivery by comparing captured stdout.
    """

    def example_of_class_method(self, context):
        """Bound-method subscriber: print the digest of the event."""
        print(f"example_of_class_method. event={context.event.get_digest()}")

    @staticmethod
    def example_of_static_method(context):
        """Static-method subscriber: print the digest of the event."""
        print(f"example_of_static_method. event={context.event.get_digest()}")

    def example_of_class_method_with_data(self, context, data):
        """Bound-method subscriber taking a payload: print digest and data."""
        print(f"example_of_class_method. event={context.event.get_digest()}, {data=}")

    @staticmethod
    def example_of_static_method_with_data(context, data):
        """Static-method subscriber taking a payload: print digest and data."""
        print(f"example_of_static_method. event={context.event.get_digest()}, {data=}")

    def test_i_can_subscribe_and_publish(self, capsys):
        """Publishing without data calls every subscriber in subscription order."""
        sheerka, context = self.init_concepts()
        topic = "my topic"
        service = sheerka.services[SheerkaEventManager.NAME]

        try:
            sheerka.subscribe(topic, self.example_of_class_method)
            sheerka.subscribe(topic, self.example_of_static_method)
            sheerka.subscribe(topic, example_of_function)

            sheerka.publish(context, topic)

            captured = capsys.readouterr()
            assert captured.out == """example_of_class_method. event=xxx
example_of_static_method. event=xxx
example_of_class_method. event=xxx
"""
        finally:
            # Reset even when the assertion fails, so leftover subscribers
            # cannot leak into another test that uses the same topic name.
            # NOTE(review): presumably subscriptions outlive a single test
            # (hence the explicit reset) - confirm against the service.
            service.test_only_reset_topic(topic)

    def test_i_can_subscribe_and_publish_with_data(self, capsys):
        """Publishing with a payload forwards that payload to every subscriber."""
        sheerka, context = self.init_concepts()
        topic = "my topic"
        service = sheerka.services[SheerkaEventManager.NAME]

        try:
            sheerka.subscribe(topic, self.example_of_class_method_with_data)
            sheerka.subscribe(topic, self.example_of_static_method_with_data)
            sheerka.subscribe(topic, example_of_function_with_data)

            sheerka.publish(context, topic, "42")

            captured = capsys.readouterr()
            assert captured.out == """example_of_class_method. event=xxx, data='42'
example_of_static_method. event=xxx, data='42'
example_of_class_method. event=xxx, data='42'
"""
        finally:
            # Same rationale as above: cleanup must not depend on the
            # assertion passing.
            service.test_only_reset_topic(topic)

    def test_i_can_save_and_reset_state(self):
        """A topic subscribed after ``push_ontology`` is gone after ``pop_ontology``."""
        sheerka, context = self.init_concepts()
        service = sheerka.services[SheerkaEventManager.NAME]

        sheerka.subscribe("my first topic", self.example_of_class_method_with_data)

        sheerka.push_ontology(context, "new ontology")
        sheerka.subscribe("my second topic", self.example_of_class_method_with_data)

        # Both topics are accessible while the new ontology is pushed.
        assert "my first topic" in service.subscribers
        assert "my second topic" in service.subscribers

        sheerka.pop_ontology(context)

        # Only the topic subscribed before the push remains.
        assert "my first topic" in service.subscribers
        assert "my second topic" not in service.subscribers
        # NOTE(review): "my first topic" is never reset here, unlike the
        # topics in the other tests - confirm whether cleanup is needed.