feat: implement WebSocket streaming protocol for large payloads #2256#58
@claude review this pr
Can we make this change backwards compatible instead?
iduartgomez left a comment
Review Summary
The code quality is high and the two-layer design (transparent reassembly + optional incremental streaming) is sound. With ?streaming=true gating in the companion PR (freenet-core#3346), old applications using the non-streaming protocol are protected from seeing unknown bincode variants. Most backward compatibility concerns are addressed by that opt-in mechanism.
However, there are a few issues that need attention before merging.
Must Fix (Blocking)
1. FlatBuffers HostResponseType union ordering breaks old browser clients
In schemas/flatbuffers/host_response.fbs, StreamChunk is inserted between GenerateRandData and Ok in the union:
union HostResponseType {
    ContractResponse,
    DelegateResponse,
    GenerateRandData,
    StreamChunk, // <-- inserted here, shifts Ok and Error
    Ok,
    Error,
}
This shifts Ok from index 4→5 and Error from 5→6. FlatBuffers forward-compatibility only works for appended variants. Old browser clients (using FBS encoding) will misinterpret every Ok and Error response from the server, regardless of whether streaming is enabled. This is a silent data corruption bug, not a deserialization error.
Fix: Move StreamChunk to the end of the union (after Error). The client_request.fbs union is fine — StreamChunk is already appended after Authenticate.
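To make the index shift concrete, here is a minimal Rust sketch of the tag numbering. It is not the generated FlatBuffers code: the enum names OldTag, InsertedTag, and AppendedTag and the helper decode_old are hypothetical, and the numbering assumes the FlatBuffers convention that union tag 0 is reserved for NONE, which matches the 4→5 and 5→6 shift described above.

```rust
/// Tags as an old client understands them (0 is reserved for NONE).
#[allow(dead_code)]
#[derive(Debug, PartialEq, Clone, Copy)]
enum OldTag {
    ContractResponse = 1,
    DelegateResponse = 2,
    GenerateRandData = 3,
    Ok = 4,
    Error = 5,
}

/// Tags a server emits when StreamChunk is inserted mid-union.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Clone, Copy)]
enum InsertedTag {
    ContractResponse = 1,
    DelegateResponse = 2,
    GenerateRandData = 3,
    StreamChunk = 4,
    Ok = 5,
    Error = 6,
}

/// Tags a server emits when StreamChunk is appended after Error.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Clone, Copy)]
enum AppendedTag {
    ContractResponse = 1,
    DelegateResponse = 2,
    GenerateRandData = 3,
    Ok = 4,
    Error = 5,
    StreamChunk = 6,
}

/// How an old client maps a wire tag back to a variant it knows.
fn decode_old(tag: u8) -> Option<OldTag> {
    match tag {
        1 => Some(OldTag::ContractResponse),
        2 => Some(OldTag::DelegateResponse),
        3 => Some(OldTag::GenerateRandData),
        4 => Some(OldTag::Ok),
        5 => Some(OldTag::Error),
        _ => None, // unknown tag: the client can at least detect it
    }
}
```

With the inserted layout, decode_old(InsertedTag::Ok as u8) yields OldTag::Error, so the old client silently misreads a success as a failure. With the appended layout, every pre-existing tag keeps its value and only the new StreamChunk tag is unknown to old clients.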
2. No TTL/timeout on ReassemblyBuffer entries
If a stream starts (first chunk received) but never completes (sender crashes, network drops remaining chunks), the partial StreamState entry stays in the HashMap forever. After MAX_CONCURRENT_STREAMS (8) abandoned streams, all new streams are permanently rejected.
Per the project's AGENTS.md: "Cleanup exemptions MUST be time-bounded. Any condition that exempts an entry from garbage collection MUST either expire via TTL or be overridden by an absolute age threshold."
Fix: Add a creation timestamp to StreamState and evict entries older than a reasonable timeout (e.g., 30-60 seconds) when new chunks arrive or when the concurrent stream limit is hit.
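One possible shape for that fix, as a sketch only. The struct layouts, the u32 stream ID, the 30-second STREAM_TTL, and the string error are assumptions for illustration and do not reflect the actual ReassemblyBuffer in this PR. The current time is passed in as a parameter so the eviction path can be unit-tested without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const MAX_CONCURRENT_STREAMS: usize = 8;
/// Assumed timeout; the review suggests something in the 30-60 second range.
const STREAM_TTL: Duration = Duration::from_secs(30);

/// Hypothetical per-stream state with a creation timestamp.
struct StreamState {
    created_at: Instant,
    chunks: Vec<Vec<u8>>,
}

#[derive(Default)]
struct ReassemblyBuffer {
    streams: HashMap<u32, StreamState>,
}

impl ReassemblyBuffer {
    /// Drop every partial stream that is older than STREAM_TTL.
    fn evict_expired(&mut self, now: Instant) {
        self.streams
            .retain(|_, s| now.saturating_duration_since(s.created_at) < STREAM_TTL);
    }

    /// Store a chunk, evicting stale streams before enforcing the stream cap.
    fn receive_chunk(
        &mut self,
        stream_id: u32,
        data: Vec<u8>,
        now: Instant,
    ) -> Result<(), &'static str> {
        self.evict_expired(now);
        let is_new = !self.streams.contains_key(&stream_id);
        if is_new && self.streams.len() >= MAX_CONCURRENT_STREAMS {
            return Err("too many concurrent streams");
        }
        self.streams
            .entry(stream_id)
            .or_insert_with(|| StreamState { created_at: now, chunks: Vec::new() })
            .chunks
            .push(data);
        Ok(())
    }
}
```

Because eviction runs on every incoming chunk, abandoned streams stop blocking new ones once the TTL has elapsed, without requiring a background timer task.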
3. Deadlock in recv() with 5+ concurrent streamed responses
stream_tx has capacity 4. The recv() method calls handle.assemble().await inline, blocking until all chunks arrive. During assembly, recv() is not draining stream_rx. If the server sends 5+ StreamHeaders rapidly:
- Handler sends handles 1-4 to stream_tx (fills capacity)
- Client's recv() picks up handle 1, starts assemble().await
- Handler tries to send handle 5 to stream_tx and blocks
- Handler is now stuck and can't forward chunks from the WebSocket
- Client's assemble() waits for chunks that will never arrive
- Deadlock
Fix options:
- Increase stream_tx capacity significantly, or
- Make recv() non-blocking: spawn assembly in a background task and return a future/handle, or
- Buffer stream handles in request_handler instead of blocking on stream_tx.send().await
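The third option could look roughly like the sketch below. It uses std::sync::mpsc::sync_channel as a stand-in for the bounded async channel in the PR, and HandleForwarder, offer, and flush are hypothetical names. The point is only the pattern: try a non-blocking send, and park the handle locally when the channel is full so the handler can keep forwarding chunks.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Forwards stream handles without ever blocking the caller.
struct HandleForwarder<T> {
    tx: SyncSender<T>,
    pending: VecDeque<T>,
}

impl<T> HandleForwarder<T> {
    fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, pending: VecDeque::new() }, rx)
    }

    /// Queue a handle and push as much as the channel will accept.
    /// Returns false once the receiver has been dropped.
    fn offer(&mut self, handle: T) -> bool {
        self.pending.push_back(handle);
        self.flush()
    }

    /// Move parked handles into the channel in FIFO order, stopping when full.
    fn flush(&mut self) -> bool {
        while let Some(handle) = self.pending.pop_front() {
            match self.tx.try_send(handle) {
                Ok(()) => {}
                Err(TrySendError::Full(handle)) => {
                    // Keep ordering: put the handle back at the front.
                    self.pending.push_front(handle);
                    return true;
                }
                Err(TrySendError::Disconnected(_)) => return false,
            }
        }
        true
    }
}
```

The handler would call flush() each time it processes a WebSocket message. In a real implementation the pending queue should itself be capped, otherwise a fast server can grow it without bound.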
Should Fix (Important)
4. assemble() has no upper bound check on received bytes
The truncation check (buf.len() < total_bytes) catches under-delivery, but there is no check for over-delivery. A malicious server claiming total_bytes=100 could stream unlimited data into the unbounded channel. The buf.extend_from_slice loop has no cap.
Fix: Add if buf.len() as u64 > self.total_bytes { return Err(...) } inside the receive loop.
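A minimal sketch of such a bounded receive loop, under the assumption that chunks arrive as owned byte vectors. The function assemble_bounded and the AssembleError type are hypothetical, and an iterator stands in for the channel receiver used by the real assemble().

```rust
#[derive(Debug, PartialEq)]
enum AssembleError {
    /// The sender delivered more bytes than the header announced.
    TooMuchData { expected: u64, received: u64 },
    /// The stream ended before the announced size was reached.
    Truncated { expected: u64, received: u64 },
}

/// Concatenate chunks, rejecting both over-delivery and under-delivery.
fn assemble_bounded<I>(chunks: I, total_bytes: u64) -> Result<Vec<u8>, AssembleError>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut buf: Vec<u8> = Vec::new();
    for chunk in chunks {
        // Check before copying so the excess bytes are never buffered.
        let received = buf.len() as u64 + chunk.len() as u64;
        if received > total_bytes {
            return Err(AssembleError::TooMuchData { expected: total_bytes, received });
        }
        buf.extend_from_slice(&chunk);
    }
    let received = buf.len() as u64;
    if received < total_bytes {
        return Err(AssembleError::Truncated { expected: total_bytes, received });
    }
    Ok(buf)
}
```

Performing the check before extend_from_slice means the buffer never grows past total_bytes, which also bounds memory use when the announced size is itself validated against a maximum.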
5. Evicted stream_senders entries cause orphan reassembly entries
When stream_senders evicts an entry at capacity, subsequent chunks for that evicted stream fall through to the ReassemblyBuffer (since the sender is no longer in the map), creating an orphan partial stream that will never complete — permanently consuming a reassembly slot. This compounds issue #2.
6. MAX_CHUNKS_PER_BATCH is defined but never used
The constant is declared and documented in the PR description as enabling "cooperative yielding," but no yielding logic exists in the code. Either implement the yield logic or remove the dead constant.
Consider (Suggestions)
- recv()/recv_stream() interaction: Both consume from stream_rx. If a caller alternates between them, behavior is unpredictable. Worth documenting the mutual exclusivity.
- Browser stream_id = 0 hardcoded: All browser chunked sends use the same stream ID. If concurrent large sends were possible, this would cause DuplicateChunk errors. Mitigated by &mut self, but worth a comment.
- Test gaps: Out-of-order chunk delivery, exact CHUNK_SIZE boundaries, assemble() truncation error path, and mixed streamed + non-streamed response interleaving are all untested.
- Test stability: Integration tests use 100ms timeouts and AtomicU16 port allocation. Consider port 0 (OS-assigned) and longer timeouts for CI.
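For the port suggestion, a small sketch using the standard library's blocking TcpListener. The helper name bind_ephemeral is hypothetical, and the PR's tests would presumably use their async listener instead. Binding to port 0 lets the OS choose a free port, and local_addr() reports which one was chosen.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener};

/// Bind to an OS-assigned loopback port and report the address actually used.
fn bind_ephemeral() -> io::Result<(TcpListener, SocketAddr)> {
    // Port 0 asks the OS for any currently free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    Ok((listener, addr))
}
```

The test then passes the returned address to the client, which avoids collisions between parallel test runs that a process-local AtomicU16 counter cannot prevent.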
What's Done Well
- Thorough error handling in ReassemblyBuffer with specific error variants for every failure case
- Zero-copy chunking via Bytes::slice()
- Clean separation between transparent reassembly (Layer 1) and incremental streaming (Layer 2)
- Comprehensive unit test suite for the core chunking/reassembly logic
- Good integration tests covering both chunked-only and header+chunked paths
- FlatBuffers StreamHeader correctly returns an error rather than silently failing
Problem
WebSocket frame size limits cause silent failures or disconnections when transmitting large contract states (up to 50 MiB) between the Freenet node and clients. The entire payload had to fit in a single WebSocket frame, requiring oversized max_message_size limits and causing excessive memory pressure.
Solution
Add an application-level chunking and streaming protocol to freenet-stdlib, used by both the native (tokio-tungstenite) and browser (WASM) WebSocket clients. The protocol operates in two layers:
Layer 1: Transparent chunking (all message types)
Any serialized payload exceeding CHUNK_THRESHOLD (512 KiB) is automatically split into 256 KiB StreamChunk frames. The receiver's ReassemblyBuffer reassembles them transparently before deserialization. This applies to all HostResponse and ClientRequest variants, with no type-specific logic needed.
- chunk_request(data, stream_id) → Vec<ClientRequest::StreamChunk>
- chunk_response(data, stream_id) → Vec<HostResponse::StreamChunk>
- ReassemblyBuffer::receive_chunk() → Option<Vec<u8>> when complete
- Bytes::slice(): chunks share the original allocation
Layer 2: Incremental streaming with StreamHeader (opt-in, Native encoding only)
For responses where the client benefits from knowing the payload size and content type upfront (e.g., large GetResponse with WASM contracts), the server can send a StreamHeader before the chunks.
This enables two client consumption modes via WsStreamHandle:
- handle.into_stream(): async Stream<Item = Bytes> for chunk-by-chunk processing (progress bars, incremental writes)
- handle.assemble(): wait for all chunks and return the complete payload
StreamContent currently has two variants:
- GetResponse { key, includes_contract }: the primary use case
- Raw: reserved for future extension
When recv() encounters a StreamHeader, it automatically calls assemble() and returns the complete HostResponse, so callers that don't need incremental consumption see no difference. Only callers that explicitly use recv_stream() get the incremental API.
Client integration
Native client (regular.rs):
- process_request() chunks outbound requests over the threshold
- handle_response_payload() routes StreamHeader → WsStreamHandle channel, StreamChunk with known sender → incremental delivery, StreamChunk without header → transparent ReassemblyBuffer reassembly
- recv() auto-assembles streamed responses; recv_stream() returns the handle for incremental consumption
Browser client (browser.rs):
- ReassemblyBuffer integrated into the onmessage callback
- StreamChunk responses are reassembled before dispatching to the result handler
- StreamHeader support (Flatbuffers encoding only; header is bincode-specific)
Protocol constants
- CHUNK_SIZE
- CHUNK_THRESHOLD
- MAX_TOTAL_CHUNKS (... MAX_STATE_SIZE + overhead)
- MAX_CONCURRENT_STREAMS (... ReassemblyBuffer)
- MAX_CHUNKS_PER_BATCH
Wire format additions
New variants added to the Flatbuffers schemas and Rust enums:
- ClientRequest::StreamChunk: { stream_id, index, total, data }
- HostResponse::StreamChunk: { stream_id, index, total, data }
- HostResponse::StreamHeader: { stream_id, total_bytes, content } (bincode only; returns error over Flatbuffers)
Breaking change justification (0.1.40 → 0.2.0)
This is a breaking change because:
- ClientRequest and HostResponse (both #[non_exhaustive], but the wire protocol changes)
- WebApi::recv() behavior changes (now handles StreamHeader + StreamChunk internally via select!)
- WebApi struct has a new stream_rx field
- bytes dependency added
Testing
Unit tests (streaming.rs)
- chunk_request_small / chunk_request_large_roundtrip / chunk_response_roundtrip: roundtrip chunking + reassembly
- chunk_empty: empty payload handling
- chunk_exact_boundary: payloads at exact chunk size
- reassembly_reset_across_messages: buffer cleanup between streams
- reassembly_error_truncated_header / error_unknown_type: malformed input rejection
- reassembly_error_zero_total / reassembly_error_too_large: bounds validation
- reassembly_error_mismatch / reassembly_error_duplicate: consistency checks
- concurrent_streams: multiple interleaved streams
- max_concurrent_streams_limit: stream count cap enforcement
Integration tests (regular.rs)
- test_recv_chunked: server sends chunks without header, client recv() reassembles transparently
- test_recv_stream_header: server sends header + chunks, client uses recv_stream() + assemble()
- test_recv_transparent_stream_header: server sends header + chunks, client uses recv() which auto-assembles
Files changed
- rust/Cargo.toml: 0.2.0, add bytes dependency
- rust/src/client_api.rs: streaming module under net feature
- rust/src/client_api/streaming.rs: WsStreamHandle, WsStreamSender
- rust/src/client_api/client_events.rs: StreamChunk, StreamHeader, StreamContent to enums + FBS serialization
- rust/src/client_api/regular.rs: recv_stream() API
- rust/src/client_api/browser.rs: ReassemblyBuffer into WASM client, chunk outbound requests
- schemas/flatbuffers/client_request.fbs: StreamChunk table and union variant
- schemas/flatbuffers/host_response.fbs: StreamChunk table and union variant
- rust/src/generated/client_request_generated.rs
- rust/src/generated/host_response_generated.rs
Downstream dependency
freenet-core#3346 depends on this PR. That PR integrates the server-side streaming into the node's WebSocket handler. This PR must be published to crates.io before the core PR can pass CI.
Fixes
Closes #2256