fix(fair-queue): ensure concurrency is released when a message reaches visibility timeout to prevent concurrency leaks #2907
Walkthrough
The pull request refactors the message reclaim mechanism in the fair queue system.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/redis-worker/src/fair-queue/visibility.ts (1)
414-447: Silent fallback when stored message data is unavailable.
When `storedMessage` is null (either because `dataJson` is missing or JSON parsing fails), the message is still reclaimed back to the queue, but no `ReclaimedMessageInfo` is added to the result array. This means the calling code in `index.ts` won't release the concurrency slot for that message.
This could lead to a concurrency leak if the in-flight data hash gets corrupted or out of sync with the in-flight sorted set. Consider logging a warning when this happens to aid debugging:
💡 Suggested improvement
     // Track reclaimed message for concurrency release
     if (storedMessage) {
       reclaimedMessages.push({
         messageId,
         queueId,
         tenantId: storedMessage.tenantId,
         metadata: storedMessage.metadata,
       });
    +} else {
    +  this.logger.error("Cannot release concurrency for reclaimed message: stored data unavailable", {
    +    messageId,
    +    queueId,
    +  });
     }
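To make the fallback path concrete, here is a minimal, self-contained sketch of the parse-then-track step with the suggested logging. The type shapes, the `parseStoredMessage` and `trackReclaimed` helpers, and the `Logger` interface are assumptions made for illustration; they are not copied from `visibility.ts`.

```typescript
// Hypothetical shapes, loosely modeled on the names used in this review.
type StoredMessage = { tenantId: string; metadata?: Record<string, unknown> };
type ReclaimedMessageInfo = {
  messageId: string;
  queueId: string;
  tenantId: string;
  metadata?: Record<string, unknown>;
};
type Logger = { error: (message: string, fields: Record<string, unknown>) => void };

// Parse the stored JSON defensively; null means "missing or unparseable".
function parseStoredMessage(dataJson: string | null): StoredMessage | null {
  if (!dataJson) return null;
  try {
    const parsed: unknown = JSON.parse(dataJson);
    if (
      typeof parsed === "object" &&
      parsed !== null &&
      typeof (parsed as { tenantId?: unknown }).tenantId === "string"
    ) {
      return parsed as StoredMessage;
    }
    return null;
  } catch {
    return null;
  }
}

// Record the reclaimed message when its data is available; otherwise log
// loudly, because the caller will not be able to release its slot.
function trackReclaimed(
  messageId: string,
  queueId: string,
  dataJson: string | null,
  reclaimedMessages: ReclaimedMessageInfo[],
  logger: Logger
): void {
  const storedMessage = parseStoredMessage(dataJson);
  if (storedMessage) {
    reclaimedMessages.push({
      messageId,
      queueId,
      tenantId: storedMessage.tenantId,
      metadata: storedMessage.metadata,
    });
  } else {
    logger.error("Cannot release concurrency for reclaimed message: stored data unavailable", {
      messageId,
      queueId,
    });
  }
}
```

With this shape, a corrupted in-flight entry still gets reclaimed but leaves a log line that names the message whose slot may remain held.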
🧹 Nitpick comments (2)
packages/redis-worker/src/fair-queue/types.ts (1)
144-153: Consider using `type` instead of `interface` per coding guidelines.
The coding guidelines specify "Use types over interfaces for TypeScript". While this is a minor stylistic point, consider converting to a type alias for consistency:
♻️ Suggested change
    -export interface ReclaimedMessageInfo {
    +export type ReclaimedMessageInfo = {
       /** Message ID */
       messageId: string;
       /** Queue ID */
       queueId: string;
       /** Tenant ID for concurrency release */
       tenantId: string;
       /** Additional metadata for concurrency group extraction */
       metadata?: Record<string, unknown>;
    -}
    +};
packages/redis-worker/src/fair-queue/tests/visibility.test.ts (1)
720-777: Consider adding a test for the edge case when stored message data is unavailable.The current tests cover happy paths, but there's no test verifying behavior when the in-flight data hash doesn't contain the stored message (e.g., data corruption scenario). This would help ensure the silent fallback behavior is intentional and documented.
Would you like me to help draft a test case for this edge case?
📒 Files selected for processing (5)
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/visibility.ts
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
**/*.{ts,tsx}: Always import tasks from `@trigger.dev/sdk`, never use `@trigger.dev/sdk/v3` or deprecated `client.defineJob` pattern
Every Trigger.dev task must be exported and have a unique `id` property with no timeouts in the run function
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/visibility.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Import from `@trigger.dev/core` using subpaths only, never import from root
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/visibility.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/visibility.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier before committing
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/visibility.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
{packages,integrations}/**/*
📄 CodeRabbit inference engine (CLAUDE.md)
Add a changeset when modifying any public package in `packages/*` or `integrations/*` using `pnpm run changeset:add`
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/visibility.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive `describe` and `it` blocks
Tests should avoid mocks or stubs and use the helpers from `@internal/testcontainers` when Redis or Postgres are needed
Use vitest for running unit tests
**/*.test.{ts,tsx,js,jsx}: Use vitest exclusively for testing and never mock anything - use testcontainers instead
Place test files next to source files with naming pattern: source file (e.g., `MyService.ts`) → `MyService.test.ts`
Files:
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (CLAUDE.md)
Use testcontainers helpers (`redisTest`, `postgresTest`, `containerTest`) from `@internal/testcontainers` for Redis/PostgreSQL testing instead of mocks
Files:
- packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
- packages/redis-worker/src/fair-queue/tests/visibility.test.ts
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.
Applied to files:
packages/redis-worker/src/fair-queue/visibility.ts
📚 Learning: 2026-01-15T11:50:06.044Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-15T11:50:06.044Z
Learning: Applies to **/*.test.{ts,tsx} : Use testcontainers helpers (`redisTest`, `postgresTest`, `containerTest`) from `internal/testcontainers` for Redis/PostgreSQL testing instead of mocks
Applied to files:
packages/redis-worker/src/fair-queue/tests/visibility.test.ts
📚 Learning: 2026-01-15T10:48:02.673Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-15T10:48:02.673Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Tests should avoid mocks or stubs and use the helpers from `internal/testcontainers` when Redis or Postgres are needed
Applied to files:
packages/redis-worker/src/fair-queue/tests/visibility.test.ts
🧬 Code graph analysis (3)
packages/redis-worker/src/fair-queue/visibility.ts (2)
- packages/redis-worker/src/fair-queue/types.ts (2): `ReclaimedMessageInfo` (144-153), `StoredMessage` (62-79)
- packages/redis-worker/src/fair-queue/keyProducer.ts (1): `inflightDataKey` (61-63)
packages/redis-worker/src/fair-queue/index.ts (2)
- packages/redis-worker/src/fair-queue/visibility.ts (2): `shardId` (548-557), `queueId` (529-531)
- packages/redis-worker/src/fair-queue/masterQueue.ts (1): `queueId` (195-197)
packages/redis-worker/src/fair-queue/tests/visibility.test.ts (1)
- packages/redis-worker/src/fair-queue/types.ts (1): `ReclaimedMessageInfo` (144-153)
🔇 Additional comments (6)
packages/redis-worker/src/fair-queue/visibility.ts (2)
3-9: LGTM!
The imports are correctly updated to include the new `ReclaimedMessageInfo` and `StoredMessage` types needed for the enhanced reclaim functionality.
377-386: LGTM!
The method signature and documentation are properly updated to reflect the new return type `Promise<ReclaimedMessageInfo[]>`, clearly communicating the purpose of returning per-message info for concurrency release.
packages/redis-worker/src/fair-queue/index.ts (1)
1346-1376: Well-implemented fix for concurrency leak.
The implementation correctly addresses the core issue: when a message times out, its concurrency slot is now properly released. Key strengths:
- Per-message error handling ensures one failure doesn't prevent releasing other slots
- Clear comments explain the critical nature of this code path
- The `QueueDescriptor` is properly constructed from the reclaimed message info
This is a solid fix for preventing concurrency leaks on visibility timeout.
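As an illustration of the per-message error handling praised above, the following sketch releases each reclaimed slot inside its own `try`/`catch`. The `QueueDescriptor` fields, the `ConcurrencyReleaser` interface, and the `release(queue, messageId)` signature are assumptions for this example and may differ from the actual code in `index.ts`.

```typescript
// Hypothetical shapes; the real types live in fair-queue/types.ts.
type ReclaimedMessageInfo = {
  messageId: string;
  queueId: string;
  tenantId: string;
  metadata?: Record<string, unknown>;
};
type QueueDescriptor = { id: string; tenantId: string; metadata: Record<string, unknown> };
type ConcurrencyReleaser = {
  release: (queue: QueueDescriptor, messageId: string) => Promise<void>;
};

// Release the slot of every reclaimed message. A failure on one message is
// reported through onError and does not stop the remaining releases.
async function releaseReclaimedSlots(
  reclaimed: ReclaimedMessageInfo[],
  concurrency: ConcurrencyReleaser,
  onError: (info: ReclaimedMessageInfo, error: unknown) => void
): Promise<number> {
  let released = 0;
  for (const info of reclaimed) {
    try {
      await concurrency.release(
        { id: info.queueId, tenantId: info.tenantId, metadata: info.metadata ?? {} },
        info.messageId
      );
      released++;
    } catch (error) {
      onError(info, error);
    }
  }
  return released;
}
```

Keeping the `try`/`catch` inside the loop is the design choice that matters here: a single failed release costs one slot instead of every slot in the batch.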
packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts (1)
627-632: LGTM!
The test correctly adapts to the new `reclaimTimedOut` return type by using `.length` to get the count of reclaimed messages. The variable rename from `reclaimed` to `reclaimedMessages` improves clarity.
packages/redis-worker/src/fair-queue/tests/visibility.test.ts (2)
600-778: Comprehensive test coverage for the new `reclaimTimedOut` behavior.
The tests thoroughly validate the key scenarios:
- Successful reclaim returns complete message info including `tenantId` and `metadata`
- No messages returned when nothing has timed out
- Multiple messages from different tenants are correctly reclaimed
The tests properly use `redisTest` from `@internal/testcontainers` as per coding guidelines and correctly clean up resources.
5-5: LGTM!
The import of `ReclaimedMessageInfo` type is correctly added to support type-safe assertions in the new tests.
Summary
Fixes a concurrency leak in the batch queue where visibility timeout reclaims do not release concurrency slots.
The bug: When a message visibility timeout expires (60s), `reclaimTimedOut` puts the message back in the queue but does NOT release the concurrency slot. The messageId stays in the concurrency set (`engine:batch:concurrency:tenant:{envId}`), counting against the tenant limit even though the message is no longer in-flight.
This causes:
- `SCARD >= limit`
The fix:
- `reclaimTimedOut` now captures message data (including tenantId) BEFORE releasing from in-flight
- `reclaimTimedOut` returns `ReclaimedMessageInfo[]` with messageId, queueId, tenantId, and metadata
- `#reclaimTimedOutMessages` now iterates over reclaimed messages and calls `concurrencyManager.release()` for each
Test plan
- should return reclaimed message info with tenantId for concurrency release
- should return empty array when no messages have timed out
- should reclaim multiple timed-out messages and return all their info
- `raceConditions.test.ts` updated for new return type
refs TRI-7049
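The leak and the fix can be reproduced with a small in-memory model. This is a sketch under stated assumptions: `ConcurrencyModel` stands in for the Redis concurrency set, `reclaimTimedOutModel` stands in for `reclaimTimedOut`, and none of the names below are taken from the package.

```typescript
// Hypothetical in-flight record; deadline is a millisecond timestamp.
type InFlightEntry = { messageId: string; queueId: string; tenantId: string; deadline: number };

// Stand-in for the per-tenant concurrency set (a Redis set in the real system).
class ConcurrencyModel {
  private slots = new Map<string, Set<string>>();
  private limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Reserve a slot unless the tenant is already at its limit.
  tryReserve(tenantId: string, messageId: string): boolean {
    const set = this.slots.get(tenantId) ?? new Set<string>();
    if (set.size >= this.limit) return false;
    set.add(messageId);
    this.slots.set(tenantId, set);
    return true;
  }

  release(tenantId: string, messageId: string): void {
    this.slots.get(tenantId)?.delete(messageId);
  }

  count(tenantId: string): number {
    return this.slots.get(tenantId)?.size ?? 0;
  }
}

// Remove timed-out entries from in-flight and return their details,
// captured BEFORE deletion so the caller can release each slot.
function reclaimTimedOutModel(inFlight: Map<string, InFlightEntry>, now: number): InFlightEntry[] {
  const reclaimed: InFlightEntry[] = [];
  for (const [messageId, entry] of Array.from(inFlight.entries())) {
    if (entry.deadline <= now) {
      reclaimed.push(entry);
      inFlight.delete(messageId);
    }
  }
  return reclaimed;
}

// Caller side of the fix: release one slot per reclaimed message.
function reclaimAndRelease(
  inFlight: Map<string, InFlightEntry>,
  concurrency: ConcurrencyModel,
  now: number
): number {
  const reclaimed = reclaimTimedOutModel(inFlight, now);
  for (const entry of reclaimed) {
    concurrency.release(entry.tenantId, entry.messageId);
  }
  return reclaimed.length;
}
```

If the caller skips the release loop, the tenant's count stays at its old value after the reclaim and `tryReserve` keeps returning `false`, which is the stall described in the bug section.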