From 4732f272018111249009eadf257f9bf44324514e Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Fri, 16 Jan 2026 22:23:56 +0000 Subject: [PATCH 01/28] Created SqliteMessageQueue --- packages/sqlite/src/mod.ts | 1 + packages/sqlite/src/mq.ts | 309 +++++++++++++++++++++++++++++++++++++ 2 files changed, 310 insertions(+) create mode 100644 packages/sqlite/src/mq.ts diff --git a/packages/sqlite/src/mod.ts b/packages/sqlite/src/mod.ts index d43125f71..5794ec2b3 100644 --- a/packages/sqlite/src/mod.ts +++ b/packages/sqlite/src/mod.ts @@ -3,3 +3,4 @@ * @module */ export { SqliteKvStore } from "./kv.ts"; +export { SqliteMessageQueue } from "./mq.ts"; diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts new file mode 100644 index 000000000..1a8c0d897 --- /dev/null +++ b/packages/sqlite/src/mq.ts @@ -0,0 +1,309 @@ +import { type PlatformDatabase, SqliteDatabase } from "#sqlite"; +import type { + MessageQueue, + MessageQueueEnqueueOptions, + MessageQueueListenOptions, +} from "@fedify/fedify"; +import { getLogger } from "@logtape/logtape"; +import type { SqliteDatabaseAdapter } from "./adapter.ts"; + +const logger = getLogger(["fedify", "sqlite", "mq"]); + +/** + * Options for the SQLite message queue. + */ +export interface SqliteMessageQueueOptions { + /** + * The table name to use for the message queue. + * Only letters, digits, and underscores are allowed. + * `"fedify_message"` by default. + * @default `"fedify_message"` + */ + tableName?: string; + + /** + * Whether the table has been initialized. `false` by default. + * @default `false` + */ + initialized?: boolean; + + /** + * The poll interval for the message queue. 5 seconds by default. + */ + pollInterval?: Temporal.Duration | Temporal.DurationLike; +} + +/** + * A message queue that uses SQLite as the underlying storage. + * + * This implementation is designed for single-node deployments and uses + * polling to check for new messages. It is not suitable for high-throughput + * scenarios or distributed environments. + * + * @example + * ```ts ignore + * import { createFederation } from "@fedify/fedify"; + * import { SqliteMessageQueue } from "@fedify/sqlite"; + * import { DatabaseSync } from "node:sqlite"; + * + * const db = new DatabaseSync(":memory:"); + * const federation = createFederation({ + * // ... + * queue: new SqliteMessageQueue(db), + * }); + * ``` + */ +export class SqliteMessageQueue implements MessageQueue { + static readonly #defaultTableName = "fedify_message"; + static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/; + readonly #db: SqliteDatabaseAdapter; + readonly #tableName: string; + readonly #pollIntervalMs: number; + #initialized: boolean; + + /** + * SQLite message queue does not provide native retry mechanisms. + */ + readonly nativeRetrial = false; + + /** + * Creates a new SQLite message queue. + * @param db The SQLite database to use. Supports `node:sqlite` and `bun:sqlite`. + * @param options The options for the message queue. + */ + constructor( + readonly db: PlatformDatabase, + readonly options: SqliteMessageQueueOptions = {}, + ) { + this.#db = new SqliteDatabase(db); + this.#initialized = options.initialized ?? false; + this.#tableName = options.tableName ?? SqliteMessageQueue.#defaultTableName; + this.#pollIntervalMs = Temporal.Duration.from( + options.pollInterval ?? { seconds: 5 }, + ).total("millisecond"); + + if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { + throw new Error( + `Invalid table name for the message queue: ${this.#tableName}`, + ); + } + } + + /** + * {@inheritDoc MessageQueue.enqueue} + */ + // deno-lint-ignore require-await + async enqueue( + // deno-lint-ignore no-explicit-any + message: any, + options?: MessageQueueEnqueueOptions, + ): Promise { + this.initialize(); + + const id = crypto.randomUUID(); + const encodedMessage = this.#encodeMessage(message); + const now = Temporal.Now.instant().epochMilliseconds; + const delay = options?.delay ?? Temporal.Duration.from({ seconds: 0 }); + const scheduled = now + delay.total({ unit: "milliseconds" }); + + if (options?.delay) { + logger.debug("Enqueuing a message with a delay of {delay}...", { + delay, + message, + }); + } else { + logger.debug("Enqueuing a message...", { message }); + } + + this.#db + .prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ) + .run(id, encodedMessage, now, scheduled); + + logger.debug("Enqueued a message.", { message }); + } + + /** + * {@inheritDoc MessageQueue.enqueueMany} + */ + // deno-lint-ignore require-await + async enqueueMany( + // deno-lint-ignore no-explicit-any + messages: any[], + options?: MessageQueueEnqueueOptions, + ): Promise { + if (messages.length === 0) return; + + this.initialize(); + + const now = Temporal.Now.instant().epochMilliseconds; + const delay = options?.delay ?? Temporal.Duration.from({ seconds: 0 }); + const scheduled = now + delay.total({ unit: "milliseconds" }); + + if (options?.delay) { + logger.debug("Enqueuing messages with a delay of {delay}...", { + delay, + messages, + }); + } else { + logger.debug("Enqueuing messages...", { messages }); + } + + try { + this.#db.exec("BEGIN IMMEDIATE"); + + const stmt = this.#db.prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ); + + for (const message of messages) { + const id = crypto.randomUUID(); + const encodedMessage = this.#encodeMessage(message); + stmt.run(id, encodedMessage, now, scheduled); + } + + this.#db.exec("COMMIT"); + logger.debug("Enqueued messages.", { messages }); + } catch (error) { + this.#db.exec("ROLLBACK"); + throw error; + } + } + + /** + * {@inheritDoc MessageQueue.listen} + */ + async listen( + // deno-lint-ignore no-explicit-any + handler: (message: any) => Promise | void, + options?: MessageQueueListenOptions, + ): Promise { + this.initialize(); + + const { signal } = options ?? {}; + logger.debug( + "Starting to listen for messages on table {tableName}...", + { tableName: this.#tableName }, + ); + + while (signal == null || !signal.aborted) { + const now = Temporal.Now.instant().epochMilliseconds; + + // Get the oldest message that is ready to be processed + const result = this.#db + .prepare( + `SELECT id, message + FROM "${this.#tableName}" + WHERE scheduled <= ? + ORDER BY scheduled + LIMIT 1`, + ) + .get(now) as { id: string; message: string } | undefined; + + if (result) { + // Delete the message before processing to prevent duplicate processing + this.#db + .prepare(`DELETE FROM "${this.#tableName}" WHERE id = ?`) + .run(result.id); + + const message = this.#decodeMessage(result.message); + logger.debug("Processing message {id}...", { id: result.id, message }); + + try { + await handler(message); + logger.debug("Processed message {id}.", { id: result.id }); + } catch (error) { + logger.error( + "Failed to process message {id}: {error}", + { id: result.id, error }, + ); + throw error; + } + + // Check for next message immediately + continue; + } + + // No messages available, wait before polling again + await this.#wait(this.#pollIntervalMs, signal); + } + + logger.debug("Stopped listening for messages on table {tableName}.", { + tableName: this.#tableName, + }); + } + + /** + * Creates the message queue table if it does not already exist. + * Does nothing if the table already exists. + */ + initialize(): void { + if (this.#initialized) { + return; + } + + logger.debug("Initializing the message queue table {tableName}...", { + tableName: this.#tableName, + }); + + this.#db.exec(` + CREATE TABLE IF NOT EXISTS "${this.#tableName}" ( + id TEXT PRIMARY KEY, + message TEXT NOT NULL, + created INTEGER NOT NULL, + scheduled INTEGER NOT NULL + ) + `); + + this.#db.exec(` + CREATE INDEX IF NOT EXISTS "idx_${this.#tableName}_scheduled" + ON "${this.#tableName}" (scheduled) + `); + + this.#initialized = true; + logger.debug("Initialized the message queue table {tableName}.", { + tableName: this.#tableName, + }); + } + + /** + * Drops the table used by the message queue. Does nothing if the table + * does not exist. + */ + drop(): void { + this.#db.exec(`DROP TABLE IF EXISTS "${this.#tableName}"`); + this.#initialized = false; + } + + #encodeMessage(message: unknown): string { + return JSON.stringify(message); + } + + #decodeMessage(message: string): unknown { + return JSON.parse(message); + } + + #wait(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve) => { + if (signal?.aborted) { + resolve(); + return; + } + + const timer = setTimeout(() => { + signal?.removeEventListener("abort", onAbort); + resolve(); + }, ms); + + const onAbort = () => { + clearTimeout(timer); + resolve(); + }; + + signal?.addEventListener("abort", onAbort, { once: true }); + }); + } +} From 88e84b1d5b18c4affb32c2c76ac5d37ad5c66cac Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Fri, 16 Jan 2026 22:25:40 +0000 Subject: [PATCH 02/28] Created testMessageQueue --- packages/testing/deno.json | 4 +- packages/testing/package.json | 7 +- packages/testing/src/mod.ts | 5 + packages/testing/src/mq-tester.ts | 189 ++++++++++++++++++++++++++++++ packages/testing/tsdown.config.ts | 1 + 5 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 packages/testing/src/mq-tester.ts diff --git a/packages/testing/deno.json b/packages/testing/deno.json index dc5fcb7e7..b618dcd1d 100644 --- a/packages/testing/deno.json +++ b/packages/testing/deno.json @@ -12,7 +12,9 @@ "pnpm-lock.yaml" ], "publish": { - "exclude": ["**/*.test.ts"] + "exclude": [ + "**/*.test.ts" + ] }, "tasks": { "build": "pnpm build", diff --git a/packages/testing/package.json b/packages/testing/package.json index be59f806f..8813e4181 100644 --- a/packages/testing/package.json +++ b/packages/testing/package.json @@ -50,9 +50,14 @@ "package.json" ], "peerDependencies": { - "@fedify/fedify": "workspace:^" + "@fedify/fedify": "workspace:^", + "@fedify/fixture": "workspace:^" + }, + "dependencies": { + "es-toolkit": "catalog:" }, "devDependencies": { + "@fedify/fixture": "workspace:^", "@js-temporal/polyfill": "catalog:", "@std/assert": "catalog:", "@std/async": "catalog:", diff --git a/packages/testing/src/mod.ts b/packages/testing/src/mod.ts index eeef79f3c..dcd9a917f 100644 --- a/packages/testing/src/mod.ts +++ b/packages/testing/src/mod.ts @@ -29,3 +29,8 @@ export { createInboxContext, createRequestContext, } from "./mock.ts"; +export { + default as testMessageQueue, + getRandomKey, + waitFor, +} from "./mq-tester.ts"; diff --git a/packages/testing/src/mq-tester.ts b/packages/testing/src/mq-tester.ts new file mode 100644 index 000000000..d2eb1a9ad --- /dev/null +++ b/packages/testing/src/mq-tester.ts @@ -0,0 +1,189 @@ +import type { MessageQueue } from "@fedify/fedify"; +import { test as defaultTest } from "@fedify/fixture"; +import * as temporal from "@js-temporal/polyfill"; +import { delay } from "es-toolkit"; +import { deepStrictEqual, ok } from "node:assert/strict"; + +let Temporal: typeof temporal.Temporal; +if ("Temporal" in globalThis) { + Temporal = globalThis.Temporal; +} else { + Temporal = temporal.Temporal; +} + +/** + * Options for the {@link testMessageQueue} function. + */ +export interface TestMessageQueueOptions { + /** + * A custom test function to use instead of the default one. + * It should be compatible with `node:test`'s `test` function. + */ + test?: (name: string, fn: () => Promise | void) => void; + + /** + * An optional initialization function to call before the tests run. + * This is useful for setting up database tables or other resources. + * @param mq The message queue instance to initialize. + */ + initialize?: (mq: MessageQueue) => Promise | void; +} + +/** + * Tests a {@link MessageQueue} implementation with a standard set of tests. + * + * This function runs tests for: + * - `enqueue()`: Basic message enqueueing + * - `enqueue()` with delay: Delayed message enqueueing + * - `enqueueMany()`: Bulk message enqueueing + * - `enqueueMany()` with delay: Delayed bulk message enqueueing + * - Multiple listeners: Ensures messages are processed by only one listener + * + * @example + * ```typescript ignore + * import { testMessageQueue } from "@fedify/testing"; + * import { MyMessageQueue } from "./my-mq.ts"; + * + * testMessageQueue( + * "MyMessageQueue", + * () => new MyMessageQueue(), + * async ({ mq1, mq2, controller }) => { + * controller.abort(); + * await mq1.close(); + * await mq2.close(); + * }, + * ); + * ``` + * + * @param name The name of the test suite. + * @param getMessageQueue A factory function that creates a new message queue + * instance. It should return a new instance each time + * to ensure test isolation, but both instances should + * share the same underlying storage/channel. + * @param onFinally A cleanup function called after all tests complete. + * It receives both message queue instances and the abort + * controller used for the listeners. + * @param options Additional options for the test. + */ +export default function testMessageQueue< + MQ extends MessageQueue, +>( + name: string, + getMessageQueue: () => MQ | Promise, + onFinally: ({ + mq1, + mq2, + controller, + }: { + mq1: MQ; + mq2: MQ; + controller: AbortController; + }) => Promise | void, + options: TestMessageQueueOptions = {}, +): void { + const test = options?.test ?? defaultTest; + test(name, async () => { + const mq1 = await getMessageQueue(); + const mq2 = await getMessageQueue(); + const controller = new AbortController(); + try { + // Initialize if needed (e.g., create database tables) + if (options.initialize != null) { + await options.initialize(mq1); + } + + // Set up message collection and listeners + const messages: string[] = []; + const listening1 = mq1.listen((message: string) => { + messages.push(message); + }, controller); + const listening2 = mq2.listen((message: string) => { + messages.push(message); + }, controller); + + // Test: enqueue() + await mq1.enqueue("Hello, world!"); + await waitFor(() => messages.length > 0, 15_000); + deepStrictEqual(messages, ["Hello, world!"]); + + let started = Date.now(); + await mq1.enqueue( + "Delayed message", + { delay: Temporal.Duration.from({ seconds: 3 }) }, + ); + await waitFor(() => messages.length > 1, 15_000); + deepStrictEqual(messages, ["Hello, world!", "Delayed message"]); + ok( + Date.now() - started >= 3_000, + "Delayed message should be delivered after at least 3 seconds", + ); + + // Test: enqueueMany() (skip if not supported) + if (mq1.enqueueMany != null) { + while (messages.length > 0) messages.pop(); + const batchMessages: string[] = [ + "First batch message", + "Second batch message", + "Third batch message", + ]; + await mq1.enqueueMany(batchMessages); + await waitFor(() => messages.length >= batchMessages.length, 15_000); + deepStrictEqual(new Set(messages), new Set(batchMessages)); + + // Test: enqueueMany() with delay + while (messages.length > 0) messages.pop(); + started = Date.now(); + const delayedBatchMessages: string[] = [ + "Delayed batch 1", + "Delayed batch 2", + ]; + await mq1.enqueueMany( + delayedBatchMessages, + { delay: Temporal.Duration.from({ seconds: 2 }) }, + ); + await waitFor( + () => messages.length >= delayedBatchMessages.length, + 15_000, + ); + deepStrictEqual(new Set(messages), new Set(delayedBatchMessages)); + ok( + Date.now() - started >= 2_000, + "Delayed batch messages should be delivered after at least 2 seconds", + ); + } + + // Test: bulk enqueue (stress test) + while (messages.length > 0) messages.pop(); + const bulkCount = 100; + for (let i = 0; i < bulkCount; i++) await mq1.enqueue(`message-${i}`); + await waitFor(() => messages.length >= bulkCount, 30_000); + const expectedMessages = new Set( + Array.from({ length: bulkCount }, (_, i) => `message-${i}`), + ); + deepStrictEqual(new Set(messages), expectedMessages); + + // Cleanup listeners + controller.abort(); + await listening1; + await listening2; + } finally { + await onFinally({ mq1, mq2, controller }); + } + }); +} + +export async function waitFor( + predicate: () => boolean, + timeoutMs: number, +): Promise { + const started = Date.now(); + while (!predicate()) { + await delay(500); + if (Date.now() - started > timeoutMs) { + throw new Error("Timeout"); + } + } +} + +export const getRandomKey = (prefix: string): string => + `fedify_test_${prefix}_${crypto.randomUUID()}`; diff --git a/packages/testing/tsdown.config.ts b/packages/testing/tsdown.config.ts index 516eb89d6..077863931 100644 --- a/packages/testing/tsdown.config.ts +++ b/packages/testing/tsdown.config.ts @@ -12,5 +12,6 @@ export default defineConfig({ "@fedify/fedify/utils", "@fedify/vocab", "@fedify/fedify/webfinger", + "@fedify/fixture", ], }); From e2ebd1ca038dfa9bdf82dc362a57730a83ae7762 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Fri, 16 Jan 2026 22:26:32 +0000 Subject: [PATCH 03/28] Refactor message queue testso with testMessageQueue --- packages/amqp/package.json | 1 + packages/amqp/src/mq.test.ts | 128 +++++++----------------------- packages/denokv/src/mod.test.ts | 76 ++++++------------ packages/postgres/deno.json | 4 +- packages/postgres/package.json | 2 + packages/postgres/src/kv.test.ts | 2 +- packages/postgres/src/mq.test.ts | 132 ++++++------------------------- packages/postgres/src/mq.ts | 2 +- packages/redis/deno.json | 4 +- packages/redis/package.json | 1 + packages/redis/src/mq.test.ts | 122 +++++----------------------- packages/sqlite/deno.json | 2 +- packages/sqlite/package.json | 1 + packages/sqlite/src/mq.test.ts | 28 +++++++ pnpm-lock.yaml | 21 +++++ 15 files changed, 155 insertions(+), 371 deletions(-) create mode 100644 packages/sqlite/src/mq.test.ts diff --git a/packages/amqp/package.json b/packages/amqp/package.json index f12a2f2ee..41dbd00d9 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -59,6 +59,7 @@ }, "devDependencies": { "@alinea/suite": "^0.6.3", + "@fedify/testing": "workspace:^", "@js-temporal/polyfill": "catalog:", "@std/assert": "catalog:", "@std/async": "catalog:", diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index bfc24987e..547021d03 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -1,19 +1,12 @@ import { suite } from "@alinea/suite"; import { AmqpMessageQueue } from "@fedify/amqp/mq"; -import * as temporal from "@js-temporal/polyfill"; +import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert"; import { delay } from "@std/async/delay"; // @deno-types="npm:@types/amqplib" import { type ChannelModel, connect } from "amqplib"; import process from "node:process"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - const AMQP_URL = process.env.AMQP_URL; const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip; @@ -21,86 +14,32 @@ function getConnection(): Promise { return connect(AMQP_URL!); } -test("AmqpMessageQueue", { - sanitizeOps: false, - sanitizeExit: false, - sanitizeResources: false, -}, async () => { - const conn = await getConnection(); - const conn2 = await getConnection(); - const randomSuffix = Math.random().toString(36).substring(2); - const queue = `fedify_queue_${randomSuffix}`; - const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`; - const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); - const mq2 = new AmqpMessageQueue(conn2, { queue, delayedQueuePrefix }); - - const messages1: string[] = []; - const messages2: string[] = []; - const allMessages: string[] = []; - const controller = new AbortController(); - const listening = mq.listen((message: string) => { - messages1.push(message); - allMessages.push(message); - }, { signal: controller.signal }); - const listening2 = mq2.listen((message: string) => { - messages2.push(message); - allMessages.push(message); - }, { signal: controller.signal }); - - await mq.enqueue("Hello, world!"); - - await waitFor(() => allMessages.length > 0, 15_000); - - assertEquals(allMessages.includes("Hello, world!"), true); - - // enqueue() with delay - const started = Date.now(); - await mq.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => allMessages.includes("Delayed message"), 15_000); - - // listen() with delay - assertEquals(allMessages.includes("Delayed message"), true); - assertGreater(Date.now() - started, 3_000); - - await mq.enqueueMany(["Message 1", "Message 2", "Message 3"]); - - await waitFor(() => - allMessages.includes("Message 1") && - allMessages.includes("Message 2") && - allMessages.includes("Message 3"), 15_000); - - // listen() after enqueueMany() - assertEquals(allMessages.includes("Message 1"), true); - assertEquals(allMessages.includes("Message 2"), true); - assertEquals(allMessages.includes("Message 3"), true); - - // enqueueMany() with delay - const manyStarted = Date.now(); - await mq.enqueueMany( - ["Delayed batch 1", "Delayed batch 2"], - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => - allMessages.includes("Delayed batch 1") && - allMessages.includes("Delayed batch 2"), 15_000); - - // listen() after enqueueMany() with delay - assertEquals(allMessages.includes("Delayed batch 1"), true); - assertEquals(allMessages.includes("Delayed batch 2"), true); - assertGreater(Date.now() - manyStarted, 3_000); - - controller.abort(); - await listening; - await listening2; - - await conn.close(); - await conn2.close(); -}); +const connections: ChannelModel[] = []; +const queue = getRandomKey("queue"); +const delayedQueuePrefix = getRandomKey("delayed") + "_"; + +testMessageQueue( + "AmqpMessageQueue", + async () => { + const conn = await getConnection(); + connections.push(conn); + return new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); + }, + async ({ controller }) => { + controller.abort(); + for (const conn of connections) { + await conn.close(); + } + }, + { + test: (name, fn) => + test(name, { + sanitizeOps: false, + sanitizeExit: false, + sanitizeResources: false, + }, fn), + }, +); test( "AmqpMessageQueue [nativeRetrial: false]", @@ -171,16 +110,3 @@ test( assertGreater(i, 1); }, ); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); - } - } -} diff --git a/packages/denokv/src/mod.test.ts b/packages/denokv/src/mod.test.ts index 6de322a86..0019ae9ac 100644 --- a/packages/denokv/src/mod.test.ts +++ b/packages/denokv/src/mod.test.ts @@ -1,5 +1,5 @@ -import { assertEquals, assertGreater } from "@std/assert"; -import { delay } from "es-toolkit"; +import { testMessageQueue } from "@fedify/testing"; +import { assertEquals } from "@std/assert"; import { DenoKvMessageQueue, DenoKvStore } from "./mod.ts"; Deno.test("DenoKvStore", async (t) => { @@ -133,56 +133,24 @@ Deno.test("DenoKvStore", async (t) => { kv.close(); }); -Deno.test("DenoKvMessageQueue", async (t) => { - const kv = await Deno.openKv(":memory:"); - const mq = new DenoKvMessageQueue(kv); - - const messages: string[] = []; - const controller = new AbortController(); - const listening = mq.listen((message: string) => { - messages.push(message); - }, { signal: controller.signal }); - - await t.step("enqueue()", async () => { - await mq.enqueue("Hello, world!"); - }); - - await waitFor(() => messages.length > 0, 15_000); - - await t.step("listen()", () => { - assertEquals(messages, ["Hello, world!"]); - }); - - let started = 0; - await t.step("enqueue() with delay", async () => { - started = Date.now(); - await mq.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - }); - - await waitFor(() => messages.length > 1, 15_000); - - await t.step("listen() with delay", () => { - assertEquals(messages, ["Hello, world!", "Delayed message"]); - assertGreater(Date.now() - started, 3_000); - }); - - controller.abort(); - await listening; - mq[Symbol.dispose](); -}); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); +const kvs: Deno.Kv[] = []; + +testMessageQueue( + "DenoKvMessageQueue", + async () => { + const kv = await Deno.openKv(":memory:"); + kvs.push(kv); + return new DenoKvMessageQueue(kv); + }, + ({ controller }) => { + controller.abort(); + for (const kv of kvs) { + try { + kv.close(); + } catch { + // Ignore errors on close + } } - } -} + }, + { test: Deno.test }, +); diff --git a/packages/postgres/deno.json b/packages/postgres/deno.json index 08cc9d35f..3b1c3199e 100644 --- a/packages/postgres/deno.json +++ b/packages/postgres/deno.json @@ -12,7 +12,9 @@ "node_modules" ], "publish": { - "exclude": ["**/*.test.ts"] + "exclude": [ + "**/*.test.ts" + ] }, "tasks": { "check": "deno fmt --check && deno lint && deno check src/*.ts", diff --git a/packages/postgres/package.json b/packages/postgres/package.json index 84487f481..e362caa79 100644 --- a/packages/postgres/package.json +++ b/packages/postgres/package.json @@ -77,6 +77,8 @@ "postgres": "catalog:" }, "devDependencies": { + "@fedify/fixture": "workspace:^", + "@fedify/testing": "workspace:^", "@std/async": "catalog:", "tsdown": "catalog:", "typescript": "catalog:" diff --git a/packages/postgres/src/kv.test.ts b/packages/postgres/src/kv.test.ts index 0047c252f..52a305153 100644 --- a/packages/postgres/src/kv.test.ts +++ b/packages/postgres/src/kv.test.ts @@ -13,7 +13,7 @@ if ("Temporal" in globalThis) { Temporal = temporal.Temporal; } -const dbUrl = process.env.DATABASE_URL; +const dbUrl = process.env.POSTGRES_URL; function getStore(): { // deno-lint-ignore no-explicit-any diff --git a/packages/postgres/src/mq.test.ts b/packages/postgres/src/mq.test.ts index 715b1b983..e900c3695 100644 --- a/packages/postgres/src/mq.test.ts +++ b/packages/postgres/src/mq.test.ts @@ -1,116 +1,34 @@ +import { test } from "@fedify/fixture"; import { PostgresMessageQueue } from "@fedify/postgres/mq"; -import * as temporal from "@js-temporal/polyfill"; -import { delay } from "@std/async/delay"; -import assert from "node:assert/strict"; +import { getRandomKey, testMessageQueue } from "@fedify/testing"; import process from "node:process"; -import { test } from "node:test"; import postgres from "postgres"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - -const dbUrl = process.env.DATABASE_URL; +const dbUrl = process.env.POSTGRES_URL; +const sqls: postgres.Sql[] = []; -test("PostgresMessageQueue", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option +function createSql() { const sql = postgres(dbUrl!); - const sql2 = postgres(dbUrl!); - const tableName = `fedify_message_test_${ - Math.random().toString(36).slice(5) - }`; - const channelName = `fedify_channel_test_${ - Math.random().toString(36).slice(5) - }`; - const mq = new PostgresMessageQueue(sql, { tableName, channelName }); - const mq2 = new PostgresMessageQueue(sql2, { tableName, channelName }); - - try { - const messages: string[] = []; - const controller = new AbortController(); - await mq.initialize(); - const listening = mq.listen((message: string) => { - messages.push(message); - }, { signal: controller.signal }); - const listening2 = mq2.listen((message: string) => { - messages.push(message); - }, { signal: controller.signal }); - - // enqueue() - await mq.enqueue("Hello, world!"); - - await waitFor(() => messages.length > 0, 15_000); - - // listen() - assert.deepStrictEqual(messages, ["Hello, world!"]); - - // enqueue() with delay - let started = 0; - started = Date.now(); - await mq.enqueue( - { msg: "Delayed message" }, - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => messages.length > 1, 15_000); - - // listen() with delay - assert.deepStrictEqual(messages, ["Hello, world!", { - msg: "Delayed message", - }]); - assert.ok(Date.now() - started > 3_000); - - // enqueueMany() - while (messages.length > 0) messages.pop(); - const batchMessages = [ - "First batch message", - { text: "Second batch message" }, - { text: "Third batch message", priority: "high" }, - ]; - await mq.enqueueMany(batchMessages); - await waitFor(() => messages.length === batchMessages.length, 15_000); - assert.deepStrictEqual(messages, batchMessages); - - // enqueueMany() with delay - while (messages.length > 0) messages.pop(); - started = Date.now(); - const delayedBatchMessages = [ - "Delayed batch 1", - "Delayed batch 2", - ]; - await mq.enqueueMany( - delayedBatchMessages, - { delay: Temporal.Duration.from({ seconds: 2 }) }, - ); - await waitFor( - () => messages.length === delayedBatchMessages.length, - 15_000, - ); - assert.deepStrictEqual(messages, delayedBatchMessages); - assert.ok(Date.now() - started > 2_000); + sqls.push(sql); + return sql; +} +testMessageQueue( + "PostgresMessageQueue", + () => + new PostgresMessageQueue(createSql(), { + tableName: getRandomKey("message"), + channelName: getRandomKey("channel"), + }), + async ({ mq1, mq2, controller }) => { controller.abort(); - await listening; - await listening2; - } finally { - await mq.drop(); - await sql.end(); - await sql2.end(); - } -}); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); + await mq1.drop(); + await mq2.drop(); + for (const sql of sqls) { + await sql.end(); } - } -} + }, + { test: (name, fn) => test(name, { ignore: dbUrl == null }, fn) }, +); + +// cspell: ignore sqls diff --git a/packages/postgres/src/mq.ts b/packages/postgres/src/mq.ts index 53ca0b044..67d7436bf 100644 --- a/packages/postgres/src/mq.ts +++ b/packages/postgres/src/mq.ts @@ -258,4 +258,4 @@ export class PostgresMessageQueue implements MessageQueue { } } -// cSpell: ignore typname +// cSpell: ignore typname unlisten diff --git a/packages/redis/deno.json b/packages/redis/deno.json index 9063255ce..62597bce9 100644 --- a/packages/redis/deno.json +++ b/packages/redis/deno.json @@ -13,7 +13,9 @@ "node_modules" ], "publish": { - "exclude": ["**/*.test.ts"] + "exclude": [ + "**/*.test.ts" + ] }, "tasks": { "check": "deno fmt --check && deno lint && deno check src/*.ts", diff --git a/packages/redis/package.json b/packages/redis/package.json index f68071ec9..c8d047790 100644 --- a/packages/redis/package.json +++ b/packages/redis/package.json @@ -87,6 +87,7 @@ "devDependencies": { "@std/async": "catalog:", "@fedify/fixture": "workspace:^", + "@fedify/testing": "workspace:^", "@types/node": "catalog:", "tsdown": "catalog:", "typescript": "catalog:" diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index cb37fa391..03f2c20bf 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -1,111 +1,25 @@ import { test } from "@fedify/fixture"; import { RedisMessageQueue } from "@fedify/redis/mq"; -import * as temporal from "@js-temporal/polyfill"; -import { delay } from "@std/async/delay"; -import { Redis } from "ioredis"; -import assert from "node:assert/strict"; +import { getRandomKey, testMessageQueue } from "@fedify/testing"; import process from "node:process"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - -const redisUrl = process.env.REDIS_URL; - -test("RedisMessageQueue", { ignore: redisUrl == null }, async () => { - const channelKey = `fedify_test_channel_${crypto.randomUUID()}`; - const queueKey = `fedify_test_queue_${crypto.randomUUID()}`; - const lockKey = `fedify_test_lock_${crypto.randomUUID()}`; - const mq = new RedisMessageQueue(() => new Redis(redisUrl!), { - pollInterval: { seconds: 1 }, - channelKey, - queueKey, - lockKey, - }); - const mq2 = new RedisMessageQueue(() => new Redis(redisUrl!), { - pollInterval: { seconds: 1 }, - channelKey, - queueKey, - lockKey, - }); - - const messages: (string | number)[] = []; - const controller = new AbortController(); - const listening = mq.listen((message: string | number) => { - messages.push(message); - }, controller); - const listening2 = mq2.listen((message: string | number) => { - messages.push(message); - }, controller); - - try { - // enqueue() - await mq.enqueue("Hello, world!"); - - await waitFor(() => messages.length > 0, 15_000); - - // listen() - assert.deepStrictEqual(messages, ["Hello, world!"]); - - // enqueue() with delay - let started = 0; - started = Date.now(); - await mq.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => messages.length > 1, 15_000); - - // listen() with delay - assert.deepStrictEqual(messages, ["Hello, world!", "Delayed message"]); - assert.ok(Date.now() - started > 3_000); - - // enqueue() [bulk] - for (let i = 0; i < 1_000; i++) await mq.enqueue(i); - - await waitFor(() => messages.length > 1_001, 30_000); - - // listen() [bulk] - const numbers: Set = new Set(); - for (let i = 0; i < 1_000; i++) numbers.add(i); - assert.deepStrictEqual(new Set(messages.slice(2)), numbers); - - // Reset messages array for the next test: - while (messages.length > 0) messages.pop(); - - // enqueueMany() - const bulkMessages = Array.from({ length: 500 }, (_, i) => `bulk-${i}`); - await mq.enqueueMany(bulkMessages); - - await waitFor(() => messages.length >= 500, 30_000); +import { Redis } from "ioredis"; - // listen() after enqueueMany() - const expectedMessages = new Set( - Array.from({ length: 500 }, (_, i) => `bulk-${i}`), - ); - assert.deepStrictEqual(new Set(messages), expectedMessages); - } finally { +const dbUrl = process.env.REDIS_URL; + +testMessageQueue( + "RedisMessageQueue", + () => + new RedisMessageQueue(() => new Redis(dbUrl!), { + pollInterval: { seconds: 1 }, + channelKey: getRandomKey("channel"), + queueKey: getRandomKey("queue"), + lockKey: getRandomKey("lock"), + }), + ({ mq1, mq2, controller }) => { controller.abort(); - await listening; - await listening2; - mq[Symbol.dispose](); + mq1[Symbol.dispose](); mq2[Symbol.dispose](); - } -}); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); - } - } -} + }, + { test: (name, fn) => test(name, { ignore: dbUrl == null }, fn) }, +); diff --git a/packages/sqlite/deno.json b/packages/sqlite/deno.json index 83f9a6f3f..d114f3477 100644 --- a/packages/sqlite/deno.json +++ b/packages/sqlite/deno.json @@ -22,6 +22,6 @@ }, "tasks": { "check": "deno fmt --check && deno lint && deno check", - "test": "deno test --allow-net --allow-env --doc --no-check=leaks" + "test": "deno test --allow-net --allow-env --allow-read --allow-write --doc --no-check=leaks" } } diff --git a/packages/sqlite/package.json b/packages/sqlite/package.json index dc6660961..5d2f07f0e 100644 --- a/packages/sqlite/package.json +++ b/packages/sqlite/package.json @@ -65,6 +65,7 @@ "@fedify/fedify": "workspace:^" }, "devDependencies": { + "@fedify/testing": "workspace:^", "@std/async": "catalog:", "tsdown": "catalog:", "typescript": "catalog:" diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts new file mode 100644 index 000000000..6c2a79b6b --- /dev/null +++ b/packages/sqlite/src/mq.test.ts @@ -0,0 +1,28 @@ +import { PlatformDatabase } from "#sqlite"; +import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; +import { SqliteMessageQueue } from "./mq.ts"; + +let Temporal: typeof temporal.Temporal; +if ("Temporal" in globalThis) { + Temporal = globalThis.Temporal; +} else { + Temporal = temporal.Temporal; +} + +const dbPath = `/tmp/${getRandomKey("sqlite")}.db`; +const db = new PlatformDatabase(dbPath); +const tableName = getRandomKey("message").replaceAll("-", "_"); + +testMessageQueue( + "SqliteMessageQueue", + () => + new SqliteMessageQueue(db, { + tableName, + pollInterval: Temporal.Duration.from({ milliseconds: 500 }), + }), + ({ controller }) => { + controller.abort(); + db.close(); + }, +); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b3660c1c9..1c6abff78 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -668,6 +668,9 @@ importers: '@alinea/suite': specifier: ^0.6.3 version: 0.6.3 + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@js-temporal/polyfill': specifier: 'catalog:' version: 0.5.1 @@ -1150,6 +1153,12 @@ importers: specifier: 'catalog:' version: 3.4.7 devDependencies: + '@fedify/fixture': + specifier: workspace:^ + version: link:../fixture + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@std/async': specifier: 'catalog:' version: '@jsr/std__async@1.0.13' @@ -1178,6 +1187,9 @@ importers: '@fedify/fixture': specifier: workspace:^ version: link:../fixture + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@std/async': specifier: 'catalog:' version: '@jsr/std__async@1.0.13' @@ -1234,6 +1246,9 @@ importers: specifier: ^1.31.0 version: 1.39.5 devDependencies: + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@std/async': specifier: 'catalog:' version: '@jsr/std__async@1.0.13' @@ -1265,7 +1280,13 @@ importers: '@fedify/fedify': specifier: workspace:^ version: link:../fedify + es-toolkit: + specifier: 'catalog:' + version: 1.43.0 devDependencies: + '@fedify/fixture': + specifier: workspace:^ + version: link:../fixture '@js-temporal/polyfill': specifier: 'catalog:' version: 0.5.1 From 5b135e9dc074bc2b000817950ed5691d75b1272a Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Fri, 16 Jan 2026 22:56:00 +0000 Subject: [PATCH 04/28] Fix types --- packages/sqlite/src/mq.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 1a8c0d897..1421f1bf2 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -92,8 +92,7 @@ export class SqliteMessageQueue implements MessageQueue { /** * {@inheritDoc MessageQueue.enqueue} */ - // deno-lint-ignore require-await - async enqueue( + enqueue( // deno-lint-ignore no-explicit-any message: any, options?: MessageQueueEnqueueOptions, @@ -123,18 +122,18 @@ export class SqliteMessageQueue implements MessageQueue { .run(id, encodedMessage, now, scheduled); logger.debug("Enqueued a message.", { message }); + return Promise.resolve(); } /** * {@inheritDoc MessageQueue.enqueueMany} */ - // deno-lint-ignore require-await - async enqueueMany( + enqueueMany( // deno-lint-ignore no-explicit-any - messages: any[], + messages: readonly any[], options?: MessageQueueEnqueueOptions, ): Promise { - if (messages.length === 0) return; + if (messages.length === 0) return Promise.resolve(); this.initialize(); @@ -171,6 +170,7 @@ export class SqliteMessageQueue implements MessageQueue { this.#db.exec("ROLLBACK"); throw error; } + return Promise.resolve(); } /** From 003bf0169f2a5587403f5a8a3794d8517d2536ae Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Fri, 16 Jan 2026 23:21:15 +0000 Subject: [PATCH 05/28] Add docs --- CHANGES.md | 26 ++++++++++++++++++ docs/manual/mq.md | 68 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index f1d8505ec..7744db8fb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -275,6 +275,32 @@ To be released. [#437]: https://github.com/fedify-dev/fedify/issues/437 +### @fedify/sqlite + + - Added `SqliteMessageQueue` class implementing `MessageQueue` interface + using SQLite as the backing store. This implementation uses polling to + check for new messages and is suitable for single-node deployments and + development environments. [[#477], [#526] by ChanHaeng Lee] + + - Added `SqliteMessageQueue` class. + - Added `SqliteMessageQueueOptions` interface. + +[#477]: https://github.com/fedify-dev/fedify/issues/477 +[#526]: https://github.com/fedify-dev/fedify/pull/526 + +### @fedify/testing + + - Added `testMessageQueue()` utility function for standardized testing of + `MessageQueue` implementations. This function provides a reusable test + harness that covers common message queue operations including `enqueue()`, + `enqueue()` with delay, `enqueueMany()`, and multiple listener scenarios. + [[#477], [#526] by ChanHaeng Lee] + + - Added `testMessageQueue()` function. + - Added `TestMessageQueueOptions` interface. + - Added `waitFor()` helper function. + - Added `getRandomKey()` helper function. + Version 1.10.1 -------------- diff --git a/docs/manual/mq.md b/docs/manual/mq.md index ed9c3af31..896b64db7 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -293,6 +293,71 @@ const federation = createFederation({ [`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue [RabbitMQ]: https://www.rabbitmq.com/ +### `SqliteMessageQueue` + +*This API is available since Fedify 2.0.0.* + +To use [`SqliteMessageQueue`], you need to install the *@fedify/sqlite* package +first: + +::: code-group + +~~~~ bash [Deno] +deno add jsr:@fedify/sqlite +~~~~ + +~~~~ bash [npm] +npm add @fedify/sqlite +~~~~ + +~~~~ bash [pnpm] +pnpm add @fedify/sqlite +~~~~ + +~~~~ bash [Yarn] +yarn add @fedify/sqlite +~~~~ + +~~~~ bash [Bun] +bun add @fedify/sqlite +~~~~ + +::: + +[`SqliteMessageQueue`] is a message queue implementation that uses SQLite as +the backend. It uses polling to check for new messages and is designed for +single-node deployments. It's suitable for development, testing, and +small-scale production use where simplicity is preferred over high throughput. + +Best for +: Development, testing, and single-node deployments. + +Pros +: Simple setup, persistent, no external dependencies beyond SQLite. + +Cons +: Not suitable for distributed environments or high-throughput scenarios, + uses polling instead of push-based messaging. + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { createFederation } from "@fedify/fedify"; +import { SqliteMessageQueue } from "@fedify/sqlite"; +import Database from "better-sqlite3"; + +const db = new Database("federation.db"); +const federation = createFederation({ +// ---cut-start--- + kv: null as unknown as KvStore, +// ---cut-end--- + queue: new SqliteMessageQueue(db), // [!code highlight] + // ... other options +}); +~~~~ + +[`SqliteMessageQueue`]: https://jsr.io/@fedify/sqlite/doc/mq/~/SqliteMessageQueue + ### `WorkersMessageQueue` (Cloudflare Workers only) *This API is available since Fedify 1.6.0.* @@ -659,6 +724,9 @@ The following implementations do not yet support native retry: [`AmqpMessageQueue`] : Native retry support planned for future release. +[`SqliteMessageQueue`] +: No native retry support (`~MessageQueue.nativeRetrial` is `false`). + `ParallelMessageQueue` inherits the `~MessageQueue.nativeRetrial` value from the wrapped queue. From b1f8387e3ee3e2f437f307e2f4e5659df613a4c6 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Fri, 16 Jan 2026 23:46:54 +0000 Subject: [PATCH 06/28] Refactor listen() with DELETE ... RETURNING --- packages/sqlite/src/mq.ts | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 1421f1bf2..05d7b99b9 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -192,23 +192,22 @@ export class SqliteMessageQueue implements MessageQueue { while (signal == null || !signal.aborted) { const now = Temporal.Now.instant().epochMilliseconds; - // Get the oldest message that is ready to be processed + // Atomically fetch and delete the oldest message that is ready to be + // processed using DELETE ... RETURNING (SQLite >= 3.35.0) const result = this.#db .prepare( - `SELECT id, message - FROM "${this.#tableName}" - WHERE scheduled <= ? - ORDER BY scheduled - LIMIT 1`, + `DELETE FROM "${this.#tableName}" + WHERE id = ( + SELECT id FROM "${this.#tableName}" + WHERE scheduled <= ? + ORDER BY scheduled + LIMIT 1 + ) + RETURNING id, message`, ) .get(now) as { id: string; message: string } | undefined; if (result) { - // Delete the message before processing to prevent duplicate processing - this.#db - .prepare(`DELETE FROM "${this.#tableName}" WHERE id = ?`) - .run(result.id); - const message = this.#decodeMessage(result.message); logger.debug("Processing message {id}...", { id: result.id, message }); From dc2a37c9cd06a0542d434dc6cec78b97754f31cf Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sat, 17 Jan 2026 00:29:39 +0000 Subject: [PATCH 07/28] Fix docs, temporal and export --- docs/manual/mq.md | 61 +++++++++++++++++++++++++++++------- packages/sqlite/package.json | 10 ++++++ packages/sqlite/src/mq.ts | 8 +++++ 3 files changed, 68 insertions(+), 11 deletions(-) diff --git a/docs/manual/mq.md b/docs/manual/mq.md index 896b64db7..1dfcf4bff 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -328,34 +328,73 @@ bun add @fedify/sqlite the backend. It uses polling to check for new messages and is designed for single-node deployments. It's suitable for development, testing, and small-scale production use where simplicity is preferred over high throughput. +It uses native sqlite modules, [`node:sqlite`] for Node.js and Deno, +[`bun:sqlite`] for Bun. Best for -: Development, testing, and single-node deployments. +: Development and testing. Pros -: Simple setup, persistent, no external dependencies beyond SQLite. +: Simple, persistent with minimal configuration. Cons -: Not suitable for distributed environments or high-throughput scenarios, - uses polling instead of push-based messaging. +: Limited scalability, not suitable for high-traffic production. -~~~~ typescript twoslash +::: code-group + +~~~~ typescript twoslash [Deno] import type { KvStore } from "@fedify/fedify"; // ---cut-before--- +import { DatabaseSync } from "node:sqlite"; import { createFederation } from "@fedify/fedify"; import { SqliteMessageQueue } from "@fedify/sqlite"; -import Database from "better-sqlite3"; -const db = new Database("federation.db"); +const db = new DatabaseSync(":memory:"); const federation = createFederation({ -// ---cut-start--- - kv: null as unknown as KvStore, -// ---cut-end--- + // ... + // ---cut-start--- + kv: null as unknown as KvStore, + // ---cut-end--- queue: new SqliteMessageQueue(db), // [!code highlight] - // ... other options }); ~~~~ +~~~~ typescript twoslash [Node.js] +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { DatabaseSync } from "node:sqlite"; +import { createFederation } from "@fedify/fedify"; +import { SqliteMessageQueue } from "@fedify/sqlite"; + +const db = new DatabaseSync(":memory:"); +const federation = createFederation({ + // ... + // ---cut-start--- + kv: null as unknown as KvStore, + // ---cut-end--- + queue: new SqliteMessageQueue(db), // [!code highlight] +}); +~~~~ + +~~~~ typescript [Bun] +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { Database } from "bun:sqlite"; +import { createFederation } from "@fedify/fedify"; +import { SqliteMessageQueue } from "@fedify/sqlite"; + +const db = new Database(":memory:"); +const federation = createFederation({ + // ... + // ---cut-start--- + kv: null as unknown as KvStore, + // ---cut-end--- + queue: new SqliteMessageQueue(db), // [!code highlight] +}); +~~~~ + +::: + [`SqliteMessageQueue`]: https://jsr.io/@fedify/sqlite/doc/mq/~/SqliteMessageQueue ### `WorkersMessageQueue` (Cloudflare Workers only) diff --git a/packages/sqlite/package.json b/packages/sqlite/package.json index 5d2f07f0e..da7e927b4 100644 --- a/packages/sqlite/package.json +++ b/packages/sqlite/package.json @@ -46,6 +46,16 @@ "require": "./dist/kv.cjs", "default": "./dist/kv.js" }, + "./mq": { + "types": { + "import": "./dist/mq.d.ts", + "require": "./dist/mq.d.cts", + "default": "./dist/mq.d.ts" + }, + "import": "./dist/mq.js", + "require": "./dist/mq.cjs", + "default": "./dist/mq.js" + }, "./package.json": "./package.json" }, "imports": { diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 05d7b99b9..476bb3e12 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -4,9 +4,17 @@ import type { MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; +import * as temporal from "@js-temporal/polyfill"; import { getLogger } from "@logtape/logtape"; import type { SqliteDatabaseAdapter } from "./adapter.ts"; +let Temporal: typeof temporal.Temporal; +if ("Temporal" in globalThis) { + Temporal = globalThis.Temporal; +} else { + Temporal = temporal.Temporal; +} + const logger = getLogger(["fedify", "sqlite", "mq"]); /** From 0d97e104b24d3c23a85e2beff7c1e2e9baa8f518 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sat, 17 Jan 2026 22:15:20 +0000 Subject: [PATCH 08/28] Fix test error - Remove `let Temporal ...` - Export `@fedify/sqlite` - Add missing dependencies --- packages/cli/package.json | 1 + packages/sqlite/deno.json | 3 ++- packages/sqlite/src/mq.test.ts | 16 ++-------------- packages/sqlite/src/mq.ts | 10 +--------- packages/sqlite/tsdown.config.ts | 8 +++++++- packages/testing/src/mq-tester.ts | 8 -------- packages/testing/tsdown.config.ts | 12 ++++++++++++ pnpm-lock.yaml | 3 +++ 8 files changed, 28 insertions(+), 33 deletions(-) diff --git a/packages/cli/package.json b/packages/cli/package.json index d264dc734..757a63342 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -50,6 +50,7 @@ "@fedify/vocab": "workspace:*", "@fedify/vocab-runtime": "workspace:*", "@fedify/vocab-tools": "workspace:*", + "@fedify/webfinger": "workspace:*", "@fxts/core": "catalog:", "@hongminhee/localtunnel": "^0.3.0", "@inquirer/prompts": "^7.8.4", diff --git a/packages/sqlite/deno.json b/packages/sqlite/deno.json index d114f3477..837c689db 100644 --- a/packages/sqlite/deno.json +++ b/packages/sqlite/deno.json @@ -4,7 +4,8 @@ "license": "MIT", "exports": { ".": "./src/mod.ts", - "./kv": "./src/kv.ts" + "./kv": "./src/kv.ts", + "./mq": "./src/mq.ts" }, "imports": { "#sqlite": "./src/sqlite.node.ts" diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 6c2a79b6b..2b6847cce 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -1,14 +1,6 @@ import { PlatformDatabase } from "#sqlite"; +import { SqliteMessageQueue } from "@fedify/sqlite/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; -import * as temporal from "@js-temporal/polyfill"; -import { SqliteMessageQueue } from "./mq.ts"; - -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} const dbPath = `/tmp/${getRandomKey("sqlite")}.db`; const db = new PlatformDatabase(dbPath); @@ -16,11 +8,7 @@ const tableName = getRandomKey("message").replaceAll("-", "_"); testMessageQueue( "SqliteMessageQueue", - () => - new SqliteMessageQueue(db, { - tableName, - pollInterval: Temporal.Duration.from({ milliseconds: 500 }), - }), + () => new SqliteMessageQueue(db, { tableName }), ({ controller }) => { controller.abort(); db.close(); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 476bb3e12..5cb6c7982 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -4,17 +4,9 @@ import type { MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; -import * as temporal from "@js-temporal/polyfill"; import { getLogger } from "@logtape/logtape"; import type { SqliteDatabaseAdapter } from "./adapter.ts"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - const logger = getLogger(["fedify", "sqlite", "mq"]); /** @@ -87,7 +79,7 @@ export class SqliteMessageQueue implements MessageQueue { this.#initialized = options.initialized ?? false; this.#tableName = options.tableName ?? SqliteMessageQueue.#defaultTableName; this.#pollIntervalMs = Temporal.Duration.from( - options.pollInterval ?? { seconds: 5 }, + options.pollInterval ?? { milliseconds: 500 }, ).total("millisecond"); if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { diff --git a/packages/sqlite/tsdown.config.ts b/packages/sqlite/tsdown.config.ts index 978459047..7f006fdf9 100644 --- a/packages/sqlite/tsdown.config.ts +++ b/packages/sqlite/tsdown.config.ts @@ -1,7 +1,13 @@ import { defineConfig } from "tsdown"; export default defineConfig({ - entry: ["src/mod.ts", "src/kv.ts", "src/sqlite.node.ts", "src/sqlite.bun.ts"], + entry: [ + "src/mod.ts", + "src/kv.ts", + "src/mq.ts", + "src/sqlite.node.ts", + "src/sqlite.bun.ts", + ], dts: true, unbundle: true, format: ["esm", "cjs"], diff --git a/packages/testing/src/mq-tester.ts b/packages/testing/src/mq-tester.ts index d2eb1a9ad..160184880 100644 --- a/packages/testing/src/mq-tester.ts +++ b/packages/testing/src/mq-tester.ts @@ -1,16 +1,8 @@ import type { MessageQueue } from "@fedify/fedify"; import { test as defaultTest } from "@fedify/fixture"; -import * as temporal from "@js-temporal/polyfill"; import { delay } from "es-toolkit"; import { deepStrictEqual, ok } from "node:assert/strict"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - /** * Options for the {@link testMessageQueue} function. */ diff --git a/packages/testing/tsdown.config.ts b/packages/testing/tsdown.config.ts index 077863931..b96b3814e 100644 --- a/packages/testing/tsdown.config.ts +++ b/packages/testing/tsdown.config.ts @@ -14,4 +14,16 @@ export default defineConfig({ "@fedify/fedify/webfinger", "@fedify/fixture", ], + outputOptions(outputOptions, format) { + if (format === "cjs") { + outputOptions.intro = ` + const { Temporal } = require("@js-temporal/polyfill"); + `; + } else { + outputOptions.intro = ` + import { Temporal } from "@js-temporal/polyfill"; + `; + } + return outputOptions; + }, }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1c6abff78..7c84413a7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -735,6 +735,9 @@ importers: '@fedify/vocab-tools': specifier: workspace:* version: link:../vocab-tools + '@fedify/webfinger': + specifier: workspace:* + version: link:../webfinger '@fxts/core': specifier: 'catalog:' version: 1.20.0 From ada584ece96e74e784cf6ae6c6fe5afb7dc3aae5 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sun, 18 Jan 2026 15:37:51 +0000 Subject: [PATCH 09/28] Remove initialize from TestMessageQueueOptions https://github.com/fedify-dev/fedify/pull/526#discussion_r2701805847 --- packages/testing/src/mq-tester.ts | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/packages/testing/src/mq-tester.ts b/packages/testing/src/mq-tester.ts index 160184880..b431eaad1 100644 --- a/packages/testing/src/mq-tester.ts +++ b/packages/testing/src/mq-tester.ts @@ -12,13 +12,6 @@ export interface TestMessageQueueOptions { * It should be compatible with `node:test`'s `test` function. */ test?: (name: string, fn: () => Promise | void) => void; - - /** - * An optional initialization function to call before the tests run. - * This is useful for setting up database tables or other resources. - * @param mq The message queue instance to initialize. - */ - initialize?: (mq: MessageQueue) => Promise | void; } /** @@ -79,11 +72,6 @@ export default function testMessageQueue< const mq2 = await getMessageQueue(); const controller = new AbortController(); try { - // Initialize if needed (e.g., create database tables) - if (options.initialize != null) { - await options.initialize(mq1); - } - // Set up message collection and listeners const messages: string[] = []; const listening1 = mq1.listen((message: string) => { From 58645319841ac7d8f1ddcafbffec0f087389fb66 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sun, 18 Jan 2026 15:47:28 +0000 Subject: [PATCH 10/28] Fix comment about https://github.com/fedify-dev/fedify/pull/526#discussion_r2701910267 --- packages/sqlite/src/mq.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 5cb6c7982..1e84ad7ae 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -28,7 +28,7 @@ export interface SqliteMessageQueueOptions { initialized?: boolean; /** - * The poll interval for the message queue. 5 seconds by default. + * The poll interval for the message queue. 500 milliseconds by default. */ pollInterval?: Temporal.Duration | Temporal.DurationLike; } From d0aa1b7c8f27d60f721ef64184e532388944444a Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sun, 18 Jan 2026 15:52:33 +0000 Subject: [PATCH 11/28] Add note about SqliteMessageQueue's DELETE ... RETURNING requirement https://github.com/fedify-dev/fedify/pull/526#discussion_r2701910753 --- docs/manual/mq.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/manual/mq.md b/docs/manual/mq.md index 1dfcf4bff..e6fcc928f 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -340,6 +340,11 @@ Pros Cons : Limited scalability, not suitable for high-traffic production. +> [!NOTE] +> `SqliteMessageQueue` uses `DELETE ... RETURNING` to atomically fetch and +> delete the oldest message that is ready to be processed. This requires +> SQLite 3.35.0 or later. + ::: code-group ~~~~ typescript twoslash [Deno] From 67c44bb50f0ec277bcef9def6ac0b7723dfae8c0 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sun, 18 Jan 2026 17:02:33 +0000 Subject: [PATCH 12/28] Fix SqliteMessageQueue.listen https://github.com/fedify-dev/fedify/pull/526#discussion_r2701934856 --- packages/sqlite/src/mq.ts | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 1e84ad7ae..2e2aecddf 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -28,7 +28,8 @@ export interface SqliteMessageQueueOptions { initialized?: boolean; /** - * The poll interval for the message queue. 500 milliseconds by default. + * The poll interval for the message queue. + * @default `{ milliseconds: 500 }` */ pollInterval?: Temporal.Duration | Temporal.DurationLike; } @@ -210,17 +211,8 @@ export class SqliteMessageQueue implements MessageQueue { if (result) { const message = this.#decodeMessage(result.message); logger.debug("Processing message {id}...", { id: result.id, message }); - - try { - await handler(message); - logger.debug("Processed message {id}.", { id: result.id }); - } catch (error) { - logger.error( - "Failed to process message {id}: {error}", - { id: result.id, error }, - ); - throw error; - } + await handler(message); + logger.debug("Processed message {id}.", { id: result.id }); // Check for next message immediately continue; From 800a880bfeb4febed5d4e0de79d050833d12679c Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sun, 18 Jan 2026 19:37:08 +0000 Subject: [PATCH 13/28] Optimize SqliteMessageQueue.listen --- packages/sqlite/src/mq.test.ts | 6 +- packages/sqlite/src/mq.ts | 156 ++++++++++++++++++++++----------- 2 files changed, 110 insertions(+), 52 deletions(-) diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 2b6847cce..081080959 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -1,8 +1,12 @@ import { PlatformDatabase } from "#sqlite"; import { SqliteMessageQueue } from "@fedify/sqlite/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import { mkdtemp } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; -const dbPath = `/tmp/${getRandomKey("sqlite")}.db`; +const dbDir = await mkdtemp(join(tmpdir(), "fedify-sqlite-")); +const dbPath = join(dbDir, `${getRandomKey("sqlite")}.db`); const db = new PlatformDatabase(dbPath); const tableName = getRandomKey("message").replaceAll("-", "_"); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 2e2aecddf..f428a125c 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -9,6 +9,14 @@ import type { SqliteDatabaseAdapter } from "./adapter.ts"; const logger = getLogger(["fedify", "sqlite", "mq"]); +class EnqueueEvent extends Event { + readonly delayMs: number; + constructor(delayMs: number) { + super("enqueue"); + this.delayMs = delayMs; + } +} + /** * Options for the SQLite message queue. */ @@ -29,7 +37,7 @@ export interface SqliteMessageQueueOptions { /** * The poll interval for the message queue. - * @default `{ milliseconds: 500 }` + * @default `{ seconds: 5 }` */ pollInterval?: Temporal.Duration | Temporal.DurationLike; } @@ -57,6 +65,19 @@ export interface SqliteMessageQueueOptions { export class SqliteMessageQueue implements MessageQueue { static readonly #defaultTableName = "fedify_message"; static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/; + // In-memory event emitter for notifying listeners when messages are enqueued. + // Scoped per table name to allow multiple queues to coexist. + static readonly #notifyChannels = new Map(); + + static #getNotifyChannel(tableName: string): EventTarget { + let channel = SqliteMessageQueue.#notifyChannels.get(tableName); + if (channel == null) { + channel = new EventTarget(); + SqliteMessageQueue.#notifyChannels.set(tableName, channel); + } + return channel; + } + readonly #db: SqliteDatabaseAdapter; readonly #tableName: string; readonly #pollIntervalMs: number; @@ -69,7 +90,7 @@ export class SqliteMessageQueue implements MessageQueue { /** * Creates a new SQLite message queue. - * @param db The SQLite database to use. Supports `node:sqlite` and `bun:sqlite`. + * @param db The SQLite database to use. Supports `node:sqlite`, `bun:sqlite`. * @param options The options for the message queue. */ constructor( @@ -80,7 +101,7 @@ export class SqliteMessageQueue implements MessageQueue { this.#initialized = options.initialized ?? false; this.#tableName = options.tableName ?? SqliteMessageQueue.#defaultTableName; this.#pollIntervalMs = Temporal.Duration.from( - options.pollInterval ?? { milliseconds: 500 }, + options.pollInterval ?? { seconds: 5 }, ).total("millisecond"); if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { @@ -123,6 +144,13 @@ export class SqliteMessageQueue implements MessageQueue { .run(id, encodedMessage, now, scheduled); logger.debug("Enqueued a message.", { message }); + + // Notify listeners that a message has been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); + return Promise.resolve(); } @@ -167,6 +195,12 @@ export class SqliteMessageQueue implements MessageQueue { this.#db.exec("COMMIT"); logger.debug("Enqueued messages.", { messages }); + + // Notify listeners that messages have been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); } catch (error) { this.#db.exec("ROLLBACK"); throw error; @@ -190,36 +224,77 @@ export class SqliteMessageQueue implements MessageQueue { { tableName: this.#tableName }, ); - while (signal == null || !signal.aborted) { - const now = Temporal.Now.instant().epochMilliseconds; - - // Atomically fetch and delete the oldest message that is ready to be - // processed using DELETE ... RETURNING (SQLite >= 3.35.0) - const result = this.#db - .prepare( - `DELETE FROM "${this.#tableName}" - WHERE id = ( - SELECT id FROM "${this.#tableName}" - WHERE scheduled <= ? - ORDER BY scheduled - LIMIT 1 + const channel = SqliteMessageQueue.#getNotifyChannel(this.#tableName); + const timeouts = new Set>(); + + const poll = async () => { + while (signal == null || !signal.aborted) { + const now = Temporal.Now.instant().epochMilliseconds; + + // Atomically fetch and delete the oldest message that is ready to be + // processed using DELETE ... RETURNING (SQLite >= 3.35.0) + const result = this.#db + .prepare( + `DELETE FROM "${this.#tableName}" + WHERE id = ( + SELECT id FROM "${this.#tableName}" + WHERE scheduled <= ? + ORDER BY scheduled + LIMIT 1 + ) + RETURNING id, message`, ) - RETURNING id, message`, - ) - .get(now) as { id: string; message: string } | undefined; - - if (result) { - const message = this.#decodeMessage(result.message); - logger.debug("Processing message {id}...", { id: result.id, message }); - await handler(message); - logger.debug("Processed message {id}.", { id: result.id }); - - // Check for next message immediately - continue; + .get(now) as { id: string; message: string } | undefined; + + if (result) { + const message = this.#decodeMessage(result.message); + logger.debug("Processing message {id}...", { + id: result.id, + message, + }); + await handler(message); + logger.debug("Processed message {id}.", { id: result.id }); + + // Check for next message immediately + continue; + } + + // No more messages ready to process + break; + } + }; + + const onEnqueue = (event: Event) => { + const delayMs = (event as EnqueueEvent).delayMs; + if (delayMs < 1) { + poll(); + } else { + timeouts.add(setTimeout(poll, delayMs)); } + }; - // No messages available, wait before polling again - await this.#wait(this.#pollIntervalMs, signal); + channel.addEventListener("enqueue", onEnqueue); + signal?.addEventListener("abort", () => { + channel.removeEventListener("enqueue", onEnqueue); + for (const timeout of timeouts) clearTimeout(timeout); + }); + + // Initial poll + await poll(); + + // Periodic polling as fallback + while (signal == null || !signal.aborted) { + let timeout: ReturnType | undefined; + await new Promise((resolve) => { + signal?.addEventListener("abort", resolve); + timeout = setTimeout(() => { + signal?.removeEventListener("abort", resolve); + resolve(0); + }, this.#pollIntervalMs); + timeouts.add(timeout); + }); + if (timeout != null) timeouts.delete(timeout); + await poll(); } logger.debug("Stopped listening for messages on table {tableName}.", { @@ -276,25 +351,4 @@ export class SqliteMessageQueue implements MessageQueue { #decodeMessage(message: string): unknown { return JSON.parse(message); } - - #wait(ms: number, signal?: AbortSignal): Promise { - return new Promise((resolve) => { - if (signal?.aborted) { - resolve(); - return; - } - - const timer = setTimeout(() => { - signal?.removeEventListener("abort", onAbort); - resolve(); - }, ms); - - const onAbort = () => { - clearTimeout(timer); - resolve(); - }; - - signal?.addEventListener("abort", onAbort, { once: true }); - }); - } } From e975ce9fd70d8984dbb533173c5d46cc851adf0a Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Mon, 19 Jan 2026 06:43:20 +0000 Subject: [PATCH 14/28] Fix unchanged DATABASE_URL --- .github/workflows/main.yaml | 6 +++--- packages/cli/src/init/json/kv.json | 4 ++-- packages/cli/src/init/json/mq.json | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index a25156614..e48b5fbb5 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -53,7 +53,7 @@ jobs: - 6379:6379 env: AMQP_URL: amqp://guest:guest@localhost:5672 - DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres + POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres REDIS_URL: redis://localhost:6379 steps: - uses: actions/checkout@v4 @@ -116,7 +116,7 @@ jobs: - 6379:6379 env: AMQP_URL: amqp://guest:guest@localhost:5672 - DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres + POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres REDIS_URL: redis://localhost:6379 steps: - uses: actions/checkout@v4 @@ -157,7 +157,7 @@ jobs: - 6379:6379 env: AMQP_URL: amqp://guest:guest@localhost:5672 - DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres + POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres REDIS_URL: redis://localhost:6379 steps: - uses: actions/checkout@v4 diff --git a/packages/cli/src/init/json/kv.json b/packages/cli/src/init/json/kv.json index 00d844754..fbbb79230 100644 --- a/packages/cli/src/init/json/kv.json +++ b/packages/cli/src/init/json/kv.json @@ -24,9 +24,9 @@ "@fedify/postgres": { "PostgresKvStore": "PostgresKvStore" }, "postgres": { "default": "postgres" } }, - "object": "new PostgresKvStore(postgres(process.env.DATABASE_URL))", + "object": "new PostgresKvStore(postgres(process.env.POSTGRES_URL))", "env": { - "DATABASE_URL": "postgres://postgres@localhost:5432/postgres" + "POSTGRES_URL": "postgres://postgres@localhost:5432/postgres" } }, "denokv": { diff --git a/packages/cli/src/init/json/mq.json b/packages/cli/src/init/json/mq.json index 4ec3ff9c3..2dd6143e8 100644 --- a/packages/cli/src/init/json/mq.json +++ b/packages/cli/src/init/json/mq.json @@ -44,9 +44,9 @@ "default": "postgres" } }, - "object": "new PostgresMessageQueue(postgres(process.env.DATABASE_URL))", + "object": "new PostgresMessageQueue(postgres(process.env.POSTGRES_URL))", "env": { - "DATABASE_URL": "postgres://postgres@localhost:5432/postgres" + "POSTGRES_URL": "postgres://postgres@localhost:5432/postgres" } }, "amqp": { From d44582d0ef4b8d0d149cc8687b292f77a733707e Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Mon, 19 Jan 2026 07:04:00 +0000 Subject: [PATCH 15/28] SqliteMessageQueue: implement Disposable interface and add database close method --- packages/sqlite/src/mq.test.ts | 6 ++++-- packages/sqlite/src/mq.ts | 9 ++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 081080959..3d3bf1888 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -13,8 +13,10 @@ const tableName = getRandomKey("message").replaceAll("-", "_"); testMessageQueue( "SqliteMessageQueue", () => new SqliteMessageQueue(db, { tableName }), - ({ controller }) => { + ({ mq1, mq2, controller }) => { controller.abort(); - db.close(); + mq1.drop(); + mq2.drop(); + mq1[Symbol.dispose](); }, ); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index f428a125c..137e031ca 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -62,7 +62,7 @@ export interface SqliteMessageQueueOptions { * }); * ``` */ -export class SqliteMessageQueue implements MessageQueue { +export class SqliteMessageQueue implements MessageQueue, Disposable { static readonly #defaultTableName = "fedify_message"; static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/; // In-memory event emitter for notifying listeners when messages are enqueued. @@ -344,6 +344,13 @@ export class SqliteMessageQueue implements MessageQueue { this.#initialized = false; } + /** + * Closes the database connection. + */ + [Symbol.dispose](): void { + this.#db.close(); + } + #encodeMessage(message: unknown): string { return JSON.stringify(message); } From 3c01ff8a55f7c8762d162418d8287b800df56f55 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Tue, 20 Jan 2026 13:22:53 +0000 Subject: [PATCH 16/28] Refactor `testMessageQueue` usage --- CHANGES.md | 1 - packages/amqp/src/mq.test.ts | 36 +++--- packages/denokv/src/mod.test.ts | 35 +++--- packages/postgres/src/mq.test.ts | 33 +++--- packages/redis/src/mq.test.ts | 31 +++-- packages/sqlite/src/mq.test.ts | 21 ++-- packages/testing/src/mq-tester.ts | 190 ++++++++++++++---------------- 7 files changed, 162 insertions(+), 185 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7744db8fb..5ba579e3b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -297,7 +297,6 @@ To be released. [[#477], [#526] by ChanHaeng Lee] - Added `testMessageQueue()` function. - - Added `TestMessageQueueOptions` interface. - Added `waitFor()` helper function. - Added `getRandomKey()` helper function. diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 547021d03..a32bd146e 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -18,27 +18,23 @@ const connections: ChannelModel[] = []; const queue = getRandomKey("queue"); const delayedQueuePrefix = getRandomKey("delayed") + "_"; -testMessageQueue( +test( "AmqpMessageQueue", - async () => { - const conn = await getConnection(); - connections.push(conn); - return new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); - }, - async ({ controller }) => { - controller.abort(); - for (const conn of connections) { - await conn.close(); - } - }, - { - test: (name, fn) => - test(name, { - sanitizeOps: false, - sanitizeExit: false, - sanitizeResources: false, - }, fn), - }, + { sanitizeOps: false, sanitizeExit: false, sanitizeResources: false }, + () => + testMessageQueue( + async () => { + const conn = await getConnection(); + connections.push(conn); + return new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); + }, + async ({ controller }) => { + controller.abort(); + for (const conn of connections) { + await conn.close(); + } + }, + ), ); test( diff --git a/packages/denokv/src/mod.test.ts b/packages/denokv/src/mod.test.ts index 0019ae9ac..77df71c64 100644 --- a/packages/denokv/src/mod.test.ts +++ b/packages/denokv/src/mod.test.ts @@ -135,22 +135,21 @@ Deno.test("DenoKvStore", async (t) => { const kvs: Deno.Kv[] = []; -testMessageQueue( - "DenoKvMessageQueue", - async () => { - const kv = await Deno.openKv(":memory:"); - kvs.push(kv); - return new DenoKvMessageQueue(kv); - }, - ({ controller }) => { - controller.abort(); - for (const kv of kvs) { - try { - kv.close(); - } catch { - // Ignore errors on close +Deno.test("DenoKvMessageQueue", () => + testMessageQueue( + async () => { + const kv = await Deno.openKv(":memory:"); + kvs.push(kv); + return new DenoKvMessageQueue(kv); + }, + ({ controller }) => { + controller.abort(); + for (const kv of kvs) { + try { + kv.close(); + } catch { + // Ignore errors on close + } } - } - }, - { test: Deno.test }, -); + }, + )); diff --git a/packages/postgres/src/mq.test.ts b/packages/postgres/src/mq.test.ts index e900c3695..b5faf61e2 100644 --- a/packages/postgres/src/mq.test.ts +++ b/packages/postgres/src/mq.test.ts @@ -13,22 +13,21 @@ function createSql() { return sql; } -testMessageQueue( - "PostgresMessageQueue", - () => - new PostgresMessageQueue(createSql(), { - tableName: getRandomKey("message"), - channelName: getRandomKey("channel"), - }), - async ({ mq1, mq2, controller }) => { - controller.abort(); - await mq1.drop(); - await mq2.drop(); - for (const sql of sqls) { - await sql.end(); - } - }, - { test: (name, fn) => test(name, { ignore: dbUrl == null }, fn) }, -); +test("PostgresMessageQueue", { ignore: dbUrl == null }, () => + testMessageQueue( + () => + new PostgresMessageQueue(createSql(), { + tableName: getRandomKey("message"), + channelName: getRandomKey("channel"), + }), + async ({ mq1, mq2, controller }) => { + controller.abort(); + await mq1.drop(); + await mq2.drop(); + for (const sql of sqls) { + await sql.end(); + } + }, + )); // cspell: ignore sqls diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index 03f2c20bf..c07029025 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -7,19 +7,18 @@ import { Redis } from "ioredis"; const dbUrl = process.env.REDIS_URL; -testMessageQueue( - "RedisMessageQueue", - () => - new RedisMessageQueue(() => new Redis(dbUrl!), { - pollInterval: { seconds: 1 }, - channelKey: getRandomKey("channel"), - queueKey: getRandomKey("queue"), - lockKey: getRandomKey("lock"), - }), - ({ mq1, mq2, controller }) => { - controller.abort(); - mq1[Symbol.dispose](); - mq2[Symbol.dispose](); - }, - { test: (name, fn) => test(name, { ignore: dbUrl == null }, fn) }, -); +test("RedisMessageQueue", { ignore: dbUrl == null }, () => + testMessageQueue( + () => + new RedisMessageQueue(() => new Redis(dbUrl!), { + pollInterval: { seconds: 1 }, + channelKey: getRandomKey("channel"), + queueKey: getRandomKey("queue"), + lockKey: getRandomKey("lock"), + }), + ({ mq1, mq2, controller }) => { + controller.abort(); + mq1[Symbol.dispose](); + mq2[Symbol.dispose](); + }, + )); diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 3d3bf1888..b14718401 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -1,4 +1,5 @@ import { PlatformDatabase } from "#sqlite"; +import { test } from "@fedify/fixture"; import { SqliteMessageQueue } from "@fedify/sqlite/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; import { mkdtemp } from "node:fs/promises"; @@ -10,13 +11,13 @@ const dbPath = join(dbDir, `${getRandomKey("sqlite")}.db`); const db = new PlatformDatabase(dbPath); const tableName = getRandomKey("message").replaceAll("-", "_"); -testMessageQueue( - "SqliteMessageQueue", - () => new SqliteMessageQueue(db, { tableName }), - ({ mq1, mq2, controller }) => { - controller.abort(); - mq1.drop(); - mq2.drop(); - mq1[Symbol.dispose](); - }, -); +test("SqliteMessageQueue", () => + testMessageQueue( + () => new SqliteMessageQueue(db, { tableName }), + ({ mq1, mq2, controller }) => { + controller.abort(); + mq1.drop(); + mq2.drop(); + mq1[Symbol.dispose](); + }, + )); diff --git a/packages/testing/src/mq-tester.ts b/packages/testing/src/mq-tester.ts index b431eaad1..2c4434a98 100644 --- a/packages/testing/src/mq-tester.ts +++ b/packages/testing/src/mq-tester.ts @@ -1,19 +1,7 @@ import type { MessageQueue } from "@fedify/fedify"; -import { test as defaultTest } from "@fedify/fixture"; import { delay } from "es-toolkit"; import { deepStrictEqual, ok } from "node:assert/strict"; -/** - * Options for the {@link testMessageQueue} function. - */ -export interface TestMessageQueueOptions { - /** - * A custom test function to use instead of the default one. - * It should be compatible with `node:test`'s `test` function. - */ - test?: (name: string, fn: () => Promise | void) => void; -} - /** * Tests a {@link MessageQueue} implementation with a standard set of tests. * @@ -26,21 +14,22 @@ export interface TestMessageQueueOptions { * * @example * ```typescript ignore + * import { test } from "@fedify/fixture"; * import { testMessageQueue } from "@fedify/testing"; * import { MyMessageQueue } from "./my-mq.ts"; * - * testMessageQueue( - * "MyMessageQueue", - * () => new MyMessageQueue(), - * async ({ mq1, mq2, controller }) => { - * controller.abort(); - * await mq1.close(); - * await mq2.close(); - * }, + * test("MyMessageQueue", () => + * testMessageQueue( + * () => new MyMessageQueue(), + * async ({ mq1, mq2, controller }) => { + * controller.abort(); + * await mq1.close(); + * await mq2.close(); + * }, + * ) * ); * ``` * - * @param name The name of the test suite. * @param getMessageQueue A factory function that creates a new message queue * instance. It should return a new instance each time * to ensure test isolation, but both instances should @@ -48,12 +37,11 @@ export interface TestMessageQueueOptions { * @param onFinally A cleanup function called after all tests complete. * It receives both message queue instances and the abort * controller used for the listeners. - * @param options Additional options for the test. + * @returns A promise that resolves when all tests pass. */ -export default function testMessageQueue< +export default async function testMessageQueue< MQ extends MessageQueue, >( - name: string, getMessageQueue: () => MQ | Promise, onFinally: ({ mq1, @@ -64,92 +52,88 @@ export default function testMessageQueue< mq2: MQ; controller: AbortController; }) => Promise | void, - options: TestMessageQueueOptions = {}, -): void { - const test = options?.test ?? defaultTest; - test(name, async () => { - const mq1 = await getMessageQueue(); - const mq2 = await getMessageQueue(); - const controller = new AbortController(); - try { - // Set up message collection and listeners - const messages: string[] = []; - const listening1 = mq1.listen((message: string) => { - messages.push(message); - }, controller); - const listening2 = mq2.listen((message: string) => { - messages.push(message); - }, controller); - - // Test: enqueue() - await mq1.enqueue("Hello, world!"); - await waitFor(() => messages.length > 0, 15_000); - deepStrictEqual(messages, ["Hello, world!"]); +): Promise { + const mq1 = await getMessageQueue(); + const mq2 = await getMessageQueue(); + const controller = new AbortController(); + try { + // Set up message collection and listeners + const messages: string[] = []; + const listening1 = mq1.listen((message: string) => { + messages.push(message); + }, controller); + const listening2 = mq2.listen((message: string) => { + messages.push(message); + }, controller); - let started = Date.now(); - await mq1.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - await waitFor(() => messages.length > 1, 15_000); - deepStrictEqual(messages, ["Hello, world!", "Delayed message"]); - ok( - Date.now() - started >= 3_000, - "Delayed message should be delivered after at least 3 seconds", - ); + // Test: enqueue() + await mq1.enqueue("Hello, world!"); + await waitFor(() => messages.length > 0, 15_000); + deepStrictEqual(messages, ["Hello, world!"]); - // Test: enqueueMany() (skip if not supported) - if (mq1.enqueueMany != null) { - while (messages.length > 0) messages.pop(); - const batchMessages: string[] = [ - "First batch message", - "Second batch message", - "Third batch message", - ]; - await mq1.enqueueMany(batchMessages); - await waitFor(() => messages.length >= batchMessages.length, 15_000); - deepStrictEqual(new Set(messages), new Set(batchMessages)); + let started = Date.now(); + await mq1.enqueue( + "Delayed message", + { delay: Temporal.Duration.from({ seconds: 3 }) }, + ); + await waitFor(() => messages.length > 1, 15_000); + deepStrictEqual(messages, ["Hello, world!", "Delayed message"]); + ok( + Date.now() - started >= 3_000, + "Delayed message should be delivered after at least 3 seconds", + ); - // Test: enqueueMany() with delay - while (messages.length > 0) messages.pop(); - started = Date.now(); - const delayedBatchMessages: string[] = [ - "Delayed batch 1", - "Delayed batch 2", - ]; - await mq1.enqueueMany( - delayedBatchMessages, - { delay: Temporal.Duration.from({ seconds: 2 }) }, - ); - await waitFor( - () => messages.length >= delayedBatchMessages.length, - 15_000, - ); - deepStrictEqual(new Set(messages), new Set(delayedBatchMessages)); - ok( - Date.now() - started >= 2_000, - "Delayed batch messages should be delivered after at least 2 seconds", - ); - } + // Test: enqueueMany() (skip if not supported) + if (mq1.enqueueMany != null) { + while (messages.length > 0) messages.pop(); + const batchMessages: string[] = [ + "First batch message", + "Second batch message", + "Third batch message", + ]; + await mq1.enqueueMany(batchMessages); + await waitFor(() => messages.length >= batchMessages.length, 15_000); + deepStrictEqual(new Set(messages), new Set(batchMessages)); - // Test: bulk enqueue (stress test) + // Test: enqueueMany() with delay while (messages.length > 0) messages.pop(); - const bulkCount = 100; - for (let i = 0; i < bulkCount; i++) await mq1.enqueue(`message-${i}`); - await waitFor(() => messages.length >= bulkCount, 30_000); - const expectedMessages = new Set( - Array.from({ length: bulkCount }, (_, i) => `message-${i}`), + started = Date.now(); + const delayedBatchMessages: string[] = [ + "Delayed batch 1", + "Delayed batch 2", + ]; + await mq1.enqueueMany( + delayedBatchMessages, + { delay: Temporal.Duration.from({ seconds: 2 }) }, + ); + await waitFor( + () => messages.length >= delayedBatchMessages.length, + 15_000, + ); + deepStrictEqual(new Set(messages), new Set(delayedBatchMessages)); + ok( + Date.now() - started >= 2_000, + "Delayed batch messages should be delivered after at least 2 seconds", ); - deepStrictEqual(new Set(messages), expectedMessages); - - // Cleanup listeners - controller.abort(); - await listening1; - await listening2; - } finally { - await onFinally({ mq1, mq2, controller }); } - }); + + // Test: bulk enqueue (stress test) + while (messages.length > 0) messages.pop(); + const bulkCount = 100; + for (let i = 0; i < bulkCount; i++) await mq1.enqueue(`message-${i}`); + await waitFor(() => messages.length >= bulkCount, 30_000); + const expectedMessages = new Set( + Array.from({ length: bulkCount }, (_, i) => `message-${i}`), + ); + deepStrictEqual(new Set(messages), expectedMessages); + + // Cleanup listeners + controller.abort(); + await listening1; + await listening2; + } finally { + await onFinally({ mq1, mq2, controller }); + } } export async function waitFor( From 4a9f8886133c4d42574214266497cf2751ea52b8 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Tue, 20 Jan 2026 13:42:25 +0000 Subject: [PATCH 17/28] Removed unused dependencies --- packages/testing/package.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/testing/package.json b/packages/testing/package.json index 8813e4181..b4ffe09d7 100644 --- a/packages/testing/package.json +++ b/packages/testing/package.json @@ -50,8 +50,7 @@ "package.json" ], "peerDependencies": { - "@fedify/fedify": "workspace:^", - "@fedify/fixture": "workspace:^" + "@fedify/fedify": "workspace:^" }, "dependencies": { "es-toolkit": "catalog:" From ace78bb77467d5d86797d013a9df7e20b221a786 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 13:45:01 +0000 Subject: [PATCH 18/28] Add cleanup logic for `SqliteMessageQueue` https://github.com/fedify-dev/fedify/pull/526#discussion_r2709223608 --- packages/sqlite/src/mq.ts | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 137e031ca..f8caf5083 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -68,6 +68,8 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { // In-memory event emitter for notifying listeners when messages are enqueued. // Scoped per table name to allow multiple queues to coexist. static readonly #notifyChannels = new Map(); + // Track active instance IDs per table name for accurate cleanup + static readonly #activeInstances = new Map>(); static #getNotifyChannel(tableName: string): EventTarget { let channel = SqliteMessageQueue.#notifyChannels.get(tableName); @@ -81,6 +83,7 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { readonly #db: SqliteDatabaseAdapter; readonly #tableName: string; readonly #pollIntervalMs: number; + readonly #instanceId: string; #initialized: boolean; /** @@ -100,6 +103,7 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { this.#db = new SqliteDatabase(db); this.#initialized = options.initialized ?? false; this.#tableName = options.tableName ?? SqliteMessageQueue.#defaultTableName; + this.#instanceId = crypto.randomUUID(); this.#pollIntervalMs = Temporal.Duration.from( options.pollInterval ?? { seconds: 5 }, ).total("millisecond"); @@ -109,6 +113,18 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { `Invalid table name for the message queue: ${this.#tableName}`, ); } + + // Register this instance ID for this table + this.#registerInstance(); + } + + #registerInstance(): void { + let instances = SqliteMessageQueue.#activeInstances.get(this.#tableName); + if (instances == null) { + instances = new Set(); + SqliteMessageQueue.#activeInstances.set(this.#tableName, instances); + } + instances.add(this.#instanceId); } /** @@ -349,6 +365,20 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { */ [Symbol.dispose](): void { this.#db.close(); + this.#unregisterInstance(); + } + + #unregisterInstance(): void { + const instances = SqliteMessageQueue.#activeInstances.get(this.#tableName); + if (instances == null) return; + + instances.delete(this.#instanceId); + + // If no more instances exist for this table, cleanup EventTarget to prevent memory leak + if (instances.size === 0) { + SqliteMessageQueue.#activeInstances.delete(this.#tableName); + SqliteMessageQueue.#notifyChannels.delete(this.#tableName); + } } #encodeMessage(message: unknown): string { From d4fb1b0743fbf35cc326bf27a3f3a7f3bd760ca8 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 15:09:36 +0000 Subject: [PATCH 19/28] Add mise commands for test each packages --- mise.toml | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/mise.toml b/mise.toml index bab7b41a5..ed7b46e7a 100644 --- a/mise.toml +++ b/mise.toml @@ -26,6 +26,17 @@ env = { CI = "true" } # Prevent pnpm from prompting for confirmation depends = ["install"] run = "pnpm --filter '@fedify/*' --recursive --parallel build" +[tasks.prepare-each] +description = "Prepare specific package(s)" +usage = 'arg "" help="Package name(s) (without @fedify/ prefix)" var=#true var_min=1' +env = { CI = "true" } +run = ''' +for PACKAGE in ${usage_packages}; do + echo "Preparing package: $PACKAGE" + pnpm --filter "@fedify/$PACKAGE" build +done +''' + # Code quality [tasks.check] description = "Check code formatting, linting, and type checking" @@ -65,6 +76,24 @@ fi description = "Format the codebase" run = "deno fmt && hongdown --write" +[tasks.check-each] +description = "Check code quality for specific package(s)" +usage = 'arg "" help="Package name(s) (without @fedify/ prefix)" var=#true var_min=1' +run = ''' +for PACKAGE in ${usage_packages}; do + echo "Checking package: $PACKAGE" + PACKAGE_DIR="packages/$PACKAGE" + if [ ! -d "$PACKAGE_DIR" ]; then + echo "Error: Package directory $PACKAGE_DIR not found" + exit 1 + fi + + deno fmt --check "$PACKAGE_DIR" + deno lint "$PACKAGE_DIR" + deno check "$PACKAGE_DIR"/**/*.ts +done +''' + # Testing [tasks."test:deno"] description = "Run the test suite using Deno" @@ -85,6 +114,21 @@ run = "pnpm run --recursive --filter '!{docs}' test:bun" description = "Run the test suite across all environments (Deno, Node.js, Bun)" depends = ["check", "test:deno", "test:node", "test:bun"] +[tasks.test-each] +description = "Run tests for a specific package across all environments" +usage = 'arg "" help="Package name(s) (without @fedify/ prefix)" var=#true var_min=1' +run = ''' +mise run prepare-each ${usage_packages} +mise run check-each ${usage_packages} + +for PACKAGE in ${usage_packages}; do + echo "Running tests for package: $PACKAGE" + deno task --filter "@fedify/$PACKAGE" test + pnpm --filter "@fedify/$PACKAGE" test + pnpm --filter "@fedify/$PACKAGE" test:bun +done +''' + # Documentation [tasks.docs] description = "Start the documentation development server" From 08daa47f210a9acd9f144f4041e1cbad89c1ddf0 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 15:10:03 +0000 Subject: [PATCH 20/28] Ignore plan.md --- .gitignore | 1 + .hongdown.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 73b005a5c..3300d196f 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ package-lock.json repomix-output.xml t.ts t2.ts +plan.md diff --git a/.hongdown.toml b/.hongdown.toml index fc0140961..b2d012667 100644 --- a/.hongdown.toml +++ b/.hongdown.toml @@ -8,6 +8,7 @@ exclude = [ "GEMINI.md", "WARP.md", "packages/fedify/src/cfworkers/**", + "plan.md", ] [heading] From 04b547ce814c98aca1b0112d42418f2b7639f732 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 15:13:13 +0000 Subject: [PATCH 21/28] Add retry logic for SQLITE_BUSY errors https://github.com/fedify-dev/fedify/pull/526#discussion_r2709223608 --- packages/sqlite/src/mq.ts | 195 ++++++++++++++++++++++++++++---------- 1 file changed, 146 insertions(+), 49 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index f8caf5083..f0259f012 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -40,6 +40,19 @@ export interface SqliteMessageQueueOptions { * @default `{ seconds: 5 }` */ pollInterval?: Temporal.Duration | Temporal.DurationLike; + + /** + * Maximum number of retries for SQLITE_BUSY errors. + * @default `5` + */ + maxRetries?: number; + + /** + * Initial retry delay in milliseconds for SQLITE_BUSY errors. + * Uses exponential backoff. + * @default `100` + */ + retryDelayMs?: number; } /** @@ -84,6 +97,8 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { readonly #tableName: string; readonly #pollIntervalMs: number; readonly #instanceId: string; + readonly #maxRetries: number; + readonly #retryDelayMs: number; #initialized: boolean; /** @@ -107,6 +122,8 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { this.#pollIntervalMs = Temporal.Duration.from( options.pollInterval ?? { seconds: 5 }, ).total("millisecond"); + this.#maxRetries = options.maxRetries ?? 5; + this.#retryDelayMs = options.retryDelayMs ?? 100; if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { throw new Error( @@ -152,22 +169,22 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { logger.debug("Enqueuing a message...", { message }); } - this.#db - .prepare( - `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) - VALUES (?, ?, ?, ?)`, - ) - .run(id, encodedMessage, now, scheduled); - - logger.debug("Enqueued a message.", { message }); + return this.#retryOnBusy(() => { + this.#db + .prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ) + .run(id, encodedMessage, now, scheduled); - // Notify listeners that a message has been enqueued - const delayMs = delay.total("millisecond"); - SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( - new EnqueueEvent(delayMs), - ); + logger.debug("Enqueued a message.", { message }); - return Promise.resolve(); + // Notify listeners that a message has been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); + }); } /** @@ -195,33 +212,34 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { logger.debug("Enqueuing messages...", { messages }); } - try { - this.#db.exec("BEGIN IMMEDIATE"); - - const stmt = this.#db.prepare( - `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) - VALUES (?, ?, ?, ?)`, - ); + return this.#retryOnBusy(() => { + try { + this.#db.exec("BEGIN IMMEDIATE"); - for (const message of messages) { - const id = crypto.randomUUID(); - const encodedMessage = this.#encodeMessage(message); - stmt.run(id, encodedMessage, now, scheduled); - } + const stmt = this.#db.prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ); - this.#db.exec("COMMIT"); - logger.debug("Enqueued messages.", { messages }); + for (const message of messages) { + const id = crypto.randomUUID(); + const encodedMessage = this.#encodeMessage(message); + stmt.run(id, encodedMessage, now, scheduled); + } - // Notify listeners that messages have been enqueued - const delayMs = delay.total("millisecond"); - SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( - new EnqueueEvent(delayMs), - ); - } catch (error) { - this.#db.exec("ROLLBACK"); - throw error; - } - return Promise.resolve(); + this.#db.exec("COMMIT"); + logger.debug("Enqueued messages.", { messages }); + + // Notify listeners that messages have been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); + } catch (error) { + this.#db.exec("ROLLBACK"); + throw error; + } + }); } /** @@ -249,18 +267,20 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { // Atomically fetch and delete the oldest message that is ready to be // processed using DELETE ... RETURNING (SQLite >= 3.35.0) - const result = this.#db - .prepare( - `DELETE FROM "${this.#tableName}" - WHERE id = ( - SELECT id FROM "${this.#tableName}" - WHERE scheduled <= ? - ORDER BY scheduled - LIMIT 1 + const result = await this.#retryOnBusy(() => { + return this.#db + .prepare( + `DELETE FROM "${this.#tableName}" + WHERE id = ( + SELECT id FROM "${this.#tableName}" + WHERE scheduled <= ? + ORDER BY scheduled + LIMIT 1 + ) + RETURNING id, message`, ) - RETURNING id, message`, - ) - .get(now) as { id: string; message: string } | undefined; + .get(now) as { id: string; message: string } | undefined; + }); if (result) { const message = this.#decodeMessage(result.message); @@ -381,6 +401,83 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { } } + /** + * Checks if an error is a SQLITE_BUSY error. + * Handles different error formats from node:sqlite and bun:sqlite. + */ + #isBusyError(error: unknown): boolean { + if (!(error instanceof Error)) return false; + + // Check error message for SQLITE_BUSY + if ( + error.message.includes("SQLITE_BUSY") || + error.message.includes("database is locked") + ) { + return true; + } + + // Check error code property (node:sqlite) + const errorWithCode = error as Error & { code?: string }; + if (errorWithCode.code === "SQLITE_BUSY") { + return true; + } + + // Check errno property (bun:sqlite) + const errorWithErrno = error as Error & { errno?: number }; + if (errorWithErrno.errno === 5) { // SQLITE_BUSY = 5 + return true; + } + + return false; + } + + /** + * Retries a database operation with exponential backoff on SQLITE_BUSY errors. + */ + async #retryOnBusy(operation: () => T): Promise { + let lastError: unknown; + + for (let attempt = 0; attempt <= this.#maxRetries; attempt++) { + try { + return operation(); + } catch (error) { + lastError = error; + + if (!this.#isBusyError(error)) { + throw error; + } + + if (attempt === this.#maxRetries) { + logger.error( + "Max retries ({maxRetries}) reached for SQLITE_BUSY error on table {tableName}.", + { + maxRetries: this.#maxRetries, + tableName: this.#tableName, + error, + }, + ); + throw error; + } + + // Exponential backoff: retryDelayMs * 2^attempt + const delayMs = this.#retryDelayMs * Math.pow(2, attempt); + logger.debug( + "SQLITE_BUSY error on table {tableName}, retrying in {delayMs}ms (attempt {attempt}/{maxRetries})...", + { + tableName: this.#tableName, + delayMs, + attempt: attempt + 1, + maxRetries: this.#maxRetries, + }, + ); + + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + + throw lastError; + } + #encodeMessage(message: unknown): string { return JSON.stringify(message); } From cd7cc3bf0b9968a763dd5514ea73bc919573aeaa Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 15:19:00 +0000 Subject: [PATCH 22/28] Add error logging to SqliteMessageQueue --- packages/sqlite/src/mq.ts | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index f0259f012..140785840 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -237,6 +237,14 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { ); } catch (error) { this.#db.exec("ROLLBACK"); + logger.error( + "Failed to enqueue messages to table {tableName}: {error}", + { + tableName: this.#tableName, + messageCount: messages.length, + error, + }, + ); throw error; } }); @@ -288,8 +296,21 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { id: result.id, message, }); - await handler(message); - logger.debug("Processed message {id}.", { id: result.id }); + try { + await handler(message); + logger.debug("Processed message {id}.", { id: result.id }); + } catch (error) { + logger.error( + "Failed to process message {id} from table {tableName}: {error}", + { + id: result.id, + tableName: this.#tableName, + message, + error, + }, + ); + throw error; + } // Check for next message immediately continue; @@ -444,6 +465,13 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { lastError = error; if (!this.#isBusyError(error)) { + logger.error( + "Database operation failed on table {tableName}: {error}", + { + tableName: this.#tableName, + error, + }, + ); throw error; } From 7ef4de7b053ade80041fdbe32f75c328da3963ec Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 19:08:34 +0000 Subject: [PATCH 23/28] Add transaction wrappers to prevent race conditions --- packages/sqlite/src/mq.ts | 127 ++++++++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 47 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 140785840..d76d14e32 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -212,41 +212,35 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { logger.debug("Enqueuing messages...", { messages }); } - return this.#retryOnBusy(() => { - try { - this.#db.exec("BEGIN IMMEDIATE"); - - const stmt = this.#db.prepare( - `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) - VALUES (?, ?, ?, ?)`, - ); + return this.#withTransactionRetries(() => { + const stmt = this.#db.prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ); - for (const message of messages) { - const id = crypto.randomUUID(); - const encodedMessage = this.#encodeMessage(message); - stmt.run(id, encodedMessage, now, scheduled); - } + for (const message of messages) { + const id = crypto.randomUUID(); + const encodedMessage = this.#encodeMessage(message); + stmt.run(id, encodedMessage, now, scheduled); + } - this.#db.exec("COMMIT"); - logger.debug("Enqueued messages.", { messages }); + logger.debug("Enqueued messages.", { messages }); - // Notify listeners that messages have been enqueued - const delayMs = delay.total("millisecond"); - SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( - new EnqueueEvent(delayMs), - ); - } catch (error) { - this.#db.exec("ROLLBACK"); - logger.error( - "Failed to enqueue messages to table {tableName}: {error}", - { - tableName: this.#tableName, - messageCount: messages.length, - error, - }, - ); - throw error; - } + // Notify listeners that messages have been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); + }).catch((error) => { + logger.error( + "Failed to enqueue messages to table {tableName}: {error}", + { + tableName: this.#tableName, + messageCount: messages.length, + error, + }, + ); + throw error; }); } @@ -275,7 +269,9 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { // Atomically fetch and delete the oldest message that is ready to be // processed using DELETE ... RETURNING (SQLite >= 3.35.0) - const result = await this.#retryOnBusy(() => { + // Wrapped in BEGIN IMMEDIATE transaction to ensure proper locking + // and prevent race conditions in multi-process scenarios + const result = await this.#withTransactionRetries(() => { return this.#db .prepare( `DELETE FROM "${this.#tableName}" @@ -372,19 +368,21 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { tableName: this.#tableName, }); - this.#db.exec(` - CREATE TABLE IF NOT EXISTS "${this.#tableName}" ( - id TEXT PRIMARY KEY, - message TEXT NOT NULL, - created INTEGER NOT NULL, - scheduled INTEGER NOT NULL - ) - `); + this.#withTransaction(() => { + this.#db.exec(` + CREATE TABLE IF NOT EXISTS "${this.#tableName}" ( + id TEXT PRIMARY KEY, + message TEXT NOT NULL, + created INTEGER NOT NULL, + scheduled INTEGER NOT NULL + ) + `); - this.#db.exec(` - CREATE INDEX IF NOT EXISTS "idx_${this.#tableName}_scheduled" - ON "${this.#tableName}" (scheduled) - `); + this.#db.exec(` + CREATE INDEX IF NOT EXISTS "idx_${this.#tableName}_scheduled" + ON "${this.#tableName}" (scheduled) + `); + }); this.#initialized = true; logger.debug("Initialized the message queue table {tableName}.", { @@ -423,7 +421,7 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { } /** - * Checks if an error is a SQLITE_BUSY error. + * Checks if an error is a SQLITE_BUSY error or transaction conflict. * Handles different error formats from node:sqlite and bun:sqlite. */ #isBusyError(error: unknown): boolean { @@ -432,7 +430,8 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { // Check error message for SQLITE_BUSY if ( error.message.includes("SQLITE_BUSY") || - error.message.includes("database is locked") + error.message.includes("database is locked") || + error.message.includes("transaction within a transaction") ) { return true; } @@ -506,6 +505,40 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { throw lastError; } + /** + * Executes a database operation within a transaction. + * Automatically handles BEGIN IMMEDIATE, COMMIT, and ROLLBACK. + */ + #withTransaction(operation: () => T): T { + let transactionStarted = false; + try { + this.#db.exec("BEGIN IMMEDIATE"); + transactionStarted = true; + const result = operation(); + this.#db.exec("COMMIT"); + return result; + } catch (error) { + // Only rollback if transaction was successfully started + if (transactionStarted) { + try { + this.#db.exec("ROLLBACK"); + } catch { + // Ignore rollback errors - transaction might have been rolled back already + } + } + throw error; + } + } + + /** + * Executes a database operation within a transaction with retry logic. + * Automatically handles BEGIN IMMEDIATE, COMMIT, and ROLLBACK. + * Retries on SQLITE_BUSY errors with exponential backoff. + */ + async #withTransactionRetries(operation: () => T): Promise { + return await this.#retryOnBusy(() => this.#withTransaction(operation)); + } + #encodeMessage(message: unknown): string { return JSON.stringify(message); } From d5ee831f3f01608bd444e5e431c22bbd8c5cc3c7 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 19:19:34 +0000 Subject: [PATCH 24/28] Support journal_mode to SqliteMessageQueue --- packages/sqlite/src/mq.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index d76d14e32..092372fa5 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -53,6 +53,15 @@ export interface SqliteMessageQueueOptions { * @default `100` */ retryDelayMs?: number; + + /** + * SQLite journal mode to use. + * WAL (Write-Ahead Logging) mode is recommended for better concurrency + * in multi-process environments. + * Note: WAL mode is persistent per database file, not per connection. + * @default `"WAL"` + */ + journalMode?: "WAL" | "DELETE" | "TRUNCATE" | "PERSIST" | "MEMORY"; } /** @@ -99,6 +108,7 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { readonly #instanceId: string; readonly #maxRetries: number; readonly #retryDelayMs: number; + readonly #journalMode: string; #initialized: boolean; /** @@ -124,6 +134,7 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { ).total("millisecond"); this.#maxRetries = options.maxRetries ?? 5; this.#retryDelayMs = options.retryDelayMs ?? 100; + this.#journalMode = options.journalMode ?? "WAL"; if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { throw new Error( @@ -358,6 +369,10 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { /** * Creates the message queue table if it does not already exist. * Does nothing if the table already exists. + * + * This method also configures the SQLite journal mode for better concurrency. + * WAL (Write-Ahead Logging) mode is enabled by default to improve + * concurrent access in multi-process environments. */ initialize(): void { if (this.#initialized) { @@ -368,6 +383,10 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { tableName: this.#tableName, }); + // Set journal mode for better concurrency + // Note: This is persistent per database file and must be set outside a transaction + this.#db.exec(`PRAGMA journal_mode=${this.#journalMode}`); + this.#withTransaction(() => { this.#db.exec(` CREATE TABLE IF NOT EXISTS "${this.#tableName}" ( From 0c28f69a7db98929b0e9fa42bff1e972f1cf096c Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 20:11:31 +0000 Subject: [PATCH 25/28] Fix message queue test issues https://github.com/fedify-dev/fedify/pull/526#pullrequestreview-3689015684 - Fixed `testMessageQueue` to pass `{ signal }` options object instead of `AbortController` directly to `listen()` method - Fixed PostgreSQL test to share same table and channel between `mq1` and `mq2` instances by moving key generation outside factory function - Fixed Redis test to share same queue keys between `mq1` and `mq2` instances by moving key generation outside factory function - Fixed SQLite test cleanup to dispose both `mq1` and `mq2` instances, preventing resource leak, and removed redundant `mq2.drop()` call --- packages/postgres/src/mq.test.ts | 30 +++++++++++++++++------------- packages/redis/src/mq.test.ts | 16 ++++++++++------ packages/sqlite/src/mq.test.ts | 2 +- packages/sqlite/src/mq.ts | 11 +++++++++-- packages/testing/src/mq-tester.ts | 4 ++-- 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/packages/postgres/src/mq.test.ts b/packages/postgres/src/mq.test.ts index b5faf61e2..fd50e73f9 100644 --- a/packages/postgres/src/mq.test.ts +++ b/packages/postgres/src/mq.test.ts @@ -5,21 +5,24 @@ import process from "node:process"; import postgres from "postgres"; const dbUrl = process.env.POSTGRES_URL; -const sqls: postgres.Sql[] = []; -function createSql() { - const sql = postgres(dbUrl!); - sqls.push(sql); - return sql; -} +test("PostgresMessageQueue", { ignore: dbUrl == null }, () => { + const tableName = getRandomKey("message"); + const channelName = getRandomKey("channel"); + const sqls: postgres.Sql[] = []; -test("PostgresMessageQueue", { ignore: dbUrl == null }, () => - testMessageQueue( + function createSql() { + const sql = postgres(dbUrl!); + sqls.push(sql); + return sql; + } + + return testMessageQueue( () => - new PostgresMessageQueue(createSql(), { - tableName: getRandomKey("message"), - channelName: getRandomKey("channel"), - }), + new PostgresMessageQueue( + createSql(), + { tableName, channelName }, + ), async ({ mq1, mq2, controller }) => { controller.abort(); await mq1.drop(); @@ -28,6 +31,7 @@ test("PostgresMessageQueue", { ignore: dbUrl == null }, () => await sql.end(); } }, - )); + ); +}); // cspell: ignore sqls diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index c07029025..878664e82 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -7,18 +7,22 @@ import { Redis } from "ioredis"; const dbUrl = process.env.REDIS_URL; -test("RedisMessageQueue", { ignore: dbUrl == null }, () => - testMessageQueue( +test("RedisMessageQueue", { ignore: dbUrl == null }, () => { + const channelKey = getRandomKey("channel"); + const queueKey = getRandomKey("queue"); + const lockKey = getRandomKey("lock"); + return testMessageQueue( () => new RedisMessageQueue(() => new Redis(dbUrl!), { pollInterval: { seconds: 1 }, - channelKey: getRandomKey("channel"), - queueKey: getRandomKey("queue"), - lockKey: getRandomKey("lock"), + channelKey, + queueKey, + lockKey, }), ({ mq1, mq2, controller }) => { controller.abort(); mq1[Symbol.dispose](); mq2[Symbol.dispose](); }, - )); + ); +}); diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index b14718401..1258a56ea 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -17,7 +17,7 @@ test("SqliteMessageQueue", () => ({ mq1, mq2, controller }) => { controller.abort(); mq1.drop(); - mq2.drop(); mq1[Symbol.dispose](); + mq2[Symbol.dispose](); }, )); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 092372fa5..97f5e4878 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -422,8 +422,15 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { * Closes the database connection. */ [Symbol.dispose](): void { - this.#db.close(); - this.#unregisterInstance(); + try { + this.#db.close(); + this.#unregisterInstance(); + } catch (error) { + logger.error( + "Failed to close the database connection for table {tableName}: {error}", + { tableName: this.#tableName, error }, + ); + } } #unregisterInstance(): void { diff --git a/packages/testing/src/mq-tester.ts b/packages/testing/src/mq-tester.ts index 2c4434a98..f8695084e 100644 --- a/packages/testing/src/mq-tester.ts +++ b/packages/testing/src/mq-tester.ts @@ -61,10 +61,10 @@ export default async function testMessageQueue< const messages: string[] = []; const listening1 = mq1.listen((message: string) => { messages.push(message); - }, controller); + }, { signal: controller.signal }); const listening2 = mq2.listen((message: string) => { messages.push(message); - }, controller); + }, { signal: controller.signal }); // Test: enqueue() await mq1.enqueue("Hello, world!"); From 0be7305af5fa677adac0f6cbcdf20352842fa95a Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 20:49:14 +0000 Subject: [PATCH 26/28] Remove throw error for handling --- packages/sqlite/src/mq.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 97f5e4878..893c7b839 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -316,7 +316,6 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { error, }, ); - throw error; } // Check for next message immediately From bca9806c9e9fef1d6ae5a7b054ea1b8bdc187618 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Wed, 21 Jan 2026 23:51:31 +0000 Subject: [PATCH 27/28] Fix error message to be clear --- packages/sqlite/src/mq.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 893c7b839..a317e1a51 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -138,7 +138,8 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { throw new Error( - `Invalid table name for the message queue: ${this.#tableName}`, + `Invalid table name for the message queue: ${this.#tableName}.\ + Only letters, digits, and underscores are allowed.`, ); } From 3a98dd5522e10bd77d479e2e6eb0d7c9deadae0f Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Thu, 22 Jan 2026 00:00:55 +0000 Subject: [PATCH 28/28] Fix memory leak from abort listener --- packages/sqlite/src/mq.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index a317e1a51..d6b703004 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -350,9 +350,13 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { while (signal == null || !signal.aborted) { let timeout: ReturnType | undefined; await new Promise((resolve) => { - signal?.addEventListener("abort", resolve); + const onAbort = () => { + signal?.removeEventListener("abort", onAbort); + resolve(undefined); + }; + signal?.addEventListener("abort", onAbort); timeout = setTimeout(() => { - signal?.removeEventListener("abort", resolve); + signal?.removeEventListener("abort", onAbort); resolve(0); }, this.#pollIntervalMs); timeouts.add(timeout);