-
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathwebsocket.uts
More file actions
96 lines (68 loc) · 3.61 KB
/
websocket.uts
File metadata and controls
96 lines (68 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# WebSocket layer unit tests
# Copyright (C) 2024 Lucas Drufva <lucas.drufva@gmail.com>
#
# Type the following command to launch the tests:
# $ test/run_tests -P "load_contrib('websocket')" -t test/contrib/websocket.uts
+ Syntax check
= Import the WebSocket layer
from scapy.contrib.websocket import *
+ WebSocket protocol test
= Packet instantiation
pkt = WebSocket(wsPayload=b"Hello, world!", opcode="text", mask=True, maskingKey=0x11223344)
assert pkt.wsPayload == b"Hello, world!"
assert pkt.mask == True
assert pkt.maskingKey == 0x11223344
assert bytes(pkt) == b'\x01\x8d\x11\x22\x33\x44\x59\x47\x5f\x28\x7e\x0e\x13\x33\x7e\x50\x5f\x20\x30'
= Packet dissection
raw = b'\x01\x0dHello, world!'
pkt = WebSocket(raw)
assert pkt.fin == 0
assert pkt.rsv == 0
assert pkt.opcode == 0x1
assert pkt.mask == False
assert pkt.payloadLen == 13
assert pkt.wsPayload == b'Hello, world!'
= Dissect masked packet
raw = b'\x01\x8d\x11\x22\x33\x44\x59\x47\x5f\x28\x7e\x0e\x13\x33\x7e\x50\x5f\x20\x30'
pkt = WebSocket(raw)
assert pkt.fin == 0
assert pkt.rsv == 0
assert pkt.opcode == 0x1
assert pkt.mask == True
assert pkt.payloadLen == 13
assert pkt.wsPayload == b'Hello, world!'
= Session with compression
bind_layers(TCP, WebSocket, dport=5000)
bind_layers(TCP, WebSocket, sport=5000)
from scapy.sessions import TCPSession
filename = scapy_path("/test/pcaps/websocket_compressed_session.pcap")
pkts = sniff(offline=filename, session=TCPSession)
assert len(pkts) == 13
assert pkts[7][WebSocket].wsPayload == b'Hello'
assert pkts[8][WebSocket].wsPayload == b'"Hello"'
assert pkts[10][WebSocket].wsPayload == b'Hello2'
assert pkts[11][WebSocket].wsPayload == b'"Hello2"'
= Create packet with long payload
pkt = WebSocket(wsPayload=b"a"*126, opcode="text")
assert bytes(pkt) == b'\x01\x7e\x00\x7e\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'
= Dissect packet with long payload
raw = b'\x01\x7e\x00\x7e\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'
pkt = WebSocket(raw)
assert pkt.payloadLen == 126
assert pkt.wsPayload == b'a'*126
= Create packet with very long payload
pkt = WebSocket(wsPayload=b"a"*65536, opcode="text")
assert bytes(pkt) == b'\x01\x7f\x00\x00\x00\x00\x00\x01\x00\x00' + b'a'*65536
= Dissect packet with very long payload
raw = b'\x01\x7f\x00\x00\x00\x00\x00\x01\x00\x00' + b'a'*65536
pkt = WebSocket(raw)
assert pkt.payloadLen == 65536
assert pkt.wsPayload == b'a'*65536
= Session with invalid parts in upgrade sequence
bind_layers(TCP, WebSocket, dport=5000)
bind_layers(TCP, WebSocket, sport=5000)
from scapy.sessions import TCPSession
filename = scapy_path("/test/pcaps/websocket_invalid_handshakes.pcap")
pkts = sniff(offline=filename, session=TCPSession)
assert len(pkts) == 19
assert pkts[15][WebSocket].wsPayload == b'Hello, world!'