-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathconftest.py
More file actions
242 lines (191 loc) · 5.59 KB
/
conftest.py
File metadata and controls
242 lines (191 loc) · 5.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import os
from contextlib import suppress
from typing import AsyncGenerator
from uuid import uuid4
import pytest
from aio_pika import Channel, connect_robust
from aio_pika.abc import AbstractChannel, AbstractRobustConnection
from taskiq_aio_pika.broker import AioPikaBroker
@pytest.fixture(scope="session")
def anyio_backend() -> str:
    """
    Select the backend used by the anyio pytest plugin.

    The fixture is session-scoped, so a single value is shared
    by the whole test run.

    :return: backend name, always ``"asyncio"``.
    """
    backend_name = "asyncio"
    return backend_name
@pytest.fixture
def amqp_url() -> str:
    """
    Resolve the AMQP URL the tests connect to.

    The ``TEST_AMQP_URL`` environment variable takes precedence;
    when it is not set, a local default URL is used instead.

    :return: rabbitmq url.
    """
    default_url = "amqp://guest:guest@localhost:5672"
    return os.getenv("TEST_AMQP_URL", default_url)
@pytest.fixture
def queue_name() -> str:
    """
    Produce a fresh queue name for a test.

    :return: random queue name (hex form of a UUID4).
    """
    unique_id = uuid4()
    return unique_id.hex
@pytest.fixture
def routing_key() -> str:
    """
    Produce a fresh routing key for a test.

    :return: random routing key (hex form of a UUID4).
    """
    unique_id = uuid4()
    return unique_id.hex
@pytest.fixture
def delay_queue_name() -> str:
    """
    Produce a fresh name for the delay queue.

    :return: random delay queue name (hex form of a UUID4).
    """
    unique_id = uuid4()
    return unique_id.hex
@pytest.fixture
def dead_queue_name() -> str:
    """
    Produce a fresh name for the dead letter queue.

    :return: random dead letter queue name (hex form of a UUID4).
    """
    unique_id = uuid4()
    return unique_id.hex
@pytest.fixture
def exchange_name() -> str:
    """
    Produce a fresh exchange name for a test.

    :return: random exchange name (hex form of a UUID4).
    """
    unique_id = uuid4()
    return unique_id.hex
@pytest.fixture
async def test_connection(
    amqp_url: str,
) -> AsyncGenerator[AbstractRobustConnection, None]:
    """
    Provide a robust connection to rabbitmq.

    The connection is opened before it is handed to the test
    and closed again during fixture teardown.

    :param amqp_url: url for rabbitmq.
    :yield: opened connection.
    """
    robust_conn = await connect_robust(amqp_url)
    yield robust_conn
    # Teardown: everything after the yield runs once the test is done.
    await robust_conn.close()
@pytest.fixture
async def test_channel(
    test_connection: AbstractRobustConnection,
) -> AsyncGenerator[AbstractChannel, None]:
    """
    Provide a channel opened on the test connection.

    The channel is used as an async context manager, so leaving
    the fixture exits that context.

    :param test_connection: current rabbitmq connection.
    :yield: opened channel.
    """
    channel_ctx = test_connection.channel()
    async with channel_ctx as opened_channel:
        yield opened_channel
