Skip to content

Commit a1dc3c5

Browse files
d-csclaude
andauthored
feat(webapp): mollifier API GET read-fallback — synthetic primitives + route wiring (#3755)
## Summary Synthesise QUEUED/FAILED responses from the mollifier buffer when a TaskRun row hasn't landed in Postgres yet. Wires the synthesis into: - `ApiRetrieveRunPresenter` - v1 trace GET route - v1 spans GET route - attempts route gains a GET loader (fixes pre-existing Remix "no loader" 400) The `readFallback` infra itself lives on the trigger PR (consumed by `IdempotencyKeyConcern`); this PR adds the route-level synthetic-rendering primitives. Stacked on the replay PR. ## Test plan - [x] \`pnpm run typecheck --filter webapp\` passes - [x] \`pnpm run test --filter webapp test/mollifierSyntheticRedirectInfo.test.ts\` passes - [x] \`pnpm run test --filter webapp test/mollifierSyntheticSpanRun.test.ts\` passes --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4745754 commit a1dc3c5

16 files changed

Lines changed: 1866 additions & 29 deletions

.server-changes/mollifier-reads.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and events endpoints.

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 217 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@ import {
99
logger,
1010
} from "@trigger.dev/core/v3";
1111
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
12+
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
1213
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
1314
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
1415
import assertNever from "assert-never";
1516
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1617
import { $replica, prisma } from "~/db.server";
1718
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
19+
import {
20+
findRunByIdWithMollifierFallback,
21+
type SyntheticRun,
22+
} from "~/v3/mollifier/readFallback.server";
1823
import { generatePresignedUrl } from "~/v3/objectStore.server";
1924
import { tracer } from "~/v3/tracer.server";
2025
import { startSpanWithEnv } from "~/v3/tracing.server";
@@ -64,13 +69,46 @@ type CommonRelatedRun = Prisma.Result<
6469
"findFirstOrThrow"
6570
>;
6671

67-
type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
72+
// Full shape returned by findRun() — the commonRunSelect fields plus the
73+
// extras the route handler reads. Declared explicitly (not inferred via
74+
// ReturnType<typeof findRun>) so findRun can return a synthesised buffered
75+
// run without the type becoming self-referential.
76+
// Exported so the buffer-synthesis helper below can be unit-tested
77+
// against a stable shape without re-deriving it (FoundRun's exact field
78+
// list is what the buffered run must match for `call()` not to surprise).
79+
export type FoundRun = CommonRelatedRun & {
80+
traceId: string;
81+
payload: string;
82+
payloadType: string;
83+
output: string | null;
84+
outputType: string;
85+
error: Prisma.JsonValue;
86+
attempts: { id: string }[];
87+
attemptNumber: number | null;
88+
engine: "V1" | "V2";
89+
taskEventStore: string;
90+
parentTaskRun: CommonRelatedRun | null;
91+
rootTaskRun: CommonRelatedRun | null;
92+
childRuns: CommonRelatedRun[];
93+
// True when this run was synthesised from the mollifier buffer rather
94+
// than read from Postgres. Callers that would otherwise query backing
95+
// stores keyed on PG identifiers (e.g. ClickHouse event lookups by
96+
// traceId) can short-circuit to an empty response — buffered runs
97+
// haven't executed and have no events to fetch. Devin's analysis on
98+
// PR #3755 (events endpoint) flagged the pre-fix code as making a
99+
// wasted ClickHouse round-trip when this is set; gate on this flag
100+
// instead.
101+
isBuffered: boolean;
102+
};
68103

69104
export class ApiRetrieveRunPresenter {
70105
constructor(private readonly apiVersion: API_VERSIONS) {}
71106

72-
public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
73-
return $replica.taskRun.findFirst({
107+
public static async findRun(
108+
friendlyId: string,
109+
env: AuthenticatedEnvironment,
110+
): Promise<FoundRun | null> {
111+
const pgRow = await $replica.taskRun.findFirst({
74112
where: {
75113
friendlyId,
76114
runtimeEnvironmentId: env.id,
@@ -102,6 +140,23 @@ export class ApiRetrieveRunPresenter {
102140
},
103141
},
104142
});
143+
144+
if (pgRow) return { ...pgRow, isBuffered: false };
145+
146+
// Postgres miss → fall back to the mollifier buffer. When the gate
147+
// diverted a trigger, the run lives in Redis until the drainer replays
148+
// it through engine.trigger. Synthesise the FoundRun shape so call()
149+
// returns a `QUEUED` (or `FAILED`) response with empty output, no
150+
// attempts, no relations.
151+
const buffered = await findRunByIdWithMollifierFallback({
152+
runId: friendlyId,
153+
environmentId: env.id,
154+
organizationId: env.organizationId,
155+
});
156+
157+
if (!buffered) return null;
158+
159+
return synthesiseFoundRunFromBuffer(buffered);
105160
}
106161

107162
public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
@@ -475,3 +530,162 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
475530
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
476531
}
477532
}
533+
534+
// Build a FoundRun-shaped object from a buffered (mollified) run. The run
535+
// is in the Redis buffer; engine.trigger hasn't created the Postgres row
536+
// yet, so every field that comes from execution state (output, attempts,
537+
// completedAt, cost, relations) takes a default. The presenter's call()
538+
// handles QUEUED-state runs without surprise.
539+
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
540+
switch (status) {
541+
case "FAILED":
542+
return "SYSTEM_FAILURE";
543+
case "CANCELED":
544+
return "CANCELED";
545+
default:
546+
return "PENDING";
547+
}
548+
}
549+
550+
// The PG path stores `TaskRun.payload` as `String?`, so in production
551+
// the buffered snapshot's `payload` is always a string. We defensively
552+
// coerce other types instead of silently dropping them: an object gets
553+
// JSON-stringified (matches how the trigger path would serialise it),
554+
// anything truly unrenderable falls back to an empty string. The log
555+
// line surfaces format drift to ops without crashing the read path.
556+
function synthesisePayload(buffered: SyntheticRun): string {
557+
const payload = buffered.payload;
558+
if (typeof payload === "string") return payload;
559+
if (payload === undefined || payload === null) return "";
560+
try {
561+
const serialised = JSON.stringify(payload);
562+
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", {
563+
runFriendlyId: buffered.friendlyId,
564+
payloadType: typeof payload,
565+
});
566+
return typeof serialised === "string" ? serialised : "";
567+
} catch {
568+
logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", {
569+
runFriendlyId: buffered.friendlyId,
570+
payloadType: typeof payload,
571+
});
572+
return "";
573+
}
574+
}
575+
576+
// Mirror synthesisePayload for metadata. The PG path stores
577+
// `TaskRun.metadata` as `String?`, and the snapshot writes it from
578+
// `metadataPacket.data` (also a string), so in production it is always a
579+
// string or absent. We coerce defensively — an object gets JSON-stringified
580+
// (matching how the trigger path serialises it) rather than silently
581+
// dropped to null, and the log line surfaces format drift to ops.
582+
function synthesiseMetadata(buffered: SyntheticRun): string | null {
583+
const metadata = buffered.metadata;
584+
if (typeof metadata === "string") return metadata;
585+
if (metadata === undefined || metadata === null) return null;
586+
try {
587+
const serialised = JSON.stringify(metadata);
588+
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", {
589+
runFriendlyId: buffered.friendlyId,
590+
metadataType: typeof metadata,
591+
});
592+
return typeof serialised === "string" ? serialised : null;
593+
} catch {
594+
logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", {
595+
runFriendlyId: buffered.friendlyId,
596+
metadataType: typeof metadata,
597+
});
598+
return null;
599+
}
600+
}
601+
602+
// Exported for unit testing. Used by `findRun()` above when the
603+
// Postgres lookup misses and the buffer carries the run — keep the shape
604+
// in lockstep with `FoundRun`'s field list so `call()` treats a synthesised
605+
// buffered run identically to a freshly-triggered PG row.
606+
export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
607+
const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status);
608+
609+
const errorJson: Prisma.JsonValue = buffered.error
610+
? {
611+
type: "STRING_ERROR",
612+
raw: `${buffered.error.code}: ${buffered.error.message}`,
613+
}
614+
: null;
615+
616+
const metadata: string | null = synthesiseMetadata(buffered);
617+
618+
return {
619+
// `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId`
620+
// is the user-facing `run_xxx` token. Downstream logging keyed off
621+
// `taskRun.id` correlates with other systems via the cuid — using
622+
// the friendlyId here breaks log correlation. `SyntheticRun` carries
623+
// the cuid alongside the friendlyId for exactly this reason
624+
// (RunId.fromFriendlyId in readFallback.server.ts).
625+
id: buffered.id,
626+
friendlyId: buffered.friendlyId,
627+
status,
628+
taskIdentifier: buffered.taskIdentifier ?? "",
629+
createdAt: buffered.createdAt,
630+
startedAt: null,
631+
updatedAt: buffered.cancelledAt ?? buffered.createdAt,
632+
// PG-resident SYSTEM_FAILURE rows always have `completedAt` set by
633+
// the engine; the buffer-synth path must match so SDK consumers
634+
// that poll on `isCompleted` and then read `finishedAt` see a real
635+
// timestamp instead of `undefined`. CANCELED already had this via
636+
// `buffered.cancelledAt`; fall back to `buffered.createdAt` for
637+
// FAILED (the buffer entry has no separate "failedAt" — the
638+
// best-available approximation of when the terminal state landed
639+
// is the entry's creation time).
640+
completedAt:
641+
buffered.cancelledAt ?? (status === "SYSTEM_FAILURE" ? buffered.createdAt : null),
642+
expiredAt: null,
643+
delayUntil: buffered.delayUntil ?? null,
644+
metadata,
645+
metadataType: buffered.metadataType ?? "application/json",
646+
ttl: buffered.ttl ?? null,
647+
costInCents: 0,
648+
baseCostInCents: 0,
649+
usageDurationMs: 0,
650+
idempotencyKey: buffered.idempotencyKey ?? null,
651+
idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
652+
isTest: buffered.isTest,
653+
depth: buffered.depth,
654+
// Scheduled triggers go through the same TriggerTaskService path as
655+
// API triggers and aren't bypassed by the mollifier gate, so a
656+
// scheduled run can land in the buffer with its scheduleId set on the
657+
// snapshot. Forward it so resolveSchedule() can hydrate the `schedule`
658+
// field in the API response instead of silently dropping it until the
659+
// drainer materialises.
660+
scheduleId: buffered.scheduleId ?? null,
661+
lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null,
662+
resumeParentOnCompletion: buffered.resumeParentOnCompletion,
663+
// Reconstruct the batch from the snapshot's internal id so a buffered
664+
// run reports the same `batchId` / triggerFunction as it will once
665+
// materialised, and so batch-scoped JWTs authorise against it (the
666+
// route authorization callbacks read `run.batch?.friendlyId`).
667+
batch: buffered.batchId
668+
? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) }
669+
: null,
670+
runTags: buffered.tags,
671+
traceId: buffered.traceId ?? "",
672+
payload: synthesisePayload(buffered),
673+
payloadType: buffered.payloadType ?? "application/json",
674+
output: null,
675+
outputType: "application/json",
676+
error: errorJson,
677+
attempts: [],
678+
attemptNumber: null,
679+
engine: "V2",
680+
taskEventStore: "taskEvent",
681+
// Empty string when absent (matches syntheticSpanRun.server.ts and lets
682+
// `createCommonRunStructure`'s `run.workerQueue || undefined` coerce the
683+
// API response's `region` to undefined instead of advertising a
684+
// misleading "main" region for a not-yet-assigned buffered run).
685+
workerQueue: buffered.workerQueue ?? "",
686+
parentTaskRun: null,
687+
rootTaskRun: null,
688+
childRuns: [],
689+
isBuffered: true,
690+
};
691+
}

