-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpubsub_test.py
More file actions
73 lines (50 loc) · 2.49 KB
/
pubsub_test.py
File metadata and controls
73 lines (50 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import concurrent
import hashlib
import os
import queue
from concurrent.futures import ThreadPoolExecutor

import pytest
from wampproto import serializers

from xconn import Client
from xconn.client import connect_anonymous
from xconn.types import Event
def test_pubsub():
    """Publish args-only and kwargs-only events and verify them on a second session.

    ``client1`` subscribes and ``client2`` publishes (with ``acknowledge``).
    The event handler only enqueues what it receives; every assertion runs
    here on the test thread.  The handler is presumably invoked on the
    client's background receive thread (the test thread never blocks on
    reads), where a failed ``assert`` would not fail the test -- and a
    handler that is never called would let the test pass vacuously.

    Requires a WAMP router listening on ``ws://localhost:8079/ws`` that
    serves ``realm1``.
    """
    delivery_timeout = 5  # seconds to wait for each event to reach the subscriber

    client1 = connect_anonymous("ws://localhost:8079/ws", "realm1")
    client2 = connect_anonymous("ws://localhost:8079/ws", "realm1")
    try:
        received: queue.Queue = queue.Queue()

        def collect_event(event: Event):
            # Hand the event over to the test thread; do not assert here.
            received.put(event)

        def next_event(topic: str) -> Event:
            # Block until the subscriber has seen an event, or fail loudly.
            try:
                return received.get(timeout=delivery_timeout)
            except queue.Empty:
                raise AssertionError(f"no event received on {topic} within {delivery_timeout}s") from None

        # --- positional arguments only ---
        args = ["client1", "client2"]
        args_subscription = client1.subscribe("io.xconn.pubsub.args", collect_event)
        try:
            client2.publish("io.xconn.pubsub.args", args, options={"acknowledge": True})
            event = next_event("io.xconn.pubsub.args")
            assert event.args == args
            assert event.kwargs is None
        finally:
            args_subscription.unsubscribe()

        # --- keyword arguments only ---
        kwargs = {"foo": "bar", "baz": {"k": "v"}}
        kwargs_subscription = client1.subscribe("io.xconn.pubsub.kwargs", collect_event)
        try:
            client2.publish("io.xconn.pubsub.kwargs", kwargs=kwargs, options={"acknowledge": True})
            event = next_event("io.xconn.pubsub.kwargs")
            assert event.args == []
            assert event.kwargs == kwargs
        finally:
            kwargs_subscription.unsubscribe()
    finally:
        # Always close both sessions, even when an assertion above failed.
        client2.leave()
        client1.leave()
@pytest.mark.parametrize("serializer", [serializers.CBORSerializer(), serializers.MsgPackSerializer()])
def test_pubsub_with_various_data(serializer: serializers.Serializer):
    """Publish random binary payloads of several sizes concurrently and verify each one.

    ``client2`` publishes ``payload`` and its SHA-256 ``checksum`` from a
    thread pool; ``client1`` receives them.  The event handler only enqueues
    events.  The test thread then waits for exactly one event per publish
    and checks checksums and sizes itself, so a corrupted or missing event
    fails the test instead of being lost on a callback thread or cut short
    by the sessions leaving early.

    Requires a WAMP router listening on ``ws://localhost:8079/ws`` that
    serves ``realm1``.

    Args:
        serializer: Serializer instance both sessions connect with.
    """
    delivery_timeout = 10  # seconds to wait for each event to reach the subscriber

    client1 = Client(serializer=serializer).connect("ws://localhost:8079/ws", "realm1")
    client2 = Client(serializer=serializer).connect("ws://localhost:8079/ws", "realm1")
    try:
        received: queue.Queue = queue.Queue()

        def event_handler(event: Event):
            # Hand the event over to the test thread; do not assert here.
            received.put(event)

        client1.subscribe("io.xconn.pubsub.event_handler", event_handler)

        def send_payload(size_bytes: int):
            payload = os.urandom(size_bytes)
            checksum = hashlib.sha256(payload).digest()
            client2.publish("io.xconn.pubsub.event_handler", kwargs={"payload": payload, "checksum": checksum})

        # publish with different payload sizes
        sizes = [1024 * n for n in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1023]]

        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(send_payload, size) for size in sizes]
            for future in concurrent.futures.as_completed(futures):
                # Re-raise any exception that happened while publishing.
                future.result()

        # Every publish must produce exactly one intact event on the subscriber.
        received_sizes = []
        for _ in sizes:
            try:
                event = received.get(timeout=delivery_timeout)
            except queue.Empty:
                raise AssertionError(
                    f"received only {len(received_sizes)} of {len(sizes)} events within {delivery_timeout}s"
                ) from None

            payload: bytes = event.kwargs["payload"]
            checksum: bytes = event.kwargs["checksum"]
            calculated_checksum = hashlib.sha256(payload).digest()
            assert calculated_checksum == checksum, f"Checksum mismatch! got {calculated_checksum}, expected {checksum}"
            received_sizes.append(len(payload))

        # Publishes run concurrently, so arrival order is not fixed; compare as multisets.
        assert sorted(received_sizes) == sorted(sizes)
    finally:
        # Always close both sessions, even when an assertion above failed.
        client1.leave()
        client2.leave()