Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4732f27
Created SqliteMessageQueue
2chanhaeng Jan 16, 2026
88e84b1
Created testMessageQueue
2chanhaeng Jan 16, 2026
e2ebd1c
Refactor message queue testso with testMessageQueue
2chanhaeng Jan 16, 2026
5b135e9
Fix types
2chanhaeng Jan 16, 2026
003bf01
Add docs
2chanhaeng Jan 16, 2026
b1f8387
Refactor listen() with DELETE ... RETURNING
2chanhaeng Jan 16, 2026
dc2a37c
Fix docs, temporal and export
2chanhaeng Jan 17, 2026
0d97e10
Fix test error
2chanhaeng Jan 17, 2026
ada584e
Remove initialize from TestMessageQueueOptions
2chanhaeng Jan 18, 2026
5864531
Fix comment about
2chanhaeng Jan 18, 2026
d0aa1b7
Add note about SqliteMessageQueue's DELETE ... RETURNING requirement
2chanhaeng Jan 18, 2026
67c44bb
Fix
2chanhaeng Jan 18, 2026
800a880
Optimize SqliteMessageQueue.listen
2chanhaeng Jan 18, 2026
e975ce9
Fix unchanged DATABASE_URL
2chanhaeng Jan 19, 2026
d44582d
SqliteMessageQueue: implement Disposable interface and add database c…
2chanhaeng Jan 19, 2026
3c01ff8
Refactor `testMessageQueue` usage
2chanhaeng Jan 20, 2026
4a9f888
Removed unused dependencies
2chanhaeng Jan 20, 2026
ace78bb
Add cleanup logic for `SqliteMessageQueue`
2chanhaeng Jan 21, 2026
d4fb1b0
Add mise commands for test each packages
2chanhaeng Jan 21, 2026
08daa47
Ignore plan.md
2chanhaeng Jan 21, 2026
04b547c
Add retry logic for SQLITE_BUSY errors
2chanhaeng Jan 21, 2026
cd7cc3b
Add error logging to SqliteMessageQueue
2chanhaeng Jan 21, 2026
7ef4de7
Add transaction wrappers to prevent race conditions
2chanhaeng Jan 21, 2026
d5ee831
Support journal_mode to SqliteMessageQueue
2chanhaeng Jan 21, 2026
0c28f69
Fix message queue test issues
2chanhaeng Jan 21, 2026
0be7305
Remove throw error for handling
2chanhaeng Jan 21, 2026
bca9806
Fix error message to be clear
2chanhaeng Jan 21, 2026
3a98dd5
Fix memory leak from abort listener
2chanhaeng Jan 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
- 6379:6379
env:
AMQP_URL: amqp://guest:guest@localhost:5672
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -116,7 +116,7 @@ jobs:
- 6379:6379
env:
AMQP_URL: amqp://guest:guest@localhost:5672
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -157,7 +157,7 @@ jobs:
- 6379:6379
env:
AMQP_URL: amqp://guest:guest@localhost:5672
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ package-lock.json
repomix-output.xml
t.ts
t2.ts
plan.md
1 change: 1 addition & 0 deletions .hongdown.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ exclude = [
"GEMINI.md",
"WARP.md",
"packages/fedify/src/cfworkers/**",
"plan.md",
]

[heading]
Expand Down
25 changes: 25 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,31 @@ To be released.

