Skip to content

Commit

Permalink
add RepeatedEvent, replacing Unqueued/MailboxRepeatedEvent
Browse files Browse the repository at this point in the history
  * supports both unqueued and eventual consistency cases
  * unlike MailboxRepeatedEvent, supports broadcast
  * supports one-shot usage: wait_event()
  * eventual consistency iterator supports "repeat_last"

This is a backwards incompatible change (#16).
  • Loading branch information
belm0 committed Apr 6, 2021
1 parent b8f6c9f commit 04c7e60
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 104 deletions.
8 changes: 3 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,10 @@ repeated events
:class:`trio.Event` does not offer a clear() method, so it can't be
triggered multiple times. It's for your own good.

The following are event classes which can be triggered repeatedly in a
relatively safe manner.
:class:`RepeatedEvent` can be triggered repeatedly in a relatively safe manner
while having multiple listeners.

.. autoclass:: UnqueuedRepeatedEvent
:members:
.. autoclass:: MailboxRepeatedEvent
.. autoclass:: RepeatedEvent
:members:

generators
Expand Down
2 changes: 1 addition & 1 deletion src/trio_util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ._awaitables import wait_all, wait_any
from ._exceptions import defer_to_cancelled, multi_error_defer_to
from ._periodic import periodic
from ._repeated_event import UnqueuedRepeatedEvent, MailboxRepeatedEvent
from ._repeated_event import RepeatedEvent
from ._task_stats import TaskStats
from ._trio_async_generator import trio_async_generator

Expand Down
136 changes: 64 additions & 72 deletions src/trio_util/_repeated_event.py
Original file line number Diff line number Diff line change
@@ -1,98 +1,90 @@
import trio
from ._async_value import AsyncValue

from ._async_bool import AsyncBool

class RepeatedEvent:
"""A repeated event that supports multiple listeners.
class UnqueuedRepeatedEvent:
"""An unqueued repeated event that supports broadcast
The event may be triggered multiple times, and supports multiple listeners.
A listener will miss an event if it's blocked processing the previous one.
RepeatedEvent supports both "unqueued" and "eventual consistency" uses:
* unqueued - drop events while processing the previous one
* eventual consistency - some events may be missed while processing the
previous one, but receiving the latest event is ensured
"""

>>> event = UnqueuedRepeatedEvent()
def __init__(self):
self._event = AsyncValue(0)

A task listens for events:
def set(self):
"""Trigger an event"""
self._event.value += 1

>>> async for _ in event:
>>> # do blocking work
>>> await trio.sleep(1)
async def wait(self):
"""Wait for the next event"""
token = self._event.value
await self._event.wait_value(lambda val: val > token)

Another task triggers events:
async def unqueued_events(self):
"""Unqueued event iterator
>>> event.set() # trigger event
>>> trio.sleep(0) # listener will enter loop body
>>> event.set() # listener misses this event since it's still in the loop body
>>> trio.sleep(2)
>>> event.set() # listener will enter loop body again
"""
The listener will miss an event if it's blocked processing the previous
one. This is effectively the same as the following manual loop::
def __init__(self):
self._event = AsyncBool()
>>> while True:
>>> await event.wait()
>>> # do work...
def set(self):
"""Trigger event."""
self._event.value ^= True
Typical usage::
async def __aiter__(self):
async for _ in self._event.transitions():
yield
>>> event = RepeatedEvent()
A task listens for events:
class MailboxRepeatedEvent:
"""A single-listener repeated event with one queue slot
>>> async for _ in event.unqueued_events():
>>> # do blocking work
>>> await trio.sleep(1)
MailboxRepeatedEvent is used to coordinate some work whenever a collection
or other stateful object is mutated. Although you may miss intermediate
states, you're ensured to eventually receive an event to process the most
recent state.
Another task triggers events:
>>> my_list = []
>>> repeated_event = MailboxRepeatedEvent()
>>> event.set() # trigger event
>>> trio.sleep(0) # listener will enter loop body
>>> event.set() # listener misses this event since it's still in the loop body
>>> trio.sleep(2)
>>> event.set() # listener will enter loop body again
"""
async for _ in self._event.transitions():
yield

Whenever your collection is mutated, simply call the `set()` method.
async def events(self, *, repeat_last=False):
"""Event iterator with eventual consistency
>>> my_list.append('hello')
>>> repeated_event.set()
Use this iterator to coordinate some work whenever a collection
or other stateful object is mutated. Although you may miss intermediate
states, you're ensured to eventually receive an event to process the most
recent state. (https://en.wikipedia.org/wiki/Eventual_consistency)
The listener to continually process the latest state is simply:
:param repeat_last: if true, repeat the last position in the event
stream. If no event has been set yet it still yields immediately,
representing the "start" position.
>>> async for _ in repeated_event:
>>> await persist_to_storage(my_list)
Typical usage::
Even if you exit the listen loop and start a new one, you'll still receive
an event if a `set()` occurred in the meantime. Due to this statefulness,
only one listener is allowed-- a second listener will encounter a RuntimeError.
>>> my_list = []
>>> repeated_event = RepeatedEvent()
To avoid false positives from the "multiple listener" check, it's advised
to use `aclosing()` (from the async_generator package or Python 3.10) for
deterministic cleanup of the generator:
Whenever your collection is mutated, call the `set()` method.
>>> async with aclosing(repeated_event.__aiter__()) as events:
>>> async for _ in events:
>>> await persist_to_storage(my_list)
"""
>>> my_list.append('hello')
>>> repeated_event.set()
def __init__(self):
self._event = trio.Event()
self._iteration_open = False
The listener to continually process the latest state is:
def set(self):
"""Trigger event
>>> async for _ in repeated_event.events():
>>> await persist_to_storage(my_list)
Up to one event may be queued if there is no waiting listener (i.e.
no listener, or the listener is still processing the previous event).
If you'd like to persist the initial state of the list (before any
set() is called), use the `repeat_last=True` option.
"""
self._event.set()

async def __aiter__(self):
"""NOTE: be sure to use with `aclosing()`"""
if self._iteration_open:
raise RuntimeError(f'{self.__class__.__name__} can only have one listener')
self._iteration_open = True
try:
while True:
await self._event.wait()
self._event = trio.Event()
yield
finally:
self._iteration_open = False
token = self._event.value
if repeat_last:
token -= 1
async for _ in self._event.eventual_values(lambda val: val > token):
yield
97 changes: 71 additions & 26 deletions tests/test_repeated_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,42 @@
import trio
from trio.testing import wait_all_tasks_blocked

from trio_util import UnqueuedRepeatedEvent, MailboxRepeatedEvent
from trio_util import RepeatedEvent


async def test_unqueued_repeated_event(nursery, autojump_clock):
event = UnqueuedRepeatedEvent()
async def test_repeated_event_wait(nursery, autojump_clock):
done = trio.Event()
event = RepeatedEvent()

event.set()
with pytest.raises(trio.TooSlowError):
with trio.fail_after(1):
await event.wait()

@nursery.start_soon
async def _listener():
await event.wait()
done.set()

await wait_all_tasks_blocked()
event.set()
await done.wait()


async def test_repeated_event_unqueued(nursery, autojump_clock):
event = RepeatedEvent()
counts = [0, 0]

# a set() before the listener opens will not be queued
event.set()

async def listener(i):
async for _ in event:
async for _ in event.unqueued_events():
counts[i] += 1
await trio.sleep(i + 1)

for i in range(2):
nursery.start_soon(listener, i)
for i_ in range(2):
nursery.start_soon(listener, i_)
await wait_all_tasks_blocked()
assert counts == [0, 0]

Expand All @@ -39,31 +58,57 @@ async def listener(i):
assert counts == [2, 2]


async def test_mailbox_repeated_event(nursery, autojump_clock):
event = MailboxRepeatedEvent()
count = 0
async def test_repeated_event_eventually_consistent(nursery, autojump_clock):
event = RepeatedEvent()
counts = [0, 0]

# this set() will be queued since there is no listener
# a set() before the listener opens will not be queued
event.set()

@nursery.start_soon
async def _listener():
async for _ in event:
nonlocal count
count += 1
await trio.sleep(1)
async def listener(i):
async for _ in event.events():
counts[i] += 1
await trio.sleep(i + 1)

for i_ in range(2):
nursery.start_soon(listener, i_)
await wait_all_tasks_blocked()
assert count == 1
assert counts == [0, 0]

# this set() will be queued since listener is not waiting
event.set()
await wait_all_tasks_blocked()
assert count == 1
await trio.sleep(2)
assert count == 2

# 2nd listener not allowed
with pytest.raises(RuntimeError):
async for _ in event:
pass
assert counts == [1, 1]

# 2nd listener is blocked during this set()
await trio.sleep(1.5)
event.set()
await wait_all_tasks_blocked()
assert counts == [2, 1]
await trio.sleep(.6)
assert counts == [2, 2]

# both listeners blocked during this set()
event.set()
await wait_all_tasks_blocked()
assert counts == [2, 2]
await trio.sleep(3)
assert counts == [3, 3]


async def test_repeated_event_repeat_last(autojump_clock):
event = RepeatedEvent()

# no event was set, repeat_last=True will still iterate immediately
async for _ in event.events(repeat_last=True):
break

# set between listener sessions is missed
event.set()
with pytest.raises(trio.TooSlowError):
with trio.fail_after(1):
async for _ in event.events():
break

# repeat_last=True will still iterate immediately
async for _ in event.events(repeat_last=True):
break

0 comments on commit 04c7e60

Please sign in to comment.