From 5fbffe857a5068394fd7914a5684beaf1c67054a Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 27 Jan 2026 15:36:22 +0900 Subject: [PATCH 1/4] Add orderingKey option to SendActivityOptions This change exposes the orderingKey capability (added in PR #540) through the sendActivity() API, allowing applications to ensure ordered delivery of related activities. https://github.com/fedify-dev/fedify/issues/536 Co-Authored-By: Claude --- CHANGES.md | 8 +- docs/manual/send.md | 79 ++++++++++++++++++ packages/fedify/src/federation/context.ts | 21 ++++- .../fedify/src/federation/middleware.test.ts | 80 +++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 65 ++++++++++----- packages/fedify/src/federation/queue.ts | 2 + 6 files changed, 231 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d857628b..b87686b3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -101,11 +101,15 @@ To be released. with different ordering keys can be processed in parallel. This helps prevent race conditions when processing related activities (e.g., ensuring a `Delete` activity is processed after a `Create` activity for the same - object). [[#538], [#540]] + object). [[#536], [#538], [#540], [#544]] - Added `MessageQueueEnqueueOptions.orderingKey` property. - All properties in `MessageQueueEnqueueOptions` are now `readonly`. - `InProcessMessageQueue` now supports the `orderingKey` option. + - Added `SendActivityOptions.orderingKey` option to ensure ordered + delivery of activities for the same object. When specified, activities + with the same `orderingKey` are guaranteed to be delivered in order + to each recipient server. [#280]: https://github.com/fedify-dev/fedify/issues/280 [#366]: https://github.com/fedify-dev/fedify/issues/366 @@ -122,8 +126,10 @@ To be released. [#466]: https://github.com/fedify-dev/fedify/issues/466 [#499]: https://github.com/fedify-dev/fedify/issues/499 [#506]: https://github.com/fedify-dev/fedify/pull/506 +[#536]: https://github.com/fedify-dev/fedify/issues/536 [#538]: https://github.com/fedify-dev/fedify/issues/538 [#540]: https://github.com/fedify-dev/fedify/pull/540 +[#544]: https://github.com/fedify-dev/fedify/pull/544 ### @fedify/cli diff --git a/docs/manual/send.md b/docs/manual/send.md index 7f8fd7b9..d8a08616 100644 --- a/docs/manual/send.md +++ b/docs/manual/send.md @@ -529,6 +529,85 @@ await ctx.sendActivity( ~~~~ +Ensuring ordered delivery +------------------------- + +*This API is available since Fedify 2.0.0.* + +When sending multiple related activities for the same object +(e.g., `Create(Note)` followed by `Delete(Note)` for the same `Note`), +it's important that they arrive at recipient servers in the correct order. +Without ordering guarantees, a `Delete(Note)` activity might arrive before +the `Create(Note)` activity, causing the recipient to ignore the deletion and +leaving a “zombie post” that should have been deleted. + +To ensure ordered delivery, you can use the `~SendActivityOptions.orderingKey` +option in the `~Context.sendActivity()` method: + +~~~~ typescript twoslash +import type { Context } from "@fedify/fedify"; +import { Create, Delete, Note, type Recipient } from "@fedify/vocab"; +const ctx = null as unknown as Context; +const recipients: Recipient[] = []; +const noteId = new URL("https://example.com/notes/123"); +// ---cut-before--- +// Create activity +await ctx.sendActivity( + { identifier: "alice" }, + recipients, + new Create({ + id: new URL("#create", noteId), + actor: ctx.getActorUri("alice"), + object: new Note({ id: noteId }), + }), + { orderingKey: noteId.href }, // [!code highlight] +); + +// Delete activity - guaranteed to arrive after Create +await ctx.sendActivity( + { identifier: "alice" }, + recipients, + new Delete({ + id: new URL("#delete", noteId), + actor: ctx.getActorUri("alice"), + object: noteId, + }), + { orderingKey: noteId.href }, // [!code highlight] +); +~~~~ + +Activities with the same `~SendActivityOptions.orderingKey` are guaranteed to be +delivered in the order they were enqueued, per recipient server. Activities +with different `~SendActivityOptions.orderingKey` values (or no +`~SendActivityOptions.orderingKey`) can be delivered in parallel for maximum +throughput. + +### When to use `~SendActivityOptions.orderingKey` + +Use `~SendActivityOptions.orderingKey` when you have activities that must be +processed in a specific order: + + - `Create(Note)` → `Update(Note)` → `Delete(Note)` for the same object + - `Follow(Person)` → `Undo(Follow(Person))` for the same target + - Any sequence where later activities depend on earlier ones + +Don't use `~SendActivityOptions.orderingKey` for unrelated activities, +as it unnecessarily reduces parallelism. + +### How it works + +When you specify an `~SendActivityOptions.orderingKey`: + +1. During fan-out, the key is passed as-is to the fanout queue +2. When individual delivery tasks are created, the key is transformed to + `${orderingKey}\n${recipientServerOrigin}` to ensure ordering is maintained + per-recipient-server while allowing parallel delivery to different servers + +This means Server A can receive a `Delete(Note)` immediately after its +`Create(Note)` completes, without waiting for Server Z's `Create(Note)` to +finish. + + Immediately sending an activity ------------------------------- diff --git a/packages/fedify/src/federation/context.ts b/packages/fedify/src/federation/context.ts index 013a4f34..411e067b 100644 --- a/packages/fedify/src/federation/context.ts +++ b/packages/fedify/src/federation/context.ts @@ -746,7 +746,7 @@ export interface SendActivityOptions { /** * Whether to prefer the shared inbox for the recipients. */ - preferSharedInbox?: boolean; + readonly preferSharedInbox?: boolean; /** * Whether to send the activity immediately, without enqueuing it. @@ -755,7 +755,7 @@ export interface SendActivityOptions { * * @since 0.3.0 */ - immediate?: boolean; + readonly immediate?: boolean; /** * Determines how activities are queued when sent to multiple recipients. @@ -773,7 +773,7 @@ export interface SendActivityOptions { * @default `"auto"` * @since 1.5.0 */ - fanout?: "auto" | "skip" | "force"; + readonly fanout?: "auto" | "skip" | "force"; /** * The base URIs to exclude from the recipients' inboxes. It is useful @@ -784,6 +784,21 @@ export interface SendActivityOptions { * @since 0.9.0 */ readonly excludeBaseUris?: readonly URL[]; + + /** + * An optional key to ensure ordered delivery of activities. Activities with + * the same `orderingKey` are guaranteed to be delivered in the order they + * were enqueued, per recipient server. + * + * Typical use case: pass the object ID (e.g., `Note` ID) to ensure that + * `Create`, `Update`, and `Delete` activities for the same object are + * delivered in order. + * + * When omitted, no ordering is guaranteed (maximum parallelism). + * + * @since 2.0.0 + */ + readonly orderingKey?: string; } /** diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 688d0cdb..a49a7550 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -2233,6 +2233,7 @@ test("ContextImpl.sendActivity()", async (t) => { }, }, keys: queue.messages[0].type === "fanout" ? queue.messages[0].keys : [], + orderingKey: undefined, traceContext: {}, }, ]); @@ -2370,6 +2371,85 @@ test("ContextImpl.sendActivity()", async (t) => { assertNotEquals(collectionSyncHeader, null); }); + queue.clear(); + + await t.step('orderingKey with fanout: "force"', async () => { + const activity = new vocab.Create({ + id: new URL("https://example.com/activity/ordering-1"), + actor: new URL("https://example.com/person"), + }); + await ctx2.sendActivity( + { username: "john" }, + { + id: new URL("https://example.com/recipient"), + inboxId: new URL("https://example.com/inbox"), + }, + activity, + { fanout: "force", orderingKey: "https://example.com/note/1" }, + ); + assertEquals(queue.messages.length, 1); + const fanoutMessage = queue.messages[0]; + assertEquals(fanoutMessage.type, "fanout"); + if (fanoutMessage.type === "fanout") { + assertEquals( + fanoutMessage.orderingKey, + "https://example.com/note/1", + ); + } + }); + + queue.clear(); + + await t.step('orderingKey with fanout: "skip"', async () => { + const activity = new vocab.Create({ + id: new URL("https://example.com/activity/ordering-2"), + actor: new URL("https://example.com/person"), + }); + await ctx2.sendActivity( + { username: "john" }, + { + id: new URL("https://example.com/recipient"), + inboxId: new URL("https://example.com/inbox"), + }, + activity, + { fanout: "skip", orderingKey: "https://example.com/note/2" }, + ); + assertEquals(queue.messages.length, 1); + const outboxMessage = queue.messages[0]; + assertEquals(outboxMessage.type, "outbox"); + // outbox message should have orderingKey transformed to include inbox origin + if (outboxMessage.type === "outbox") { + assertEquals( + outboxMessage.orderingKey, + "https://example.com/note/2\nhttps://example.com", + ); + } + }); + + queue.clear(); + + await t.step("orderingKey not specified", async () => { + const activity = new vocab.Create({ + id: new URL("https://example.com/activity/ordering-3"), + actor: new URL("https://example.com/person"), + }); + await ctx2.sendActivity( + { username: "john" }, + { + id: new URL("https://example.com/recipient"), + inboxId: new URL("https://example.com/inbox"), + }, + activity, + { fanout: "force" }, + ); + assertEquals(queue.messages.length, 1); + const fanoutMessage2 = queue.messages[0]; + assertEquals(fanoutMessage2.type, "fanout"); + if (fanoutMessage2.type === "fanout") { + assertEquals(fanoutMessage2.orderingKey, undefined); + } + }); + fetchMock.hardReset(); }); diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 058b91a4..4a3e0cdb 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -579,6 +579,7 @@ export class FederationImpl ); await this.sendActivity(keys, message.inboxes, activity, { collectionSync: message.collectionSync, + orderingKey: message.orderingKey, context, }); } @@ -975,7 +976,7 @@ export class FederationImpl options: SendActivityInternalOptions, ): Promise { const logger = getLogger(["fedify", "federation", "outbox"]); - const { immediate, collectionSync, context: ctx } = options; + const { immediate, collectionSync, orderingKey, context: ctx } = options; if (activity.id == null) { throw new TypeError("The activity to send must have an id."); } @@ -1098,8 +1099,12 @@ export class FederationImpl if (!this.manuallyStartQueue) this._startQueueInternal(ctx.data); const carrier: Record = {}; propagation.inject(context.active(), carrier); - const messages: OutboxMessage[] = []; + const messages: { message: OutboxMessage; orderingKey?: string }[] = []; for (const inbox in inboxes) { + const inboxOrigin = new URL(inbox).origin; + const messageOrderingKey = orderingKey == null + ? undefined + : `${orderingKey}\n${inboxOrigin}`; const message: OutboxMessage = { type: "outbox", id: crypto.randomUUID(), @@ -1119,14 +1124,15 @@ export class FederationImpl inboxes[inbox].actorIds, ), }, + orderingKey: messageOrderingKey, traceContext: carrier, }; - messages.push(message); + messages.push({ message, orderingKey: messageOrderingKey }); } const { outboxQueue } = this; if (outboxQueue.enqueueMany == null) { const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m) + outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) ); const results = await Promise.allSettled(promises); const errors = results @@ -1146,14 +1152,23 @@ export class FederationImpl throw errors[0]; } } else { - try { - await outboxQueue.enqueueMany(messages); - } catch (error) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {error}", - { activityId: activity.id!.href, error }, + // Note: enqueueMany does not support per-message orderingKey, + // so we fall back to individual enqueues when orderingKey is specified + if (orderingKey != null) { + const promises: Promise[] = messages.map((m) => + outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) ); - throw error; + await Promise.all(promises); + } else { + try { + await outboxQueue.enqueueMany(messages.map((m) => m.message)); + } catch (error) { + logger.error( + "Failed to enqueue activity {activityId} to send later: {error}", + { activityId: activity.id!.href, error }, + ); + throw error; + } } } } @@ -2221,8 +2236,8 @@ export class ContextImpl implements Context { for (const { privateKey } of keys) { validateCryptoKey(privateKey, "private"); } - const opts: SendActivityInternalOptions = { context: this }; let expandedRecipients: Recipient[]; + let collectionSync: string | undefined; if (Array.isArray(recipients)) { expandedRecipients = recipients; } else if (recipients === "followers") { @@ -2240,16 +2255,21 @@ export class ContextImpl implements Context { } if (options.syncCollection) { try { - opts.collectionSync = this.getFollowersUri(identifier).href; + collectionSync = this.getFollowersUri(identifier).href; } catch (error) { - if (error instanceof RouterError) { - opts.collectionSync = undefined; - } else throw error; + if (!(error instanceof RouterError)) { + throw error; + } } } } else { expandedRecipients = [recipients]; } + const opts: SendActivityInternalOptions = { + context: this, + orderingKey: options.orderingKey, + collectionSync, + }; span.setAttribute("activitypub.inboxes", expandedRecipients.length); for (const activityTransformer of this.federation.activityTransformers) { activity = activityTransformer(activity, this); @@ -2307,12 +2327,16 @@ export class ContextImpl implements Context { activityId: activity.id?.href, activityType: getTypeId(activity).href, collectionSync: opts.collectionSync, + orderingKey: options.orderingKey, traceContext: carrier, }; if (!this.federation.manuallyStartQueue) { this.federation._startQueueInternal(this.data); } - this.federation.fanoutQueue.enqueue(message); + this.federation.fanoutQueue.enqueue( + message, + { orderingKey: options.orderingKey }, + ); } async *getFollowers(identifier: string): AsyncIterable { @@ -2971,9 +2995,10 @@ export class InboxContextImpl extends ContextImpl } interface SendActivityInternalOptions { - immediate?: boolean; - collectionSync?: string; - context: Context; + readonly immediate?: boolean; + readonly collectionSync?: string; + readonly orderingKey?: string; + readonly context: Context; } export class KvSpecDeterminer implements HttpMessageSignaturesSpecDeterminer { diff --git a/packages/fedify/src/federation/queue.ts b/packages/fedify/src/federation/queue.ts index 6d322afa..358f650c 100644 --- a/packages/fedify/src/federation/queue.ts +++ b/packages/fedify/src/federation/queue.ts @@ -29,6 +29,7 @@ export interface FanoutMessage { readonly activityId?: string; readonly activityType: string; readonly collectionSync?: string; + readonly orderingKey?: string; readonly traceContext: Readonly>; } @@ -45,6 +46,7 @@ export interface OutboxMessage { readonly started: string; readonly attempt: number; readonly headers: Readonly>; + readonly orderingKey?: string; readonly traceContext: Readonly>; } From bb138cb2086096b1acb0a4b71949b19a21e48c91 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 27 Jan 2026 17:04:59 +0900 Subject: [PATCH 2/4] Fix immediate option not being forwarded to SendActivityInternalOptions https://github.com/fedify-dev/fedify/pull/544#discussion_r2730518155 Co-Authored-By: Claude --- packages/fedify/src/federation/middleware.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 4a3e0cdb..779a6226 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -2269,6 +2269,7 @@ export class ContextImpl implements Context { context: this, orderingKey: options.orderingKey, collectionSync, + immediate: options.immediate, }; span.setAttribute("activitypub.inboxes", expandedRecipients.length); for (const activityTransformer of this.federation.activityTransformers) { From b9bce295922a97bdb106cb350dc43a06c31e477c Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 27 Jan 2026 17:07:16 +0900 Subject: [PATCH 3/4] Add error aggregation for orderingKey enqueue path Mirror the Promise.allSettled + aggregated logging approach used in the enqueueMany == null branch, so that all enqueue failures are reported with the same level of detail regardless of whether enqueueMany is used. https://github.com/fedify-dev/fedify/pull/544#discussion_r2730518188 Co-Authored-By: Claude --- packages/fedify/src/federation/middleware.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 779a6226..3da2cfef 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -1158,7 +1158,23 @@ export class FederationImpl const promises: Promise[] = messages.map((m) => outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) ); - await Promise.all(promises); + const results = await Promise.allSettled(promises); + const errors = results + .filter((r) => r.status === "rejected") + .map((r) => (r as PromiseRejectedResult).reason); + if (errors.length > 0) { + logger.error( + "Failed to enqueue activity {activityId} to send later: {errors}", + { activityId: activity.id!.href, errors }, + ); + if (errors.length > 1) { + throw new AggregateError( + errors, + `Failed to enqueue activity ${activityId} to send later.`, + ); + } + throw errors[0]; + } } else { try { await outboxQueue.enqueueMany(messages.map((m) => m.message)); From 22a5fb09e778133374e8f83dbb1515fa89d23f65 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 27 Jan 2026 17:48:04 +0900 Subject: [PATCH 4/4] Wire orderingKey through forwardActivity path Since orderingKey is now available on ForwardActivityOptions via the Omit alias, forwardActivityInternal() should also support it to maintain consistency with sendActivity(). https://github.com/fedify-dev/fedify/pull/544#discussion_r2730813625 Co-Authored-By: Claude --- packages/fedify/src/federation/middleware.ts | 53 ++++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 3da2cfef..890b240e 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -2956,8 +2956,10 @@ export class InboxContextImpl extends ContextImpl } const carrier: Record = {}; propagation.inject(context.active(), carrier); - const messages: OutboxMessage[] = []; + const orderingKey = options?.orderingKey; + const messages: { message: OutboxMessage; orderingKey?: string }[] = []; for (const inbox in inboxes) { + const inboxUrl = new URL(inbox); const message: OutboxMessage = { type: "outbox", id: crypto.randomUUID(), @@ -2971,14 +2973,20 @@ export class InboxContextImpl extends ContextImpl started: new Date().toISOString(), attempt: 0, headers: {}, + orderingKey: orderingKey == null + ? undefined + : `${orderingKey}\n${inboxUrl.origin}`, traceContext: carrier, }; - messages.push(message); + messages.push({ + message, + orderingKey: message.orderingKey, + }); } const { outboxQueue } = this.federation; if (outboxQueue.enqueueMany == null) { const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m) + outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) ); const results = await Promise.allSettled(promises); const errors: unknown[] = results @@ -2998,14 +3006,39 @@ export class InboxContextImpl extends ContextImpl throw errors[0]; } } else { - try { - await outboxQueue.enqueueMany(messages); - } catch (error) { - logger.error( - "Failed to enqueue activity {activityId} to forward later:\n{error}", - { activityId: this.activityId, error }, + // Note: enqueueMany does not support per-message orderingKey, + // so we fall back to individual enqueues when orderingKey is specified + if (orderingKey != null) { + const promises: Promise[] = messages.map((m) => + outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) ); - throw error; + const results = await Promise.allSettled(promises); + const errors = results + .filter((r) => r.status === "rejected") + .map((r) => (r as PromiseRejectedResult).reason); + if (errors.length > 0) { + logger.error( + "Failed to enqueue activity {activityId} to forward later:\n{errors}", + { activityId: this.activityId, errors }, + ); + if (errors.length > 1) { + throw new AggregateError( + errors, + `Failed to enqueue activity ${this.activityId} to forward later.`, + ); + } + throw errors[0]; + } + } else { + try { + await outboxQueue.enqueueMany(messages.map((m) => m.message)); + } catch (error) { + logger.error( + "Failed to enqueue activity {activityId} to forward later:\n{error}", + { activityId: this.activityId, error }, + ); + throw error; + } } } }