apps/webapp/app/routes/api.v1.runs.$runId.events.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ export const loader = createLoaderApiRoute(
3838
},
3939
},
4040
async ({ resource: run, authentication }) => {
41+
// Short-circuit for mollifier-buffered runs. The drainer hasn't
42+
// materialised execution events yet (the gate intercepts before
43+
// any trace event is written), so a ClickHouse round-trip is
44+
// guaranteed to come back empty. `findRun` now sets `isBuffered`
45+
// explicitly on its return value — gate on that rather than
46+
// probing surrogate fields like `traceId === ""`.
47+
if (run.isBuffered) {
48+
return json({ events: [] }, { status: 200 });
49+
}
50+
4151
const eventRepository = await getEventRepositoryForStore(
4252
run.taskEventStore,
4353
authentication.environment.organization.id

apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,69 @@ import {
99
} from "~/services/routeBuilders/apiBuilder.server";
1010
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1111
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
12+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
13+
import { buildSyntheticSpanDetailBody } from "~/v3/mollifier/syntheticApiResponses.server";
1214

1315
const ParamsSchema = z.object({
1416
runId: z.string(),
1517
spanId: z.string(),
1618
});
1719

20+
// Resolve the run from either Postgres or the mollifier buffer.
21+
// Buffered runs only have one valid spanId (the queued span recorded at
22+
// gate time and reused as the run's root spanId when the drainer
23+
// materialises). Any other spanId returns a deterministic 404; the queued
24+
// span returns a minimal synthesised shape so the customer's SDK sees the
25+
// same 200 contract they'd get for a freshly-triggered run.
26+
type ResolvedRun =
27+
| { source: "pg"; run: Awaited<ReturnType<typeof findPgRun>> & {} }
28+
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };
29+
30+
async function findPgRun(runId: string, environmentId: string) {
31+
return $replica.taskRun.findFirst({
32+
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
33+
});
34+
}
35+
1836
export const loader = createLoaderApiRoute(
1937
{
2038
params: ParamsSchema,
2139
allowJWT: true,
2240
corsStrategy: "all",
23-
findResource: (params, auth) => {
24-
return $replica.taskRun.findFirst({
25-
where: {
26-
friendlyId: params.runId,
27-
runtimeEnvironmentId: auth.environment.id,
28-
},
41+
findResource: async (params, auth): Promise<ResolvedRun | null> => {
42+
const pgRun = await findPgRun(params.runId, auth.environment.id);
43+
if (pgRun) return { source: "pg", run: pgRun };
44+
45+
const buffered = await findRunByIdWithMollifierFallback({
46+
runId: params.runId,
47+
environmentId: auth.environment.id,
48+
organizationId: auth.environment.organizationId,
2949
});
50+
if (buffered) return { source: "buffer", run: buffered };
51+
52+
return null;
3053
},
3154
shouldRetryNotFound: true,
3255
authorization: {
3356
action: "read",
34-
resource: (run) => {
57+
resource: (resolved) => {
58+
if (resolved.source === "pg") {
59+
const run = resolved.run;
60+
const resources = [
61+
{ type: "runs", id: run.friendlyId },
62+
{ type: "tasks", id: run.taskIdentifier },
63+
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
64+
];
65+
if (run.batchId) {
66+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
67+
}
68+
return anyResource(resources);
69+
}
70+
const run = resolved.run;
3571
const resources = [
3672
{ type: "runs", id: run.friendlyId },
37-
{ type: "tasks", id: run.taskIdentifier },
38-
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
73+
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
74+
...run.tags.map((tag) => ({ type: "tags", id: tag })),
3975
];
4076
if (run.batchId) {
4177
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
@@ -44,7 +80,20 @@ export const loader = createLoaderApiRoute(
4480
},
4581
},
4682
},
47-
async ({ params, resource: run, authentication }) => {
83+
async ({ params, resource: resolved, authentication }) => {
84+
if (resolved.source === "buffer") {
85+
// Buffered runs have exactly one valid spanId — the queued span the
86+
// mollifier gate recorded at trigger time, which becomes the run's
87+
// root spanId once the drainer materialises. Any other spanId is a
88+
// deterministic 404. The matching spanId returns a minimal shape
89+
// representing "span exists, no execution data yet."
90+
if (resolved.run.spanId !== params.spanId) {
91+
return json({ error: "Span not found" }, { status: 404 });
92+
}
93+
return json(buildSyntheticSpanDetailBody(resolved.run), { status: 200 });
94+
}
95+
96+
const run = resolved.run;
4897
const eventRepository = await getEventRepositoryForStore(
4998
run.taskEventStore,
5099
authentication.environment.organization.id

0 commit comments

Comments
 (0)