
feat: implement WebSocket streaming protocol for large payloads #2256 (#58)

Open
netsirius wants to merge 5 commits into main from feat/ws-message-streaming

Conversation


@netsirius netsirius commented Feb 28, 2026

Problem

WebSocket frame size limits cause silent failures or disconnections when transmitting large contract states (up to 50 MiB) between the Freenet node and clients. The entire payload had to fit in a single WebSocket frame, requiring oversized max_message_size limits and causing excessive memory pressure.

Solution

Add an application-level chunking and streaming protocol to freenet-stdlib, used by both the native (tokio-tungstenite) and browser (WASM) WebSocket clients. The protocol operates in two layers:

Layer 1 — Transparent chunking (all message types)

Any serialized payload exceeding CHUNK_THRESHOLD (512 KiB) is automatically split into 256 KiB StreamChunk frames. The receiver's ReassemblyBuffer reassembles them transparently before deserialization. This applies to all HostResponse and ClientRequest variants — no type-specific logic needed.

  • chunk_request(data, stream_id) → Vec<ClientRequest::StreamChunk>
  • chunk_response(data, stream_id) → Vec<HostResponse::StreamChunk>
  • ReassemblyBuffer::receive_chunk() → Option<Vec<u8>> when complete
  • Zero-copy fragmentation via Bytes::slice() — chunks share the original allocation
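A std-only sketch of the Layer 1 round trip. The function names mirror the PR's API but the bodies are illustrative: the real streaming.rs uses Bytes::slice() for zero-copy fragmentation, whereas Vec<u8> here copies for simplicity.

```rust
// Illustrative sketch of Layer 1 chunking + reassembly (not the PR's actual code).
const CHUNK_SIZE: usize = 256 * 1024; // 256 KiB per chunk

struct StreamChunk {
    stream_id: u32,
    index: u32,
    total: u32,
    data: Vec<u8>,
}

fn chunk_payload(data: &[u8], stream_id: u32) -> Vec<StreamChunk> {
    let total = ((data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE).max(1) as u32;
    data.chunks(CHUNK_SIZE)
        .enumerate()
        .map(|(i, c)| StreamChunk { stream_id, index: i as u32, total, data: c.to_vec() })
        .collect()
}

// Receiver side: concatenate chunks in index order once all have arrived.
fn reassemble(mut chunks: Vec<StreamChunk>) -> Vec<u8> {
    chunks.sort_by_key(|c| c.index);
    chunks.into_iter().flat_map(|c| c.data).collect()
}

fn main() {
    let payload = vec![7u8; 600 * 1024]; // above the 512 KiB threshold
    let chunks = chunk_payload(&payload, 1);
    assert_eq!(chunks.len(), 3); // 600 KiB splits into 256 + 256 + 88 KiB
    assert_eq!(reassemble(chunks), payload);
    println!("roundtrip ok");
}
```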

Layer 2 — Incremental streaming with StreamHeader (opt-in, Native encoding only)

For responses where the client benefits from knowing the payload size and content type upfront (e.g., large GetResponse with WASM contracts), the server can send a StreamHeader before the chunks:

HostResponse::StreamHeader {
    stream_id: u32,
    total_bytes: u64,
    content: StreamContent,
}

This enables two client consumption modes via WsStreamHandle:

  • handle.into_stream() — async Stream<Item = Bytes> for chunk-by-chunk processing (progress bars, incremental writes)
  • handle.assemble() — wait for all chunks and return the complete payload
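As a rough std-only analogy (the real handle is async and yields Bytes; an mpsc receiver stands in for the chunk stream here), the two modes differ only in whether the caller observes intermediate chunks:

```rust
use std::sync::mpsc;

// Chunk-by-chunk consumption, analogous to handle.into_stream():
// each iteration could drive a progress bar or an incremental file write.
fn consume_incrementally(rx: mpsc::Receiver<Vec<u8>>) -> usize {
    let mut seen = 0;
    for chunk in rx.iter() {
        seen += chunk.len();
    }
    seen
}

// Wait-for-everything consumption, analogous to handle.assemble().
fn consume_assembled(rx: mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
    rx.iter().flatten().collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for _ in 0..3 {
        tx.send(vec![0u8; 1024]).unwrap();
    }
    drop(tx); // closing the sender ends the stream
    assert_eq!(consume_incrementally(rx), 3 * 1024);

    let (tx, rx) = mpsc::channel();
    tx.send(b"abc".to_vec()).unwrap();
    tx.send(b"def".to_vec()).unwrap();
    drop(tx);
    assert_eq!(consume_assembled(rx), b"abcdef");
    println!("both modes ok");
}
```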

StreamContent currently has two variants:

  • GetResponse { key, includes_contract } — the primary use case
  • Raw — reserved for future extension

When recv() encounters a StreamHeader, it automatically calls assemble() and returns the complete HostResponse — callers that don't need incremental consumption see no difference. Only callers that explicitly use recv_stream() get the incremental API.

Client integration

Native client (regular.rs):

  • process_request() chunks outbound requests over the threshold
  • handle_response_payload() routes StreamHeader → WsStreamHandle channel, StreamChunk with known sender → incremental delivery, StreamChunk without header → transparent ReassemblyBuffer reassembly
  • recv() auto-assembles streamed responses; recv_stream() returns the handle for incremental consumption

Browser client (browser.rs):

  • ReassemblyBuffer integrated into the onmessage callback
  • StreamChunk responses are reassembled before dispatching to the result handler
  • Outbound requests are chunked when over threshold
  • No StreamHeader support (Flatbuffers encoding only; header is bincode-specific)

Protocol constants

Constant                Value    Purpose
CHUNK_SIZE              256 KiB  Payload size per chunk
CHUNK_THRESHOLD         512 KiB  Minimum size to trigger chunking
MAX_TOTAL_CHUNKS        256      Cap per stream (~64 MiB, covers MAX_STATE_SIZE + overhead)
MAX_CONCURRENT_STREAMS  8        Concurrent streams per ReassemblyBuffer
MAX_CHUNKS_PER_BATCH    32       Chunks before yielding to runtime
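The per-stream cap claimed above (~64 MiB) follows directly from the constants; a quick sketch of them as Rust consts, with the arithmetic checked:

```rust
// Values from the table above; declarations are a sketch, not the PR's source.
const CHUNK_SIZE: usize = 256 * 1024;
const CHUNK_THRESHOLD: usize = 512 * 1024;
const MAX_TOTAL_CHUNKS: usize = 256;

fn main() {
    // 256 chunks * 256 KiB = exactly 64 MiB per stream, which covers the
    // 50 MiB MAX_STATE_SIZE plus serialization overhead.
    assert_eq!(MAX_TOTAL_CHUNKS * CHUNK_SIZE, 64 * 1024 * 1024);
    // Chunking only triggers once a payload exceeds two chunks' worth of data.
    assert_eq!(CHUNK_THRESHOLD, 2 * CHUNK_SIZE);
    println!("cap = {} MiB", MAX_TOTAL_CHUNKS * CHUNK_SIZE / (1024 * 1024));
}
```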

Wire format additions

New variants added to the Flatbuffers schemas and Rust enums:

ClientRequest::StreamChunk { stream_id, index, total, data }
HostResponse::StreamChunk { stream_id, index, total, data }
HostResponse::StreamHeader { stream_id, total_bytes, content } (bincode only; returns error over Flatbuffers)

Breaking change justification (0.1.40 → 0.2.0)

This is a breaking change because:

  • New variants added to ClientRequest and HostResponse (both #[non_exhaustive], but the wire protocol changes)
  • WebApi::recv() behavior changes (now handles StreamHeader + StreamChunk internally via select!)
  • WebApi struct has a new stream_rx field
  • New bytes dependency added

Testing

Unit tests (streaming.rs)

  • chunk_request_small / chunk_request_large_roundtrip / chunk_response_roundtrip — roundtrip chunking + reassembly
  • chunk_empty — empty payload handling
  • chunk_exact_boundary — payloads at exact chunk size
  • reassembly_reset_across_messages — buffer cleanup between streams
  • reassembly_error_truncated_header / error_unknown_type — malformed input rejection
  • reassembly_error_zero_total / reassembly_error_too_large — bounds validation
  • reassembly_error_mismatch / reassembly_error_duplicate — consistency checks
  • concurrent_streams — multiple interleaved streams
  • max_concurrent_streams_limit — stream count cap enforcement

Integration tests (regular.rs)

  • test_recv_chunked — server sends chunks without header, client recv() reassembles transparently
  • test_recv_stream_header — server sends header + chunks, client uses recv_stream() + assemble()
  • test_recv_transparent_stream_header — server sends header + chunks, client uses recv() which auto-assembles

Files changed

File                                            Change
rust/Cargo.toml                                 Version bump to 0.2.0, add bytes dependency
rust/src/client_api.rs                          Expose streaming module under net feature
rust/src/client_api/streaming.rs                New — core streaming protocol: chunking, reassembly, WsStreamHandle, WsStreamSender
rust/src/client_api/client_events.rs            Add StreamChunk, StreamHeader, StreamContent to enums + FBS serialization
rust/src/client_api/regular.rs                  Integrate streaming into native client: chunked send, header/chunk receive routing, recv_stream() API
rust/src/client_api/browser.rs                  Integrate ReassemblyBuffer into WASM client, chunk outbound requests
schemas/flatbuffers/client_request.fbs          Add StreamChunk table and union variant
schemas/flatbuffers/host_response.fbs           Add StreamChunk table and union variant
rust/src/generated/client_request_generated.rs  Regenerated from FBS schema
rust/src/generated/host_response_generated.rs   Regenerated from FBS schema

Downstream dependency

freenet-core#3346 depends on this PR. That PR integrates the server-side streaming into the node's WebSocket handler. This PR must be published to crates.io before the core PR can pass CI.

Fixes

Closes #2256

@netsirius

@claude review this pr


claude bot commented Feb 28, 2026

Claude encountered an error.


I'll analyze this and get back to you.

@iduartgomez

Can we make this change backwards compatible instead?


@iduartgomez iduartgomez left a comment


Review Summary

The code quality is high and the two-layer design (transparent reassembly + optional incremental streaming) is sound. With ?streaming=true gating in the companion PR (freenet-core#3346), old applications using the non-streaming protocol are protected from seeing unknown bincode variants. Most backward compatibility concerns are addressed by that opt-in mechanism.

However, there are a few issues that need attention before merging.


Must Fix (Blocking)

1. FlatBuffers HostResponseType union ordering breaks old browser clients

In schemas/flatbuffers/host_response.fbs, StreamChunk is inserted between GenerateRandData and Ok in the union:

union HostResponseType {
  ContractResponse,
  DelegateResponse,
  GenerateRandData,
  StreamChunk,      // <-- inserted here, shifts Ok and Error
  Ok,
  Error,
}

This shifts Ok from index 4→5 and Error from 5→6. FlatBuffers forward-compatibility only works for appended variants. Old browser clients (using FBS encoding) will misinterpret every Ok and Error response from the server, regardless of whether streaming is enabled. This is a silent data corruption bug, not a deserialization error.

Fix: Move StreamChunk to the end of the union (after Error). The client_request.fbs union is fine — StreamChunk is already appended after Authenticate.
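The corrected union would look like this (sketched from the ordering quoted above; only the position of StreamChunk changes):

```
union HostResponseType {
  ContractResponse,
  DelegateResponse,
  GenerateRandData,
  Ok,
  Error,
  StreamChunk,  // appended after Error: existing indices for Ok and Error are preserved
}
```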

2. No TTL/timeout on ReassemblyBuffer entries

If a stream starts (first chunk received) but never completes (sender crashes, network drops remaining chunks), the partial StreamState entry stays in the HashMap forever. After MAX_CONCURRENT_STREAMS (8) abandoned streams, all new streams are permanently rejected.

Per the project's AGENTS.md: "Cleanup exemptions MUST be time-bounded. Any condition that exempts an entry from garbage collection MUST either expire via TTL or be overridden by an absolute age threshold."

Fix: Add a creation timestamp to StreamState and evict entries older than a reasonable timeout (e.g., 30-60 seconds) when new chunks arrive or when the concurrent stream limit is hit.
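A minimal std-only sketch of that eviction, assuming a `created_at: Instant` field is added to StreamState (field and type names here are hypothetical, not the PR's):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const STREAM_TTL: Duration = Duration::from_secs(30);

struct StreamState {
    created_at: Instant,
    chunks: Vec<Vec<u8>>, // partial payload; unused in this sketch
}

struct ReassemblyBuffer {
    streams: HashMap<u32, StreamState>,
}

impl ReassemblyBuffer {
    /// Drop partial streams older than STREAM_TTL. Call this before admitting
    /// a new stream, so abandoned entries cannot pin a concurrency slot forever.
    fn evict_stale(&mut self, now: Instant) {
        self.streams
            .retain(|_, s| now.duration_since(s.created_at) < STREAM_TTL);
    }
}

fn main() {
    let t0 = Instant::now();
    let t1 = t0 + Duration::from_secs(60); // simulate 60 s passing
    let mut buf = ReassemblyBuffer { streams: HashMap::new() };
    buf.streams.insert(1, StreamState { created_at: t0, chunks: vec![] });
    buf.streams.insert(2, StreamState { created_at: t1, chunks: vec![] });
    buf.evict_stale(t1);
    assert!(!buf.streams.contains_key(&1)); // 60 s old: evicted
    assert!(buf.streams.contains_key(&2)); // fresh: kept
    println!("remaining streams: {}", buf.streams.len());
}
```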

3. Deadlock in recv() with 5+ concurrent streamed responses

stream_tx has capacity 4. The recv() method calls handle.assemble().await inline, blocking until all chunks arrive. During assembly, recv() is not draining stream_rx. If the server sends 5+ StreamHeaders rapidly:

  1. Handler sends handles 1-4 to stream_tx (fills capacity)
  2. Client's recv() picks up handle 1, starts assemble().await
  3. Handler tries to send handle 5 to stream_tx → blocks
  4. Handler is now stuck and can't forward chunks from the WebSocket
  5. Client's assemble() waits for chunks that will never arrive
  6. Deadlock

Fix options:

  • Increase stream_tx capacity significantly, or
  • Make recv() non-blocking: spawn assembly in a background task and return a future/handle, or
  • Buffer stream handles in request_handler instead of blocking on stream_tx.send().await

Should Fix (Important)

4. assemble() has no upper bound check on received bytes

The truncation check (buf.len() < total_bytes) catches under-delivery, but there is no check for over-delivery. A malicious server claiming total_bytes=100 could stream unlimited data into the unbounded channel. The buf.extend_from_slice loop has no cap.

Fix: Add if buf.len() as u64 > self.total_bytes { return Err(...) } inside the receive loop.
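A synchronous sketch of the guarded loop (function and error shapes are illustrative; the real loop is async over the chunk channel):

```rust
// Illustrative bounds-checked reassembly: reject both over- and under-delivery.
fn receive_all(chunks: &[Vec<u8>], total_bytes: u64) -> Result<Vec<u8>, String> {
    let mut buf = Vec::new();
    for c in chunks {
        buf.extend_from_slice(c);
        // Over-delivery guard: a sender claiming total_bytes cannot exceed it.
        if buf.len() as u64 > total_bytes {
            return Err(format!(
                "over-delivery: got {} bytes, header declared {}",
                buf.len(),
                total_bytes
            ));
        }
    }
    // Existing truncation check: under-delivery is also an error.
    if (buf.len() as u64) < total_bytes {
        return Err("truncated stream".into());
    }
    Ok(buf)
}

fn main() {
    assert!(receive_all(&[vec![0; 80], vec![0; 40]], 100).is_err()); // over-delivery
    assert!(receive_all(&[vec![0; 60]], 100).is_err()); // truncated
    assert_eq!(receive_all(&[vec![0; 100]], 100).unwrap().len(), 100);
    println!("bounds checks ok");
}
```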

5. Evicted stream_senders entries cause orphan reassembly entries

When stream_senders evicts an entry at capacity, subsequent chunks for that evicted stream fall through to the ReassemblyBuffer (since the sender is no longer in the map), creating an orphan partial stream that will never complete — permanently consuming a reassembly slot. This compounds issue #2.

6. MAX_CHUNKS_PER_BATCH is defined but never used

The constant is declared and documented in the PR description as enabling "cooperative yielding," but no yielding logic exists in the code. Either implement the yield logic or remove the dead constant.


Consider (Suggestions)

  • recv() / recv_stream() interaction: Both consume from stream_rx. If a caller alternates between them, behavior is unpredictable. Worth documenting the mutual exclusivity.
  • Browser stream_id = 0 hardcoded: All browser chunked sends use the same stream ID. If concurrent large sends were possible, this would cause DuplicateChunk errors. Mitigated by &mut self, but worth a comment.
  • Test gaps: Out-of-order chunk delivery, exact CHUNK_SIZE boundaries, assemble() truncation error path, and mixed streamed + non-streamed response interleaving are all untested.
  • Test stability: Integration tests use 100ms timeouts and AtomicU16 port allocation — consider port 0 (OS-assigned) and longer timeouts for CI.

What's Done Well

  • Thorough error handling in ReassemblyBuffer with specific error variants for every failure case
  • Zero-copy chunking via Bytes::slice()
  • Clean separation between transparent reassembly (Layer 1) and incremental streaming (Layer 2)
  • Comprehensive unit test suite for the core chunking/reassembly logic
  • Good integration tests covering both chunked-only and header+chunked paths
  • FlatBuffers StreamHeader correctly returns an error rather than silently failing
