-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathtest_ThreadManager.py
More file actions
75 lines (57 loc) · 1.98 KB
/
test_ThreadManager.py
File metadata and controls
75 lines (57 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from __future__ import print_function
import importlib
import time
# The module name contains hyphens, so it cannot be named in a regular
# ``import`` statement; load it by its string name instead.
script = importlib.import_module("aci-preupgrade-validation-script")
# Shared flag read by the task functions below: while it is False they print
# their completion line after sleeping. test_ThreadManager sets it to True
# when the ThreadManager reports a timeout.
global_timeout = False
def task1(data=""):
    """Sleep for 2 seconds, then print a completion line.

    Nothing is printed when the module-level ``global_timeout`` flag is set
    by the time the sleep ends.

    Args:
        data: Value interpolated into the completion message.
    """
    time.sleep(2)
    if global_timeout:
        # The flag was raised while this task was sleeping; stay silent.
        return
    message = "Thread task1: Finishing with data {}".format(data)
    print(message)
def task2(data=""):
    """Sleep for 2.5 seconds, then print a completion line.

    Nothing is printed when the module-level ``global_timeout`` flag is set
    by the time the sleep ends.

    Args:
        data: Value interpolated into the completion message.
    """
    time.sleep(2.5)
    if global_timeout:
        # The flag was raised while this task was sleeping; stay silent.
        return
    message = "Thread task2: Finishing with data {}".format(data)
    print(message)
def task3(data=""):
    """Sleep for 1 second, then print a completion line.

    Nothing is printed when the module-level ``global_timeout`` flag is set
    by the time the sleep ends.

    Args:
        data: Value interpolated into the completion message.
    """
    time.sleep(1)
    if global_timeout:
        # The flag was raised while this task was sleeping; stay silent.
        return
    message = "Thread task3: Finishing with data {}".format(data)
    print(message)
def task4(data=""):
    """Sleep for 5 seconds, then print a completion line.

    Nothing is printed when the module-level ``global_timeout`` flag is set
    by the time the sleep ends.

    Args:
        data: Value interpolated into the completion message.
    """
    time.sleep(5)
    if global_timeout:
        # The flag was raised while this task was sleeping; stay silent.
        return
    message = "Thread task4: Finishing with data {}".format(data)
    print(message)
def task5(data=""):
    """Sleep for 5 seconds, then print a completion line.

    Nothing is printed when the module-level ``global_timeout`` flag is set
    by the time the sleep ends.

    Args:
        data: Value interpolated into the completion message.
    """
    time.sleep(5)
    if global_timeout:
        # The flag was raised while this task was sleeping; stay silent.
        return
    message = "Thread task5: Finishing with data {}".format(data)
    print(message)
def test_ThreadManager(capsys):
    """Run the five sleeping tasks through ThreadManager and check stdout.

    The manager is built with ``monitor_timeout=1`` and ``max_threads=2``, and
    its timeout callback prints ``Timeout. Abort {}`` formatted with whatever
    argument it receives. After the manager and its threads have been joined,
    the module-level ``global_timeout`` flag is raised if the manager reports
    a timeout, so tasks that wake up later print nothing. The captured stdout
    must match the expected transcript exactly.

    Args:
        capsys: pytest fixture used to read the captured stdout.
    """
    global global_timeout

    def announce_timeout(x):
        print("Timeout. Abort {}".format(x))

    manager = script.ThreadManager(
        funcs=[task1, task2, task3, task4, task5],
        common_kwargs={"data": "common_data"},
        monitor_timeout=1,
        max_threads=2,
        callback_on_timeout=announce_timeout,
    )
    manager.start()
    manager.join()

    # Wait on every task thread so that any print already in progress has
    # finished before capsys.readouterr() runs. Otherwise a thread can pass
    # its `if not global_timeout` check and then try to print after pytest
    # has torn down the captured stdout fd, which raises
    # OSError: [Errno 9] Bad file descriptor.
    for worker in manager.threads:
        try:
            worker.join(timeout=1.5)
        except RuntimeError:
            # The thread was never started, so there is nothing to wait for.
            pass

    if manager.is_timeout():
        global_timeout = True

    captured = capsys.readouterr()
    expected_output = """\
Timeout. Abort task1
Timeout. Abort task2
Thread task1: Finishing with data common_data
Thread task2: Finishing with data common_data
Thread task3: Finishing with data common_data
"""
    assert captured.out == expected_output