Skip to content

Commit 7ad380a

Browse files
pranaygpclaude
andcommitted
perf: add events.createBatch() for batch event creation
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9fb5af7 commit 7ad380a

8 files changed

Lines changed: 394 additions & 97 deletions

File tree

.changeset/batch-event-creation.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
"@workflow/core": patch
3+
"@workflow/world": patch
4+
"@workflow/world-local": patch
5+
"@workflow/world-postgres": patch
6+
"@workflow/world-vercel": patch
7+
---
8+
9+
perf: add events.createBatch() for batch event creation
10+
11+
- Add `createBatch()` method to Storage interface for creating multiple events atomically
12+
- Use batch event creation in suspension handler for improved performance
13+
- Use batch event creation for wait_completed events in runtime

packages/core/src/runtime.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -359,11 +359,14 @@ export function workflowEntrypoint(
359359
correlationId: e.correlationId,
360360
}));
361361

362-
// Create all wait_completed events
363-
for (const waitEvent of waitsToComplete) {
364-
const result = await world.events.create(runId, waitEvent);
365-
// Add the event to the events array so the workflow can see it
366-
events.push(result.event);
362+
// Batch create all wait_completed events
363+
if (waitsToComplete.length > 0) {
364+
const completedResults = await world.events.createBatch(
365+
runId,
366+
waitsToComplete
367+
);
368+
// Add the events to the events array so the workflow can see them
369+
events.push(...completedResults.map((r) => r.event));
367370
}
368371

369372
const result = await runWorkflow(

packages/core/src/runtime/suspension-handler.ts

Lines changed: 85 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export interface SuspensionHandlerResult {
3030
/**
3131
* Handles a workflow suspension by processing all pending operations (hooks, steps, waits).
3232
* Uses an event-sourced architecture where entities (steps, hooks) are created atomically
33-
* with their corresponding events via events.create().
33+
* with their corresponding events via createBatch.
3434
*
3535
* Processing order:
3636
* 1. Hooks are processed first to prevent race conditions with webhook receivers
@@ -72,7 +72,7 @@ export async function handleSuspension({
7272
});
7373

7474
// Process hooks first to prevent race conditions with webhook receivers
75-
// All hook creations run in parallel
75+
// Hooks must be processed individually (not batched) to detect hook_conflict events
7676
// Track any hook conflicts that occur - these will be handled by re-enqueueing the workflow
7777
let hasHookConflict = false;
7878

@@ -104,106 +104,101 @@ export async function handleSuspension({
104104
);
105105
}
106106

107-
// Build a map of stepId -> step event for steps that need creation
108-
const stepsNeedingCreation = new Set(
109-
stepItems
110-
.filter((queueItem) => !queueItem.hasCreatedEvent)
111-
.map((queueItem) => queueItem.correlationId)
107+
// Build step_created events only for steps that haven't been created yet
108+
// Steps with hasCreatedEvent=true already have their event in the log
109+
const stepsNeedingCreation = stepItems.filter(
110+
(queueItem) => !queueItem.hasCreatedEvent
111+
);
112+
const stepEvents: CreateEventRequest[] = stepsNeedingCreation.map(
113+
(queueItem) => {
114+
const dehydratedInput = dehydrateStepArguments(
115+
{
116+
args: queueItem.args,
117+
closureVars: queueItem.closureVars,
118+
thisVal: queueItem.thisVal,
119+
},
120+
suspension.globalThis
121+
);
122+
return {
123+
eventType: 'step_created' as const,
124+
correlationId: queueItem.correlationId,
125+
eventData: {
126+
stepName: queueItem.stepName,
127+
input: dehydratedInput as Serializable,
128+
},
129+
};
130+
}
112131
);
113132

114-
// Process steps and waits in parallel
115-
// Each step: create event (if needed) -> queue message
116-
// Each wait: create event (if needed)
117-
const ops: Promise<void>[] = [];
118-
119-
// Steps: create event then queue message, all in parallel
120-
for (const queueItem of stepItems) {
121-
ops.push(
122-
(async () => {
123-
// Create step event if not already created
124-
if (stepsNeedingCreation.has(queueItem.correlationId)) {
125-
const dehydratedInput = dehydrateStepArguments(
126-
{
127-
args: queueItem.args,
128-
closureVars: queueItem.closureVars,
129-
thisVal: queueItem.thisVal,
130-
},
131-
suspension.globalThis
132-
);
133-
const stepEvent: CreateEventRequest = {
134-
eventType: 'step_created' as const,
135-
correlationId: queueItem.correlationId,
136-
eventData: {
137-
stepName: queueItem.stepName,
138-
input: dehydratedInput as Serializable,
139-
},
140-
};
141-
try {
142-
await world.events.create(runId, stepEvent);
143-
} catch (err) {
144-
if (WorkflowAPIError.is(err) && err.status === 409) {
145-
console.warn(`Step already exists, continuing: ${err.message}`);
146-
} else {
147-
throw err;
148-
}
133+
// Build wait_created events (only for waits that haven't been created yet)
134+
const waitEvents: CreateEventRequest[] = waitItems
135+
.filter((queueItem) => !queueItem.hasCreatedEvent)
136+
.map((queueItem) => ({
137+
eventType: 'wait_created' as const,
138+
correlationId: queueItem.correlationId,
139+
eventData: {
140+
resumeAt: queueItem.resumeAt,
141+
},
142+
}));
143+
144+
// Process steps and waits in parallel using batch creation
145+
await Promise.all([
146+
// Create step events (World creates step entities atomically)
147+
// Only for steps that don't already have a step_created event
148+
stepEvents.length > 0
149+
? world.events.createBatch(runId, stepEvents).catch((err) => {
150+
if (WorkflowAPIError.is(err) && err.status === 409) {
151+
console.warn(
152+
`Some steps already exist, continuing: ${err.message}`
153+
);
154+
} else {
155+
throw err;
149156
}
150-
}
151-
152-
// Queue step execution message
153-
await queueMessage(
154-
world,
155-
`__wkf_step_${queueItem.stepName}`,
156-
{
157-
workflowName,
158-
workflowRunId: runId,
159-
workflowStartedAt,
160-
stepId: queueItem.correlationId,
161-
traceCarrier: await serializeTraceCarrier(),
162-
requestedAt: new Date(),
163-
},
164-
{
165-
idempotencyKey: queueItem.correlationId,
157+
})
158+
: Promise.resolve(),
159+
// Create wait events
160+
waitEvents.length > 0
161+
? world.events.createBatch(runId, waitEvents).catch((err) => {
162+
if (WorkflowAPIError.is(err) && err.status === 409) {
163+
console.warn(
164+
`Some waits already exist, continuing: ${err.message}`
165+
);
166+
} else {
167+
throw err;
166168
}
167-
);
168-
})()
169+
})
170+
: Promise.resolve(),
171+
]);
172+
173+
// Queue step execution messages for ALL pending steps in parallel
174+
// (both newly created and those with existing step_created events)
175+
const queueOps = stepItems.map(async (queueItem) => {
176+
await queueMessage(
177+
world,
178+
`__wkf_step_${queueItem.stepName}`,
179+
{
180+
workflowName,
181+
workflowRunId: runId,
182+
workflowStartedAt,
183+
stepId: queueItem.correlationId,
184+
traceCarrier: await serializeTraceCarrier(),
185+
requestedAt: new Date(),
186+
},
187+
{
188+
idempotencyKey: queueItem.correlationId,
189+
}
169190
);
170-
}
171-
172-
// Waits: create events in parallel (no queueing needed for waits)
173-
for (const queueItem of waitItems) {
174-
if (!queueItem.hasCreatedEvent) {
175-
ops.push(
176-
(async () => {
177-
const waitEvent: CreateEventRequest = {
178-
eventType: 'wait_created' as const,
179-
correlationId: queueItem.correlationId,
180-
eventData: {
181-
resumeAt: queueItem.resumeAt,
182-
},
183-
};
184-
try {
185-
await world.events.create(runId, waitEvent);
186-
} catch (err) {
187-
if (WorkflowAPIError.is(err) && err.status === 409) {
188-
console.warn(`Wait already exists, continuing: ${err.message}`);
189-
} else {
190-
throw err;
191-
}
192-
}
193-
})()
194-
);
195-
}
196-
}
191+
});
197192

198-
// Wait for all step and wait operations to complete
193+
// Wait for all queue operations to complete
199194
waitUntil(
200-
Promise.all(ops).catch((opErr) => {
195+
Promise.all(queueOps).catch((opErr) => {
201196
const isAbortError =
202197
opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted';
203198
if (!isAbortError) throw opErr;
204199
})
205200
);
206-
await Promise.all(ops);
201+
await Promise.all(queueOps);
207202

208203
// Calculate minimum timeout from waits
209204
const now = Date.now();

packages/world-local/src/storage.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,16 @@ export function createStorage(basedir: string): Storage {
858858
};
859859
},
860860

861+
async createBatch(runId, data, params): Promise<EventResult[]> {
862+
// createBatch is just a sequential loop over create() to ensure monotonic ULIDs
863+
const results: EventResult[] = [];
864+
for (const eventData of data) {
865+
const result = await this.create(runId, eventData, params);
866+
results.push(result);
867+
}
868+
return results;
869+
},
870+
861871
async list(params) {
862872
const { runId } = params;
863873
const resolveData = params.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;

0 commit comments

Comments
 (0)