Skip to content

refactor(ddp-streamer): backpressure, deterministic logout, metrics, fan-out tests#40358

Draft
ggazzo wants to merge 2 commits intodevelopfrom
ddp-streamer-improvements
Draft

refactor(ddp-streamer): backpressure, deterministic logout, metrics, fan-out tests#40358
ggazzo wants to merge 2 commits intodevelopfrom
ddp-streamer-improvements

Conversation

@ggazzo
Copy link
Copy Markdown
Member

@ggazzo ggazzo commented May 1, 2026

Summary

Internal refactor of ee/apps/ddp-streamer keeping the wire protocol identical (DDP-over-WS, EJSON, login via Account.login, method fallback to MeteorService). Five focused changes split into one commit.

Heartbeat & backpressure

  • Split heartbeat into idleTimer / pongTimer. The pongTimer is armed only when the server sends a PING and is cleared exclusively by an inbound PONG. Other inbound traffic no longer extends the deadline — a broken client that keeps sending data but never replies to PING is now disconnected deterministically.
  • Drop slow consumers. When ws.bufferedAmount > MAX_BUFFERED_BYTES (default 4 MiB, env override DDP_MAX_BUFFERED_BYTES), the socket is closed with code 1013. Prevents heap blow-up on the streamer pod when a single peer stalls.

Fan-out path

  • RawSender helper. The private ws._sender.sendFrame call is now isolated in lib/RawSender.ts, with the buffer-amount guard applied at fan-out too. The upgrade path to a different ws/uWS implementation is now local to one file.
  • fanOutText extracted from Stream.sendToManySubscriptions so the loop is testable without instantiating the full Streamer base class.

Lifecycle

  • Logout: setTimeout(1ms)setImmediate. ws guarantees frame ordering on the wire, so the result/updated frames reach the wire before the close frame.
  • Binary frame tolerance. Server.parse decodes binary buffers as UTF-8 instead of throwing 500. Some proxies and ArrayBuffer paths deliver text payloads in binary frames.

Observability

  • New metrics:
    • ddp_method_total{namespace, status} — counter, labeled by method namespace prefix (before :/.) to keep Prometheus cardinality bounded
    • ddp_close_total{code} — counter
    • ddp_send_buffer_bytes{nodeID} — gauge sampled every 5 s
  • Replace ad-hoc console.error/warn with @rocket.chat/logger in Client.ts and Streamer.ts.

Tests

  • New specs: Client.spec, Streamer.spec, RawSender.spec.
  • Server.spec extended to cover parse() and metric increments.
  • 6 → 32 tests; coverage ~10% → 63% (RawSender 94%, Streamer 75%, Server 65%, Client 60%).

Out of scope (future work)

  • Switching from ws to uWebSockets.js.
  • DDP session resume / restart-storm jitter.
  • permessage-deflate.
  • Microbatching method calls back to the monolith.

Test plan

  • yarn workspace @rocket.chat/ddp-streamer test — 32/32 passing
  • yarn workspace @rocket.chat/ddp-streamer typecheck — clean
  • yarn workspace @rocket.chat/ddp-streamer lint — 0 errors (8 pre-existing warnings in untouched code)
  • Local smoke: monolith + ddp-streamer up, login, subscribe to stream-room-messages, send messages, logout, reconnect
  • Stage canary: watch p99 fan-out latency, ddp_send_buffer_bytes, ddp_close_total{code="1013"} for one full traffic cycle
  • Confirm no regression in users_connected / users_logged gauges

…fan-out tests

Internal refactor of ee/apps/ddp-streamer keeping the wire protocol identical
(DDP-over-WS, EJSON, login via Account.login, method fallback to MeteorService).

Heartbeat & backpressure:
- Split heartbeat into idleTimer / pongTimer. The pongTimer is armed only when
  a server PING is sent and is cleared exclusively by an inbound PONG; other
  inbound traffic no longer extends the deadline (a broken client that keeps
  sending data but never replies to PING is now disconnected deterministically).
- Drop slow consumers: when ws.bufferedAmount exceeds MAX_BUFFERED_BYTES
  (default 4 MiB, env override DDP_MAX_BUFFERED_BYTES), close with code 1013.

Fan-out path:
- Encapsulate the ws private _sender.sendFrame call behind RawSender so the
  upgrade path to a different ws/uWS implementation is local. Apply the same
  bufferedAmount guard at the fan-out level.
- Extract fanOutText from Stream.sendToManySubscriptions so the loop is
  testable without instantiating the full Streamer base class.

Lifecycle:
- Replace setTimeout(1ms) on logout with setImmediate; ws frame ordering
  guarantees the result/updated frames reach the wire before the close frame.
- Decode binary frames as UTF-8 in Server.parse instead of throwing 500
  (some proxies/wrappers deliver UTF-8 JSON in binary frames).

Observability:
- Register ddp_method_total{namespace,status}, ddp_close_total{code} and
  ddp_send_buffer_bytes{nodeID} (sampled every 5s).
- Method labels are bucketed by the prefix before ':' or '.' to keep
  Prometheus cardinality bounded.
- Replace console.error/warn with @rocket.chat/logger in Client.ts and
  Streamer.ts.

Tests:
- New specs: Client.spec, Streamer.spec, RawSender.spec.
- Server.spec extended to cover parse() and metric increments.
- Suite grows from 6 to 32 tests; coverage from ~10% to 63% (RawSender 94%,
  Streamer 75%, Server 65%, Client 60%).
@dionisio-bot
Copy link
Copy Markdown
Contributor

dionisio-bot Bot commented May 1, 2026

Looks like this PR is not ready to merge, because of the following issues:

  • This PR is missing the 'stat: QA assured' label
  • This PR is missing the required milestone or project

Please fix the issues and try again

If you have any trouble, please check the PR guidelines

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 1, 2026

⚠️ No Changeset found

Latest commit: f85a7b0

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 1, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 158bd709-394d-4d1c-b52a-13319b284b1f

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 1, 2026

Codecov Report

❌ Patch coverage is 72.11538% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.99%. Comparing base (d33009a) to head (f85a7b0).

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff             @@
##           develop   #40358      +/-   ##
===========================================
+ Coverage    69.97%   69.99%   +0.02%     
===========================================
  Files         3301     3305       +4     
  Lines       120443   120700     +257     
  Branches     21559    21630      +71     
===========================================
+ Hits         84281    84486     +205     
- Misses       32862    32921      +59     
+ Partials      3300     3293       -7     
Flag Coverage Δ
unit 70.85% <72.11%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant