Skip to content

Commit 3a47cc7

Browse files
d-csclaude
andcommitted
feat(webapp,redis-worker): add global mollifier gate mode
The per-env trip rate-limits each environment independently and cannot bound the aggregate trigger rate hitting the primary database. Global mode rate-limits the fleet-wide aggregate via a single shared Redis counter (mollifier:rate:{global}), ignoring per-env contributions. Behind config, default unchanged (per_env). - redis-worker: MollifierBuffer.evaluateTripGlobal reuses the trip Lua against hash-tagged global keys - trip evaluator gains a mode option; global mode reports reason "global_rate" - separate TRIGGER_MOLLIFIER_GATE_MODE + TRIGGER_MOLLIFIER_GLOBAL_* env vars Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 4ea3ef1 commit 3a47cc7

9 files changed

Lines changed: 373 additions & 9 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Add `MollifierBuffer.evaluateTripGlobal` — a fleet-wide variant of `evaluateTrip` that increments a single shared fixed-window counter regardless of env, so the mollifier can rate-limit the aggregate trigger rate rather than per-env. Reuses the existing trip Lua; keys are hash-tagged for Redis Cluster safety.

apps/webapp/app/env.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,18 @@ const EnvironmentSchema = z
10971097
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
10981098
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
10991099
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
1100+
// Gate mode. "per_env" (default) rate-limits each env independently via the
1101+
// TRIGGER_MOLLIFIER_TRIP_* values above. "global" rate-limits the aggregate
1102+
// fleet-wide trigger.run rate via a single shared counter (ignoring per-env
1103+
// contributions) using the TRIGGER_MOLLIFIER_GLOBAL_* values below — it
1104+
// protects the primary DB from the aggregate rate that per-env tripping
1105+
// cannot bound. The two parameter sets are kept separate so switching modes
1106+
// never silently reuses the other regime's tuning. Global threshold default
1107+
// (1000 per 200ms ≈ 5k/s) targets the observed aggregate metastable edge.
1108+
TRIGGER_MOLLIFIER_GATE_MODE: z.enum(["per_env", "global"]).default("per_env"),
1109+
TRIGGER_MOLLIFIER_GLOBAL_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1110+
TRIGGER_MOLLIFIER_GLOBAL_TRIP_THRESHOLD: z.coerce.number().int().positive().default(1000),
1111+
TRIGGER_MOLLIFIER_GLOBAL_HOLD_MS: z.coerce.number().int().positive().default(500),
11001112
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
11011113
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
11021114
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),

