This repository was archived by the owner on Aug 19, 2025. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 124
Expand file tree
/
Copy pathtest_broadcast.py
More file actions
140 lines (108 loc) · 4.83 KB
/
test_broadcast.py
File metadata and controls
140 lines (108 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from __future__ import annotations
import asyncio
import typing
import pytest
from broadcaster import Broadcast, BroadcastBackend, Event
from broadcaster._backends.kafka import KafkaBackend
class CustomBackend(BroadcastBackend):
def __init__(self, url: str):
self._subscribed: set[str] = set()
async def connect(self) -> None:
self._published: asyncio.Queue[Event] = asyncio.Queue()
async def disconnect(self) -> None:
pass
async def subscribe(self, channel: str) -> None:
self._subscribed.add(channel)
async def unsubscribe(self, channel: str) -> None:
self._subscribed.remove(channel)
async def publish(self, channel: str, message: typing.Any) -> None:
event = Event(channel=channel, message=message)
await self._published.put(event)
async def next_published(self) -> Event:
while True:
event = await self._published.get()
if event.channel in self._subscribed:
return event
@pytest.mark.asyncio
async def test_memory():
async with Broadcast("memory://") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_redis():
async with Broadcast("redis://localhost:6379") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_redis_stream():
async with Broadcast("redis-stream://localhost:6379") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
async with broadcast.subscribe("chatroom1") as subscriber:
await broadcast.publish("chatroom1", "hello")
event = await subscriber.get()
assert event.channel == "chatroom1"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_redis_resubscription():
async with Broadcast("redis://localhost:6379") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
await asyncio.sleep(0)
async with broadcast.subscribe("chatroom2") as subscriber:
await broadcast.publish("chatroom2", "hello")
event = await subscriber.get()
assert event.channel == "chatroom2"
@pytest.mark.asyncio
async def test_postgres():
async with Broadcast("postgres://postgres:postgres@localhost:5432/broadcaster") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_kafka():
async with Broadcast("kafka://localhost:9092") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_kafka_multiple_urls():
async with Broadcast(backend=KafkaBackend(urls=["kafka://localhost:9092", "kafka://localhost:9092"])) as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_custom():
backend = CustomBackend("")
async with Broadcast(backend=backend) as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"
@pytest.mark.asyncio
async def test_unknown_backend():
with pytest.raises(ValueError, match="Unsupported backend"):
async with Broadcast(url="unknown://"):
pass
@pytest.mark.asyncio
async def test_needs_url_or_backend():
with pytest.raises(AssertionError, match="Either `url` or `backend` must be provided."):
Broadcast()