Skip to content

Commit 16ad47e

Browse files
d-csclaude
andcommitted
feat(redis-worker): per-env batched pop in MollifierDrainer
Adds a drainBatchSize option (default 1, preserves existing behaviour) that lets the drainer pop up to N entries from each chosen env per tick and dispatch them all through the shared concurrency-bounded limiter. Org/env fairness is preserved — the per-tick env selection is unchanged, only the in-env pop count grows. Wires TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE through the webapp (default 50). For a single-env burst of K entries with K > 1, drain time drops from K × tick_time to ceil(K / drainBatchSize) × tick_time with handler parallelism capped at concurrency. Heavy single-tenant tails go from minutes to tens of seconds without changing PG load characteristics. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent eaae99e commit 16ad47e

6 files changed

Lines changed: 489 additions & 30 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
`MollifierDrainer` now accepts a `drainBatchSize` option that controls how many entries it pops from a single env per tick. Default remains 1 (one pop per env per tick — previous behaviour). Setting it higher lets a single-env burst drain at handler-parallelism speed instead of one entry per ~50ms tick: the drainer pops up to `drainBatchSize` from the picked env and dispatches all popped entries through the shared `concurrency`-bounded limiter. Org/env fairness is unchanged — the per-tick env selection is unaffected.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Wire `TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE` (default 50) into the drainer so single-env bursts drain at the full `DRAIN_CONCURRENCY` budget instead of one pop per ~50ms tick. For a 20k-trigger burst on one env this cuts drain time from minutes to ~tens of seconds; smaller bursts (e.g. 50 on one env) drop from ~2.5s to ~50–100ms tail.

apps/webapp/app/env.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,16 @@ const EnvironmentSchema = z
11011101
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
11021102
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
11031103
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
1104+
// Per-env per-tick pop cap. The drainer rotates one env per org per
1105+
// tick; this bounds how many entries it pops from that env before
1106+
// dispatching them through the shared `DRAIN_CONCURRENCY`-bounded
1107+
// limiter. Default matches `DRAIN_CONCURRENCY` so a single-env burst
1108+
// uses the full handler-parallelism budget — for 20k buffered on one
1109+
// env this is the difference between ~17m (one-pop-per-tick × ~50ms)
1110+
// and ~20s (400 ticks × concurrent engine.trigger). Org/env fairness
1111+
// is preserved because the per-tick env selection is unchanged; only
1112+
// the in-env pop count grows.
1113+
TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE: z.coerce.number().int().positive().default(50),
11041114
// Periodic sweep that scans buffer queue LISTs for entries whose
11051115
// dwell exceeds the stale threshold. Independent of the drainer —
11061116
// its job is exactly to make a stuck/offline drainer visible to

apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
7272
logger.debug("Initializing mollifier drainer", {
7373
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
7474
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
75+
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
7576
});
7677

7778
const drainer = new MollifierDrainer<MollifierSnapshot>({
@@ -81,6 +82,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
8182
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
8283
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
8384
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
85+
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
8486
isRetryable: isRetryablePgError,
8587
});
8688

0 commit comments

Comments
 (0)