-
Notifications
You must be signed in to change notification settings - Fork 12
/
test_timer.py
145 lines (126 loc) · 4.02 KB
/
test_timer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import asyncio
import pytest
import aiotools
@pytest.mark.asyncio
async def test_timer():
    """
    Check that a timer invokes its callback once per interval.

    With the event loop patched by ``aiotools.VirtualClock``, a timer with
    a 0.1 interval is left running for 0.22 seconds and then cancelled.
    The callback is expected to have run exactly three times, both with
    the default delay policy and with ``TimerDelayPolicy.CANCEL``.
    """
    virtual_clock = aiotools.VirtualClock()
    with virtual_clock.patch_loop():
        # Bound in this scope so the callback below can rebind it.
        fired = 0

        async def on_tick(interval):
            nonlocal fired
            # The callback must receive the interval the timer was built with.
            assert interval == 0.1
            await asyncio.sleep(0)
            fired += 1

        # Scenario 1: default delay policy.
        fired = 0
        ticker = aiotools.create_timer(on_tick, 0.1)
        await asyncio.sleep(0.22)
        ticker.cancel()
        await ticker
        assert fired == 3

        # Scenario 2: explicit CANCEL policy; the callback is quick, so the
        # outcome should be the same as in scenario 1.
        fired = 0
        ticker = aiotools.create_timer(
            on_tick,
            0.1,
            aiotools.TimerDelayPolicy.CANCEL,
        )
        await asyncio.sleep(0.22)
        ticker.cancel()
        await ticker
        assert fired == 3
@pytest.mark.asyncio
async def test_timer_leak_default():
    """
    Test that timer-fired tasks are cleaned up properly even when each
    timer-fired task takes longer than the timer interval.
    (In this case they will accumulate indefinitely!)

    The callback sleeps for 5 seconds while the timer interval is 1 second.
    After 9.9 seconds the timer is cancelled; the assertions expect 10
    callbacks to have been spawned, 5 of them cancelled, every spawned
    callback to be either finished or cancelled, and at most one task more
    than before the timer was created to remain.
    """
    virtual_clock = aiotools.VirtualClock()
    with virtual_clock.patch_loop():
        spawned = 0
        cancelled = 0
        finished = 0

        async def slow_callback(interval):
            nonlocal spawned, cancelled, finished
            spawned += 1
            try:
                # Deliberately longer than the timer interval.
                await asyncio.sleep(5)
            except asyncio.CancelledError:
                cancelled += 1
            else:
                finished += 1

        # Baseline number of tasks before the timer exists.
        baseline_tasks = len(aiotools.compat.all_tasks())
        ticker = aiotools.create_timer(slow_callback, 1)
        await asyncio.sleep(9.9)
        ticker.cancel()
        await ticker

        # No timer-fired tasks may be left behind.
        assert len(aiotools.compat.all_tasks()) <= baseline_tasks + 1
        # Every spawned callback was accounted for one way or the other.
        assert spawned == finished + cancelled
        assert spawned == 10
        assert cancelled == 5
@pytest.mark.asyncio
async def test_timer_leak_cancel():
    """
    Test the effect of TimerDelayPolicy.CANCEL when callbacks outlast the
    timer interval.

    The callback sleeps for 1 second while the timer interval is 0.01
    seconds. After 0.1 seconds the timer is cancelled; the assertions
    expect all 10 spawned callbacks to have been cancelled, none to have
    finished, and at most one task more than before the timer was created
    to remain.
    """
    virtual_clock = aiotools.VirtualClock()
    with virtual_clock.patch_loop():
        spawned = 0
        cancelled = 0
        finished = 0

        async def slow_callback(interval):
            nonlocal spawned, cancelled, finished
            spawned += 1
            try:
                # Far longer than the timer interval, so this callback is
                # expected never to reach the increment below.
                await asyncio.sleep(1)
                finished += 1
            except asyncio.CancelledError:
                cancelled += 1

        # Baseline number of tasks before the timer exists.
        baseline_tasks = len(aiotools.compat.all_tasks())
        ticker = aiotools.create_timer(
            slow_callback, 0.01, aiotools.TimerDelayPolicy.CANCEL
        )
        await asyncio.sleep(0.1)
        ticker.cancel()
        await ticker
        # Yield once more before counting tasks -- presumably to let any
        # just-cancelled callback finish unwinding (TODO confirm).
        await asyncio.sleep(0)

        assert len(aiotools.compat.all_tasks()) <= baseline_tasks + 1
        assert spawned == cancelled + finished
        assert cancelled == 10
        assert finished == 0
@pytest.mark.asyncio
async def test_timer_leak_nocancel():
    """
    Test that TimerDelayPolicy.CANCEL leaves callbacks alone when they
    finish within the timer interval.

    The callback only yields to the event loop once (``sleep(0)``) while
    the timer interval is 0.01 seconds. After 0.096 seconds the timer is
    cancelled; the assertions expect all 10 spawned callbacks to have
    finished, none to have been cancelled, and at most one task more than
    before the timer was created to remain.
    """
    virtual_clock = aiotools.VirtualClock()
    with virtual_clock.patch_loop():
        spawned = 0
        cancelled = 0
        finished = 0

        async def quick_callback(interval):
            nonlocal spawned, cancelled, finished
            spawned += 1
            try:
                # A single yield: done long before the next tick is due.
                await asyncio.sleep(0)
                finished += 1
            except asyncio.CancelledError:
                cancelled += 1

        # Baseline number of tasks before the timer exists.
        baseline_tasks = len(aiotools.compat.all_tasks())
        ticker = aiotools.create_timer(
            quick_callback, 0.01, aiotools.TimerDelayPolicy.CANCEL
        )
        await asyncio.sleep(0.096)
        ticker.cancel()
        await ticker
        # Yield once more before counting tasks -- presumably to let any
        # remaining callback settle (TODO confirm).
        await asyncio.sleep(0)

        assert len(aiotools.compat.all_tasks()) <= baseline_tasks + 1
        assert spawned == cancelled + finished
        assert cancelled == 0
        assert finished == 10