Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ To be released.
with different ordering keys can be processed in parallel. This helps
prevent race conditions when processing related activities (e.g., ensuring
a `Delete` activity is processed after a `Create` activity for the same
object). [[#538], [#540]]
object). [[#536], [#538], [#540], [#544]]

- Added `MessageQueueEnqueueOptions.orderingKey` property.
- All properties in `MessageQueueEnqueueOptions` are now `readonly`.
- `InProcessMessageQueue` now supports the `orderingKey` option.
- Added `SendActivityOptions.orderingKey` option to ensure ordered
delivery of activities for the same object. When specified, activities
with the same `orderingKey` are guaranteed to be delivered in order
to each recipient server.

[#280]: https://github.com/fedify-dev/fedify/issues/280
[#366]: https://github.com/fedify-dev/fedify/issues/366
Expand All @@ -122,8 +126,10 @@ To be released.
[#466]: https://github.com/fedify-dev/fedify/issues/466
[#499]: https://github.com/fedify-dev/fedify/issues/499
[#506]: https://github.com/fedify-dev/fedify/pull/506
[#536]: https://github.com/fedify-dev/fedify/issues/536
[#538]: https://github.com/fedify-dev/fedify/issues/538
[#540]: https://github.com/fedify-dev/fedify/pull/540
[#544]: https://github.com/fedify-dev/fedify/pull/544

### @fedify/cli

Expand Down
79 changes: 79 additions & 0 deletions docs/manual/send.md
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,85 @@ await ctx.sendActivity(
~~~~


Ensuring ordered delivery
-------------------------

*This API is available since Fedify 2.0.0.*

When sending multiple related activities for the same object
(e.g., `Create(Note)` followed by `Delete(Note)` for the same `Note`),
it's important that they arrive at recipient servers in the correct order.
Without ordering guarantees, a `Delete(Note)` activity might arrive before
the `Create(Note)` activity, causing the recipient to ignore the deletion and
leaving a “zombie post” that should have been deleted.

To ensure ordered delivery, you can use the `~SendActivityOptions.orderingKey`
option in the `~Context.sendActivity()` method:

~~~~ typescript twoslash
import type { Context } from "@fedify/fedify";
import { Create, Delete, Note, type Recipient } from "@fedify/vocab";
const ctx = null as unknown as Context<void>;
const recipients: Recipient[] = [];
const noteId = new URL("https://example.com/notes/123");
// ---cut-before---
// Create activity
await ctx.sendActivity(
{ identifier: "alice" },
recipients,
new Create({
id: new URL("#create", noteId),
actor: ctx.getActorUri("alice"),
object: new Note({ id: noteId }),
}),
{ orderingKey: noteId.href }, // [!code highlight]
);

// Delete activity - guaranteed to arrive after Create
await ctx.sendActivity(
{ identifier: "alice" },
recipients,
new Delete({
id: new URL("#delete", noteId),
actor: ctx.getActorUri("alice"),
object: noteId,
}),
{ orderingKey: noteId.href }, // [!code highlight]
);
~~~~

Activities with the same `~SendActivityOptions.orderingKey` are guaranteed to be
delivered in the order they were enqueued, per recipient server. Activities
with different `~SendActivityOptions.orderingKey` values (or no
`~SendActivityOptions.orderingKey`) can be delivered in parallel for maximum
throughput.

### When to use `~SendActivityOptions.orderingKey`

Use `~SendActivityOptions.orderingKey` when you have activities that must be
processed in a specific order:

- `Create(Note)` → `Update(Note)` → `Delete(Note)` for the same object
- `Follow(Person)` → `Undo(Follow(Person))` for the same target
- Any sequence where later activities depend on earlier ones

Don't use `~SendActivityOptions.orderingKey` for unrelated activities,
as it unnecessarily reduces parallelism.

### How it works

When you specify an `~SendActivityOptions.orderingKey`:

1. During fan-out, the key is passed as-is to the fanout queue
2. When individual delivery tasks are created, the key is transformed to
`${orderingKey}\n${recipientServerOrigin}` to ensure ordering is maintained
per-recipient-server while allowing parallel delivery to different servers

This means Server A can receive a `Delete(Note)` immediately after its
`Create(Note)` completes, without waiting for Server Z's `Create(Note)` to
finish.


Immediately sending an activity
-------------------------------

Expand Down
21 changes: 18 additions & 3 deletions packages/fedify/src/federation/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ export interface SendActivityOptions {
/**
* Whether to prefer the shared inbox for the recipients.
*/
preferSharedInbox?: boolean;
readonly preferSharedInbox?: boolean;

/**
* Whether to send the activity immediately, without enqueuing it.
Expand All @@ -755,7 +755,7 @@ export interface SendActivityOptions {
*
* @since 0.3.0
*/
immediate?: boolean;
readonly immediate?: boolean;

/**
* Determines how activities are queued when sent to multiple recipients.
Expand All @@ -773,7 +773,7 @@ export interface SendActivityOptions {
* @default `"auto"`
* @since 1.5.0
*/
fanout?: "auto" | "skip" | "force";
readonly fanout?: "auto" | "skip" | "force";

/**
* The base URIs to exclude from the recipients' inboxes. It is useful
Expand All @@ -784,6 +784,21 @@ export interface SendActivityOptions {
* @since 0.9.0
*/
readonly excludeBaseUris?: readonly URL[];

/**
* An optional key to ensure ordered delivery of activities. Activities with
* the same `orderingKey` are guaranteed to be delivered in the order they
* were enqueued, per recipient server.
*
* Typical use case: pass the object ID (e.g., `Note` ID) to ensure that
* `Create`, `Update`, and `Delete` activities for the same object are
* delivered in order.
*
* When omitted, no ordering is guaranteed (maximum parallelism).
*
* @since 2.0.0
*/
readonly orderingKey?: string;
}

/**
Expand Down
80 changes: 80 additions & 0 deletions packages/fedify/src/federation/middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,7 @@ test("ContextImpl.sendActivity()", async (t) => {
},
},
keys: queue.messages[0].type === "fanout" ? queue.messages[0].keys : [],
orderingKey: undefined,
traceContext: {},
},
]);
Expand Down Expand Up @@ -2370,6 +2371,85 @@ test("ContextImpl.sendActivity()", async (t) => {
assertNotEquals(collectionSyncHeader, null);
});

queue.clear();

await t.step('orderingKey with fanout: "force"', async () => {
const activity = new vocab.Create({
id: new URL("https://example.com/activity/ordering-1"),
actor: new URL("https://example.com/person"),
});
await ctx2.sendActivity(
{ username: "john" },
{
id: new URL("https://example.com/recipient"),
inboxId: new URL("https://example.com/inbox"),
},
activity,
{ fanout: "force", orderingKey: "https://example.com/note/1" },
);
assertEquals(queue.messages.length, 1);
const fanoutMessage = queue.messages[0];
assertEquals(fanoutMessage.type, "fanout");
if (fanoutMessage.type === "fanout") {
assertEquals(
fanoutMessage.orderingKey,
"https://example.com/note/1",
);
}
});

queue.clear();

await t.step('orderingKey with fanout: "skip"', async () => {
const activity = new vocab.Create({
id: new URL("https://example.com/activity/ordering-2"),
actor: new URL("https://example.com/person"),
});
await ctx2.sendActivity(
{ username: "john" },
{
id: new URL("https://example.com/recipient"),
inboxId: new URL("https://example.com/inbox"),
},
activity,
{ fanout: "skip", orderingKey: "https://example.com/note/2" },
);
assertEquals(queue.messages.length, 1);
const outboxMessage = queue.messages[0];
assertEquals(outboxMessage.type, "outbox");
// outbox message should have orderingKey transformed to include inbox origin
if (outboxMessage.type === "outbox") {
assertEquals(
outboxMessage.orderingKey,
"https://example.com/note/2\nhttps://example.com",
);
}
});

queue.clear();

await t.step("orderingKey not specified", async () => {
const activity = new vocab.Create({
id: new URL("https://example.com/activity/ordering-3"),
actor: new URL("https://example.com/person"),
});
await ctx2.sendActivity(
{ username: "john" },
{
id: new URL("https://example.com/recipient"),
inboxId: new URL("https://example.com/inbox"),
},
activity,
{ fanout: "force" },
);
assertEquals(queue.messages.length, 1);
const fanoutMessage2 = queue.messages[0];
assertEquals(fanoutMessage2.type, "fanout");
if (fanoutMessage2.type === "fanout") {
assertEquals(fanoutMessage2.orderingKey, undefined);
}
});

fetchMock.hardReset();
});

Expand Down
Loading
Loading