@pytest.fixture
async def broker(
    amqp_url: str,
    queue_name: str,
    delay_queue_name: str,
    dead_queue_name: str,
    exchange_name: str,
    test_channel: Channel,
) -> AsyncGenerator[AioPikaBroker, None]:
    """
    Provide a started broker and clean up after the test.

    The broker is created with the generated exchange and queue
    names, started, and yielded. On teardown it is shut down,
    then the exchange and all three queues are deleted through
    the test channel.

    :param amqp_url: current rabbitmq connection string.
    :param queue_name: test queue name.
    :param delay_queue_name: test delay queue name.
    :param dead_queue_name: test dead letter queue name.
    :param exchange_name: test exchange name.
    :param test_channel: amqp channel for tests.
    :yield: broker.
    """
    instance = AioPikaBroker(
        url=amqp_url,
        queue_name=queue_name,
        delay_queue_name=delay_queue_name,
        dead_letter_queue_name=dead_queue_name,
        exchange_name=exchange_name,
        declare_exchange=True,
    )
    # NOTE(review): presumably needed so startup performs the
    # worker-side setup as well — confirm against taskiq.
    instance.is_worker_process = True
    await instance.startup()

    yield instance

    await instance.shutdown()

    # Delete the exchange even if something is still bound to it.
    test_exchange = await test_channel.get_exchange(exchange_name)
    await test_exchange.delete(timeout=1, if_unused=False)

    leftover_names = (queue_name, delay_queue_name, dead_queue_name)
    for leftover_name in leftover_names:
        # ensure=False: take a handle to the queue without checking
        # for its existence first.
        leftover = await test_channel.get_queue(leftover_name, ensure=False)
        # Delete regardless of remaining messages or consumers.
        await leftover.delete(timeout=1, if_empty=False, if_unused=False)
@pytest.fixture
async def broker_with_delayed_message_plugin(
    amqp_url: str,
    queue_name: str,
    delay_queue_name: str,
    dead_queue_name: str,
    exchange_name: str,
    routing_key: str,
    test_channel: Channel,
) -> AsyncGenerator[AioPikaBroker, None]:
    """
    Provide a started broker that uses the delayed message plugin.

    The broker is created with ``delayed_message_exchange_plugin``
    enabled, started, and yielded. On teardown it is shut down,
    then the main exchange, the plugin exchange and the queues
    are deleted through the test channel.

    :param amqp_url: current rabbitmq connection string.
    :param queue_name: test queue name.
    :param delay_queue_name: test delay queue name.
    :param dead_queue_name: test dead letter queue name.
    :param exchange_name: test exchange name.
    :param routing_key: routing_key.
    :param test_channel: amqp channel for tests.
    :yield: broker.
    """
    # delay_queue_name is not passed to the broker here; it is only
    # used by the queue cleanup at the end of this fixture.
    instance = AioPikaBroker(
        url=amqp_url,
        queue_name=queue_name,
        dead_letter_queue_name=dead_queue_name,
        exchange_name=exchange_name,
        declare_exchange=True,
        routing_key=routing_key,
        delayed_message_exchange_plugin=True,
    )
    # NOTE(review): presumably needed so startup performs the
    # worker-side setup as well — confirm against taskiq.
    instance.is_worker_process = True
    await instance.startup()

    yield instance

    await instance.shutdown()

    # Delete the main exchange even if something is still bound to it.
    main_exchange = await test_channel.get_exchange(exchange_name)
    await main_exchange.delete(timeout=1, if_unused=False)

    # The plugin exchange name is read from the broker's private attribute.
    delayed_exchange_name = instance._delay_plugin_exchange_name
    delayed_exchange = await test_channel.get_exchange(delayed_exchange_name)
    await delayed_exchange.delete(timeout=1, if_unused=False)

    leftover_names = (queue_name, delay_queue_name, dead_queue_name)
    for leftover_name in leftover_names:
        # ensure=False: take a handle to the queue without checking
        # for its existence first.
        leftover = await test_channel.get_queue(leftover_name, ensure=False)
        # Delete regardless of remaining messages or consumers.
        await leftover.delete(timeout=1, if_empty=False, if_unused=False)
@pytest.fixture(autouse=True, scope="function")
async def cleanup_rabbitmq(test_channel: Channel) -> AsyncGenerator[None, None]:
    """
    Remove a fixed set of queues after every test.

    The fixture is autouse, so it runs for each test function
    without being requested. Cleanup is best-effort: any error
    raised while deleting a queue is suppressed.

    :param test_channel: amqp channel for tests.
    :yield: nothing, the work happens on teardown.
    """
    yield
    # NOTE(review): these look like the broker's default queue names,
    # used when a test does not pass generated ones — confirm.
    fixed_names = ("taskiq", "taskiq.dead_letter", "taskiq.delay")
    for fixed_name in fixed_names:
        with suppress(Exception):
            stale = await test_channel.get_queue(fixed_name, ensure=False)
            await stale.delete(if_unused=False, if_empty=False)