Added Context Manager ObservableResultCollector
This commit is contained in:
@@ -544,4 +544,5 @@ Special thanks to the Python and open-source community for their tools, inspirat
|
||||
* 0.2.0 : Observable results can be collected using `collect_return_values`
|
||||
* 0.3.0 : Added `unbind`, `unbind_all`, `has_listeners` `get_listener_count` to Observable
|
||||
* 0.4.0 : Added `add_event_listener` and `remove_event_listener`
|
||||
* 0.5.0 : Added `ProxyObject`
|
||||
* 0.5.0 : Added `ProxyObject`
|
||||
* 0.5.1 : Added `ObservableResultCollector`
|
||||
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "myutils"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
description = "Base useful classes."
|
||||
readme = "README.md"
|
||||
authors = [
|
||||
|
||||
@@ -14,6 +14,9 @@ class ProxyObject:
|
||||
def __hasattr__(self, item):
|
||||
return item in self._props
|
||||
|
||||
def as_dict(self):
|
||||
return self._props.copy()
|
||||
|
||||
def __repr__(self):
|
||||
if "key" in self._props:
|
||||
return f"ProxyObject(key={self._props["key"]})"
|
||||
@@ -37,10 +40,10 @@ class ProxyObject:
|
||||
for prop_name, path in self._mappings.items():
|
||||
attrs = path.split(".")
|
||||
current = self._obj
|
||||
|
||||
|
||||
# Check if path ends with wildcard
|
||||
has_wildcard_in_path = attrs[-1] == "*"
|
||||
|
||||
|
||||
# Navigate to the target object
|
||||
for attr in attrs:
|
||||
if attr == "*":
|
||||
@@ -49,7 +52,7 @@ class ProxyObject:
|
||||
current = getattr(current, attr)
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
# Handle wildcard cases
|
||||
if prop_name == "*" or has_wildcard_in_path:
|
||||
# Copy all properties from current object
|
||||
|
||||
@@ -10,6 +10,112 @@ class ObservableEvent(Enum):
|
||||
AFTER_PROPERTY_CHANGE = "__*after_property_change*__"
|
||||
|
||||
|
||||
class ObservableResultCollector:
|
||||
"""
|
||||
Context manager to automatically collect results from observable events.
|
||||
|
||||
This context manager simplifies event listener management by automatically adding
|
||||
listeners at the start of the `with` block and removing them at the end, while
|
||||
collecting the results produced by observable callbacks.
|
||||
|
||||
Args:
|
||||
observables: List of observable objects (or a single object)
|
||||
attr_name: Name of the attribute to observe ("" for all attributes)
|
||||
event: Event type to listen to (default AFTER_PROPERTY_CHANGE)
|
||||
|
||||
Attributes:
|
||||
results: List of results collected during the with block execution
|
||||
|
||||
Raises:
|
||||
ValueError: If event is not AFTER_PROPERTY_CHANGE (only supported event)
|
||||
|
||||
Example:
|
||||
>>> from myutils.observable import (
|
||||
... make_observable, bind, ObservableResultCollector, ObservableEvent
|
||||
... )
|
||||
>>> from dataclasses import dataclass
|
||||
>>>
|
||||
>>> @dataclass
|
||||
>>> class Data:
|
||||
... value: str = "hello"
|
||||
>>>
|
||||
>>> data = make_observable(Data())
|
||||
>>>
|
||||
>>> # Add a callback that returns a value
|
||||
>>> def my_callback(old, new):
|
||||
... return f"Changed from {old} to {new}"
|
||||
>>> bind(data, "value", my_callback)
|
||||
>>>
|
||||
>>> # Use the collector
|
||||
>>> with ObservableResultCollector([data]) as collector:
|
||||
... data.value = "world"
|
||||
>>>
|
||||
>>> print(collector.results) # ['Changed from hello to world']
|
||||
|
||||
Notes:
|
||||
- The context manager ensures listeners are always removed, even on exception
|
||||
- If `observables` is not a list, it will be converted to a list
|
||||
- Results are collected in the order events occur
|
||||
- Currently only supports AFTER_PROPERTY_CHANGE event
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
observables: list,
|
||||
attr_name: str = "",
|
||||
event: ObservableEvent = ObservableEvent.AFTER_PROPERTY_CHANGE
|
||||
):
|
||||
if event != ObservableEvent.AFTER_PROPERTY_CHANGE:
|
||||
raise ValueError("Only AFTER_PROPERTY_CHANGE event is currently supported")
|
||||
|
||||
# Convert to list if single object is passed
|
||||
if not isinstance(observables, list):
|
||||
observables = [observables]
|
||||
|
||||
self.observables = observables
|
||||
self.attr_name = attr_name
|
||||
self.event = event
|
||||
self.results = []
|
||||
self._callback = None
|
||||
|
||||
def __enter__(self):
|
||||
"""Add listeners and return self to access results."""
|
||||
# Create callback that collects results
|
||||
def collector_callback(attr, old, new, results):
|
||||
self.results.extend(results)
|
||||
|
||||
self._callback = collector_callback
|
||||
|
||||
# Add listener to all observables
|
||||
for observable in self.observables:
|
||||
add_event_listener(
|
||||
self.event,
|
||||
observable,
|
||||
self.attr_name,
|
||||
self._callback
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Automatically remove all listeners."""
|
||||
if self._callback is not None:
|
||||
for observable in self.observables:
|
||||
try:
|
||||
remove_event_listener(
|
||||
self.event,
|
||||
observable,
|
||||
self.attr_name,
|
||||
self._callback
|
||||
)
|
||||
except (NotObservableError, ValueError, KeyError):
|
||||
# If object is no longer observable or listener doesn't exist,
|
||||
# continue silently
|
||||
pass
|
||||
|
||||
# Don't suppress exception if one exists
|
||||
return False
|
||||
|
||||
def make_observable(obj):
|
||||
if hasattr(obj, '_listeners'):
|
||||
return obj # Already observable
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import pytest
|
||||
|
||||
from myutils.observable import NotObservableError, make_observable, bind, collect_return_values, unbind, unbind_all, \
|
||||
has_listeners, get_listener_count, add_event_listener, ObservableEvent, remove_event_listener
|
||||
has_listeners, get_listener_count, add_event_listener, ObservableEvent, remove_event_listener, \
|
||||
ObservableResultCollector
|
||||
|
||||
|
||||
# Test fixtures
|
||||
@@ -1045,3 +1046,244 @@ def test_i_can_receive_changes_in_after_property_change_event_for_all_attributes
|
||||
data.value = "new value"
|
||||
|
||||
assert on_after_change_results == [("number", 1, 5, [5, "5_1"]), ("value", "initial", "new value", ["new value"])]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ObservableResultCollector tests
|
||||
# ============================================================================
|
||||
|
||||
def test_i_can_create_collector_with_list_of_observables():
|
||||
"""
|
||||
ObservableResultCollector should accept a list of observable objects.
|
||||
"""
|
||||
data1 = make_observable(Data())
|
||||
data2 = make_observable(Data())
|
||||
|
||||
collector = ObservableResultCollector([data1, data2])
|
||||
|
||||
assert collector.observables == [data1, data2]
|
||||
assert collector.attr_name == ""
|
||||
assert collector.event == ObservableEvent.AFTER_PROPERTY_CHANGE
|
||||
assert collector.results == []
|
||||
|
||||
|
||||
def test_i_can_create_collector_with_single_observable():
|
||||
"""
|
||||
ObservableResultCollector should automatically convert a single object to a list.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
collector = ObservableResultCollector(data)
|
||||
|
||||
assert collector.observables == [data]
|
||||
assert isinstance(collector.observables, list)
|
||||
|
||||
|
||||
def test_i_can_collect_results_from_single_observable():
|
||||
"""
|
||||
Collector should collect results from a callback bound to an observable.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def callback(old, new):
|
||||
return f"Changed from {old} to {new}"
|
||||
|
||||
bind(data, "value", callback)
|
||||
|
||||
with ObservableResultCollector(data) as collector:
|
||||
data.value = "world"
|
||||
|
||||
assert collector.results == ["Changed from initial to world"]
|
||||
|
||||
|
||||
def test_i_can_collect_results_from_multiple_callbacks():
|
||||
"""
|
||||
Collector should collect results from multiple callbacks on the same attribute.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def callback1(old, new):
|
||||
return f"callback1: {new}"
|
||||
|
||||
def callback2(old, new):
|
||||
return f"callback2: {new}"
|
||||
|
||||
bind(data, "value", callback1)
|
||||
bind(data, "value", callback2)
|
||||
|
||||
with ObservableResultCollector(data) as collector:
|
||||
data.value = "test"
|
||||
|
||||
assert collector.results == ["callback1: test", "callback2: test"]
|
||||
|
||||
|
||||
def test_i_can_collect_results_from_multiple_observables():
|
||||
"""
|
||||
Collector should collect results from multiple observable objects.
|
||||
"""
|
||||
data1 = make_observable(Data())
|
||||
data2 = make_observable(Data())
|
||||
|
||||
def callback(old, new):
|
||||
return f"Changed to {new}"
|
||||
|
||||
bind(data1, "value", callback)
|
||||
bind(data2, "value", callback)
|
||||
|
||||
with ObservableResultCollector([data1, data2]) as collector:
|
||||
data1.value = "first"
|
||||
data2.value = "second"
|
||||
|
||||
assert collector.results == ["Changed to first", "Changed to second"]
|
||||
|
||||
|
||||
def test_i_can_collect_results_for_specific_attribute():
|
||||
"""
|
||||
Collector should only collect results for a specific attribute when specified.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def value_callback(old, new):
|
||||
return f"value: {new}"
|
||||
|
||||
def number_callback(old, new):
|
||||
return f"number: {new}"
|
||||
|
||||
bind(data, "value", value_callback)
|
||||
bind(data, "number", number_callback)
|
||||
|
||||
with ObservableResultCollector(data, attr_name="value") as collector:
|
||||
data.value = "test"
|
||||
data.number = 42
|
||||
|
||||
# Should only collect results from value changes
|
||||
assert collector.results == ["value: test"]
|
||||
|
||||
|
||||
def test_i_can_collect_results_for_all_attributes():
|
||||
"""
|
||||
Collector with attr_name="" should collect results from all attributes.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def value_callback(old, new):
|
||||
return f"value: {new}"
|
||||
|
||||
def number_callback(old, new):
|
||||
return f"number: {new}"
|
||||
|
||||
bind(data, "value", value_callback)
|
||||
bind(data, "number", number_callback)
|
||||
|
||||
with ObservableResultCollector(data, attr_name="") as collector:
|
||||
data.value = "test"
|
||||
data.number = 42
|
||||
|
||||
# Should collect results from all attributes
|
||||
assert collector.results == ["value: test", "number: 42"]
|
||||
|
||||
|
||||
def test_i_can_verify_listeners_are_removed_after_exit():
|
||||
"""
|
||||
Listeners should be automatically removed when exiting the context manager.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def callback(old, new):
|
||||
return f"Changed to {new}"
|
||||
|
||||
bind(data, "value", callback)
|
||||
|
||||
event_name = ObservableEvent.AFTER_PROPERTY_CHANGE.value
|
||||
|
||||
# Before entering context, no event listeners
|
||||
assert event_name not in data._listeners
|
||||
|
||||
with ObservableResultCollector(data) as collector:
|
||||
# Inside context, event listener should be added
|
||||
assert event_name in data._listeners
|
||||
assert "" in data._listeners[event_name]
|
||||
|
||||
data.value = "test"
|
||||
|
||||
# After exiting context, event listener should be removed
|
||||
assert event_name not in data._listeners
|
||||
assert collector.results == ["Changed to test"]
|
||||
|
||||
|
||||
def test_i_can_verify_listeners_are_removed_on_exception():
|
||||
"""
|
||||
Listeners should be removed even when an exception occurs in the with block.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def callback(old, new):
|
||||
return f"Changed to {new}"
|
||||
|
||||
bind(data, "value", callback)
|
||||
|
||||
event_name = ObservableEvent.AFTER_PROPERTY_CHANGE.value
|
||||
|
||||
try:
|
||||
with ObservableResultCollector(data) as collector:
|
||||
data.value = "test"
|
||||
raise ValueError("Test exception")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Listener should be removed despite exception
|
||||
assert event_name not in data._listeners
|
||||
|
||||
|
||||
def test_i_can_access_results_through_collector_object():
|
||||
"""
|
||||
The collector object returned by __enter__ should provide access to results.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def callback(old, new):
|
||||
return new * 2
|
||||
|
||||
bind(data, "number", callback)
|
||||
|
||||
with ObservableResultCollector(data) as collector:
|
||||
data.number = 5
|
||||
# Results should be accessible during the with block
|
||||
assert collector.results == [10]
|
||||
|
||||
data.number = 10
|
||||
assert collector.results == [10, 20]
|
||||
|
||||
|
||||
def test_i_can_collect_empty_results_when_no_callbacks():
|
||||
"""
|
||||
Collector should have empty results if no callbacks are bound.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
with ObservableResultCollector(data) as collector:
|
||||
data.value = "test"
|
||||
|
||||
# No callbacks were bound, so results should be empty
|
||||
assert collector.results == []
|
||||
|
||||
|
||||
def test_i_can_collect_none_values():
|
||||
"""
|
||||
Collector should collect None values returned by callbacks.
|
||||
"""
|
||||
data = make_observable(Data())
|
||||
|
||||
def callback1(old, new):
|
||||
return None
|
||||
|
||||
def callback2(old, new):
|
||||
return "not none"
|
||||
|
||||
bind(data, "value", callback1)
|
||||
bind(data, "value", callback2)
|
||||
|
||||
with ObservableResultCollector(data) as collector:
|
||||
data.value = "test"
|
||||
|
||||
assert collector.results == [None, "not none"]
|
||||
|
||||
Reference in New Issue
Block a user