[#437]: https://github.com/fedify-dev/fedify/issues/437

### @fedify/sqlite

- Added `SqliteMessageQueue` class implementing `MessageQueue` interface
using SQLite as the backing store. This implementation uses polling to
check for new messages and is suitable for single-node deployments and
development environments. [[#477], [#526] by ChanHaeng Lee]

- Added `SqliteMessageQueue` class.
- Added `SqliteMessageQueueOptions` interface.

[#477]: https://github.com/fedify-dev/fedify/issues/477
[#526]: https://github.com/fedify-dev/fedify/pull/526

### @fedify/testing

- Added `testMessageQueue()` utility function for standardized testing of
`MessageQueue` implementations. This function provides a reusable test
harness that covers common message queue operations including `enqueue()`,
`enqueue()` with delay, `enqueueMany()`, and multiple listener scenarios.
[[#477], [#526] by ChanHaeng Lee]

- Added `testMessageQueue()` function.
- Added `waitFor()` helper function.
- Added `getRandomKey()` helper function.


Version 1.10.1
--------------
Expand Down
112 changes: 112 additions & 0 deletions docs/manual/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,115 @@ const federation = createFederation({
[`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue
[RabbitMQ]: https://www.rabbitmq.com/

### `SqliteMessageQueue`

*This API is available since Fedify 2.0.0.*

To use [`SqliteMessageQueue`], you need to install the *@fedify/sqlite* package
first:

::: code-group

~~~~ bash [Deno]
deno add jsr:@fedify/sqlite
~~~~

~~~~ bash [npm]
npm add @fedify/sqlite
~~~~

~~~~ bash [pnpm]
pnpm add @fedify/sqlite
~~~~

~~~~ bash [Yarn]
yarn add @fedify/sqlite
~~~~

~~~~ bash [Bun]
bun add @fedify/sqlite
~~~~

:::

[`SqliteMessageQueue`] is a message queue implementation that uses SQLite as
the backend. It uses polling to check for new messages and is designed for
single-node deployments. It's suitable for development, testing, and
small-scale production use where simplicity is preferred over high throughput.
It uses native sqlite modules, [`node:sqlite`] for Node.js and Deno,
[`bun:sqlite`] for Bun.

Best for
: Development and testing.

Pros
: Simple, persistent with minimal configuration.

Cons
: Limited scalability, not suitable for high-traffic production.

> [!NOTE]
> `SqliteMessageQueue` uses `DELETE ... RETURNING` to atomically fetch and
> delete the oldest message that is ready to be processed. This requires
> SQLite 3.35.0 or later.

::: code-group

~~~~ typescript twoslash [Deno]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { DatabaseSync } from "node:sqlite";
import { createFederation } from "@fedify/fedify";
import { SqliteMessageQueue } from "@fedify/sqlite";

const db = new DatabaseSync(":memory:");
const federation = createFederation<void>({
// ...
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new SqliteMessageQueue(db), // [!code highlight]
});
~~~~

~~~~ typescript twoslash [Node.js]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { DatabaseSync } from "node:sqlite";
import { createFederation } from "@fedify/fedify";
import { SqliteMessageQueue } from "@fedify/sqlite";

const db = new DatabaseSync(":memory:");
const federation = createFederation<void>({
// ...
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new SqliteMessageQueue(db), // [!code highlight]
});
~~~~

~~~~ typescript [Bun]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { Database } from "bun:sqlite";
import { createFederation } from "@fedify/fedify";
import { SqliteMessageQueue } from "@fedify/sqlite";

const db = new Database(":memory:");
const federation = createFederation<void>({
// ...
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new SqliteMessageQueue(db), // [!code highlight]
});
~~~~

:::

[`SqliteMessageQueue`]: https://jsr.io/@fedify/sqlite/doc/mq/~/SqliteMessageQueue

### `WorkersMessageQueue` (Cloudflare Workers only)

*This API is available since Fedify 1.6.0.*
Expand Down Expand Up @@ -659,6 +768,9 @@ The following implementations do not yet support native retry:
[`AmqpMessageQueue`]
: Native retry support planned for future release.

[`SqliteMessageQueue`]
: No native retry support (`~MessageQueue.nativeRetrial` is `false`).

`ParallelMessageQueue` inherits the `~MessageQueue.nativeRetrial` value from
the wrapped queue.

Expand Down
44 changes: 44 additions & 0 deletions mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ env = { CI = "true" } # Prevent pnpm from prompting for confirmation
depends = ["install"]
run = "pnpm --filter '@fedify/*' --recursive --parallel build"

[tasks.prepare-each]
description = "Prepare specific package(s)"
usage = 'arg "<packages>" help="Package name(s) (without @fedify/ prefix)" var=#true var_min=1'
env = { CI = "true" }
run = '''
for PACKAGE in ${usage_packages}; do
echo "Preparing package: $PACKAGE"
pnpm --filter "@fedify/$PACKAGE" build
done
'''

# Code quality
[tasks.check]
description = "Check code formatting, linting, and type checking"
Expand Down Expand Up @@ -65,6 +76,24 @@ fi
description = "Format the codebase"
run = "deno fmt && hongdown --write"

[tasks.check-each]
description = "Check code quality for specific package(s)"
usage = 'arg "<packages>" help="Package name(s) (without @fedify/ prefix)" var=#true var_min=1'
run = '''
for PACKAGE in ${usage_packages}; do
echo "Checking package: $PACKAGE"
PACKAGE_DIR="packages/$PACKAGE"
if [ ! -d "$PACKAGE_DIR" ]; then
echo "Error: Package directory $PACKAGE_DIR not found"
exit 1
fi

deno fmt --check "$PACKAGE_DIR"
deno lint "$PACKAGE_DIR"
deno check "$PACKAGE_DIR"/**/*.ts
done
'''

# Testing
[tasks."test:deno"]
description = "Run the test suite using Deno"
Expand All @@ -85,6 +114,21 @@ run = "pnpm run --recursive --filter '!{docs}' test:bun"
description = "Run the test suite across all environments (Deno, Node.js, Bun)"
depends = ["check", "test:deno", "test:node", "test:bun"]

[tasks.test-each]
description = "Run tests for a specific package across all environments"
usage = 'arg "<packages>" help="Package name(s) (without @fedify/ prefix)" var=#true var_min=1'
run = '''
mise run prepare-each ${usage_packages}
mise run check-each ${usage_packages}

for PACKAGE in ${usage_packages}; do
echo "Running tests for package: $PACKAGE"
deno task --filter "@fedify/$PACKAGE" test
pnpm --filter "@fedify/$PACKAGE" test
pnpm --filter "@fedify/$PACKAGE" test:bun
done
'''

# Documentation
[tasks.docs]
description = "Start the documentation development server"
Expand Down
1 change: 1 addition & 0 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
},
"devDependencies": {
"@alinea/suite": "^0.6.3",
"@fedify/testing": "workspace:^",
"@js-temporal/polyfill": "catalog:",
"@std/assert": "catalog:",
"@std/async": "catalog:",
Expand Down
Loading
Loading