apps/webapp/app/v3/mollifier/mollifierGate.server.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export type TripDecision =
2323
| { divert: false }
2424
| {
2525
divert: true;
26-
reason: "per_env_rate";
26+
reason: "per_env_rate" | "global_rate";
2727
count: number;
2828
threshold: number;
2929
windowMs: number;
@@ -88,11 +88,22 @@ export type GateDependencies = {
8888
// gate observing whichever env values are live at trigger time.
8989
const defaultEvaluator = createRealTripEvaluator({
9090
getBuffer: () => getMollifierBuffer(),
91-
options: () => ({
92-
windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS,
93-
threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD,
94-
holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS,
95-
}),
91+
// Pick the per-env or global rate parameters based on the configured gate
92+
// mode. Kept as separate env vars so the two regimes never share tuning.
93+
options: () =>
94+
env.TRIGGER_MOLLIFIER_GATE_MODE === "global"
95+
? {
96+
mode: "global",
97+
windowMs: env.TRIGGER_MOLLIFIER_GLOBAL_TRIP_WINDOW_MS,
98+
threshold: env.TRIGGER_MOLLIFIER_GLOBAL_TRIP_THRESHOLD,
99+
holdMs: env.TRIGGER_MOLLIFIER_GLOBAL_HOLD_MS,
100+
}
101+
: {
102+
mode: "per_env",
103+
windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS,
104+
threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD,
105+
holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS,
106+
},
96107
});
97108

98109
function logDivertDecision(

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export const mollifierDecisionsCounter = meter.createCounter("mollifier.decision
77
});
88

99
export type DecisionOutcome = "pass_through" | "shadow_log" | "mollify";
10-
export type DecisionReason = "per_env_rate";
10+
export type DecisionReason = "per_env_rate" | "global_rate";
1111

1212
export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason): void {
1313
mollifierDecisionsCounter.add(1, {

apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ import { logger } from "~/services/logger.server";
33
import type { GateInputs, TripDecision, TripEvaluator } from "./mollifierGate.server";
44

55
export type TripEvaluatorOptions = {
6+
// "per_env" (default) rate-limits each env independently. "global" rate-limits
7+
// the aggregate fleet-wide trigger.run rate via a single shared counter and
8+
// ignores per-env contributions — it protects shared infra (the primary DB)
9+
// from the aggregate rate that per-env tripping structurally cannot bound.
10+
mode?: "per_env" | "global";
611
windowMs: number;
712
threshold: number;
813
holdMs: number;
@@ -21,12 +26,15 @@ export function createRealTripEvaluator(deps: CreateRealTripEvaluatorDeps): Trip
2126
const opts = deps.options();
2227

2328
try {
24-
const { tripped, count } = await buffer.evaluateTrip(inputs.envId, opts);
29+
const { tripped, count } =
30+
opts.mode === "global"
31+
? await buffer.evaluateTripGlobal(opts)
32+
: await buffer.evaluateTrip(inputs.envId, opts);
2533
if (!tripped) return { divert: false };
2634

2735
return {
2836
divert: true,
29-
reason: "per_env_rate",
37+
reason: opts.mode === "global" ? "global_rate" : "per_env_rate",
3038
count,
3139
threshold: opts.threshold,
3240
windowMs: opts.windowMs,

apps/webapp/test/mollifierGate.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,51 @@ describe("evaluateGate cascade — exhaustive truth table", () => {
182182
});
183183
});
184184

185+
// Global-mode trips surface `reason: "global_rate"` (vs "per_env_rate"). The
186+
// gate is reason-agnostic — it forwards `decision.reason` straight to
187+
// `recordDecision` — but `reason` is the metric label we alert/dashboard on, so
188+
// pin the contract end-to-end for both the shadow-log and mollify outcomes.
189+
const globalTrippedDecision = {
190+
divert: true as const,
191+
reason: "global_rate" as const,
192+
count: 1200,
193+
threshold: 1000,
194+
windowMs: 200,
195+
holdMs: 500,
196+
};
197+
198+
describe("evaluateGate — global_rate reason propagates to the metric", () => {
199+
it("shadow path records shadow_log with reason global_rate", async () => {
200+
const { deps, spies } = makeDeps({
201+
enabled: true,
202+
shadow: true,
203+
flag: false,
204+
decision: globalTrippedDecision,
205+
});
206+
207+
const outcome = await evaluateGate(inputs, deps);
208+
209+
expect(outcome.action).toBe("shadow_log");
210+
expect(spies.recordDecisionCalls).toEqual([{ outcome: "shadow_log", reason: "global_rate" }]);
211+
expect(spies.logShadowCalls).toEqual([{ inputs, decision: globalTrippedDecision }]);
212+
});
213+
214+
it("mollify path records mollify with reason global_rate", async () => {
215+
const { deps, spies } = makeDeps({
216+
enabled: true,
217+
shadow: false,
218+
flag: true,
219+
decision: globalTrippedDecision,
220+
});
221+
222+
const outcome = await evaluateGate(inputs, deps);
223+
224+
expect(outcome.action).toBe("mollify");
225+
expect(spies.recordDecisionCalls).toEqual([{ outcome: "mollify", reason: "global_rate" }]);
226+
expect(spies.logMollifiedCalls).toEqual([{ inputs, decision: globalTrippedDecision }]);
227+
});
228+
});
229+
185230
// Hot-path guard: `triggerTask.server.ts` calls `evaluateGate` on every
186231
// trigger when `TRIGGER_MOLLIFIER_ENABLED=1`. The per-org override path must resolve
187232
// without a Prisma round-trip — otherwise the gate adds a DB query to the

apps/webapp/test/mollifierTripEvaluator.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,94 @@ describe("createRealTripEvaluator", () => {
6060
},
6161
);
6262

63+
redisTest(
64+
"global mode trips on aggregate load across distinct envs with reason global_rate",
65+
async ({ redisOptions }) => {
66+
const buffer = new MollifierBuffer({ redisOptions });
67+
try {
68+
// threshold=3 → the 4th trigger trips. Crucially every trigger is a
69+
// DIFFERENT env, so per-env tripping would never fire (each env count=1).
70+
const options = { mode: "global", windowMs: 5000, threshold: 3, holdMs: 5000 } as const;
71+
const evaluator = createRealTripEvaluator({
72+
getBuffer: () => buffer,
73+
options: () => options,
74+
});
75+
76+
await evaluator({ ...inputs, envId: "g1" });
77+
await evaluator({ ...inputs, envId: "g2" });
78+
await evaluator({ ...inputs, envId: "g3" });
79+
const decision = await evaluator({ ...inputs, envId: "g4" });
80+
81+
expect(decision.divert).toBe(true);
82+
if (decision.divert) {
83+
expect(decision.reason).toBe("global_rate");
84+
expect(decision.count).toBeGreaterThan(options.threshold);
85+
}
86+
} finally {
87+
await buffer.close();
88+
}
89+
},
90+
);
91+
92+
redisTest(
93+
"per_env mode does NOT trip on the same load spread across distinct envs",
94+
async ({ redisOptions }) => {
95+
const buffer = new MollifierBuffer({ redisOptions });
96+
try {
97+
const options = { mode: "per_env", windowMs: 5000, threshold: 3, holdMs: 5000 } as const;
98+
const evaluator = createRealTripEvaluator({
99+
getBuffer: () => buffer,
100+
options: () => options,
101+
});
102+
103+
// Four triggers, four distinct envs — every per-env counter stays at 1.
104+
for (const envId of ["p1", "p2", "p3", "p4"]) {
105+
const decision = await evaluator({ ...inputs, envId });
106+
expect(decision.divert).toBe(false);
107+
}
108+
} finally {
109+
await buffer.close();
110+
}
111+
},
112+
);
113+
114+
redisTest(
115+
"switching to global mid-flight starts the global counter cold (per-env load does not preload it)",
116+
async ({ redisOptions }) => {
117+
const buffer = new MollifierBuffer({ redisOptions });
118+
try {
119+
let mode: "per_env" | "global" = "per_env";
120+
const evaluator = createRealTripEvaluator({
121+
getBuffer: () => buffer,
122+
options: () => ({ mode, windowMs: 5000, threshold: 2, holdMs: 5000 }),
123+
});
124+
125+
// Per-env load on env "s1": the 3rd call trips its per-env counter.
126+
await evaluator({ ...inputs, envId: "s1" });
127+
await evaluator({ ...inputs, envId: "s1" });
128+
const perEnvTrip = await evaluator({ ...inputs, envId: "s1" });
129+
expect(perEnvTrip.divert).toBe(true);
130+
131+
// Flip to global. If per-env activity had leaked into the global
132+
// counter it would already be over threshold; instead the global
133+
// counter starts at 0, so the first two ticks don't trip and the third
134+
// does — proving cold start + isolation from the per-env counters.
135+
mode = "global";
136+
expect((await evaluator({ ...inputs, envId: "s2" })).divert).toBe(false);
137+
expect((await evaluator({ ...inputs, envId: "s3" })).divert).toBe(false);
138+
const globalTrip = await evaluator({ ...inputs, envId: "s4" });
139+
140+
expect(globalTrip.divert).toBe(true);
141+
if (globalTrip.divert) {
142+
expect(globalTrip.reason).toBe("global_rate");
143+
expect(globalTrip.count).toBe(3);
144+
}
145+
} finally {
146+
await buffer.close();
147+
}
148+
},
149+
);
150+
63151
redisTest("returns divert=false when getBuffer returns null (fail-open)", async () => {
64152
const evaluator = createRealTripEvaluator({
65153
getBuffer: () => null,

0 commit comments

Comments
 (0)