KafkaJS adapter #397 (base: main)

Conversation
📝 Walkthrough

This PR introduces a complete new KafkaJS integration package for the message queue toolkit. It adds CI/CD workflow configuration, package setup, core publisher and consumer implementations with batch processing support, a strongly-typed handler routing framework, message batching utilities, comprehensive test suites, and documentation for both the existing Kafka package and the new KafkaJS package.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant App as Application
participant Consumer as KafkaConsumer
participant Handler as Handler/Router
participant Kafka as Kafka Broker
participant Metrics as MetricsManager
participant Spy as HandlerSpy
App->>Consumer: init()
Consumer->>Kafka: connect admin & consumer
Consumer->>Kafka: subscribe to topics
Consumer->>Consumer: start message loop
Kafka-->>Consumer: message received
Consumer->>Consumer: transformMessage()
Consumer->>Consumer: parseMessages() + schema validation
Consumer->>Handler: resolve handler for topic
alt Batch Processing Enabled
Consumer->>Consumer: accumulate messages (KafkaMessageBatchStream)
Consumer->>Handler: call handler(batch[], context, requestContext)
else Single Message Mode
Consumer->>Handler: call handler(message, context, requestContext)
end
Handler-->>Consumer: processing complete
Consumer->>Spy: recordHandlerSuccess()
Consumer->>Metrics: push processing metrics
Consumer->>Kafka: commitOffset()
alt Error During Processing
Handler-->>Consumer: error thrown
Consumer->>Consumer: retry with exponential backoff
Consumer->>Spy: recordHandlerError()
Consumer->>Metrics: push error metrics
end
App->>Consumer: close()
Consumer->>Kafka: disconnect consumer & admin
```

```mermaid
sequenceDiagram
participant App as Application
participant Publisher as KafkaPublisher
participant Schema as Schema Validator
participant Kafka as Kafka Broker
participant Metrics as MetricsManager
participant Spy as HandlerSpy
App->>Publisher: publish(topic, message)
alt First Publish (Lazy Init)
Publisher->>Kafka: lazy init - connect producer
Publisher->>Kafka: create topics if autocreateTopics enabled
end
Publisher->>Schema: validateMessage(message)
alt Schema Validation Fails
Schema-->>Publisher: validation error
Publisher-->>App: throw InternalError
end
Publisher->>Publisher: transformMessage()
Publisher->>Publisher: add request context headers (x-request-id)
Publisher->>Kafka: send(topic, message, headers)
Kafka-->>Publisher: success
Publisher->>Spy: recordPublished()
Publisher->>Metrics: record metrics
Publisher-->>App: return
App->>Publisher: close()
Publisher->>Kafka: disconnect producer
```
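Read alongside the publisher diagram, a minimal caller-side sketch of this flow might look like the following. The structural type and driver function are illustrative assumptions, not part of the package; only the publish/close call shape comes from the diagram above.

```typescript
// Hypothetical driver for the publisher flow above; `PublisherLike` is an
// assumed structural type mirroring the diagram, not the package's real API.
type PublisherLike<Message> = {
  publish(topic: string, message: Message): Promise<void>
  close(): Promise<void>
}

async function publishAndShutdown<Message>(
  publisher: PublisherLike<Message>,
  topic: string,
  message: Message,
): Promise<void> {
  // First call triggers lazy init (producer connect, optional topic creation),
  // then schema validation, header enrichment, and the send itself.
  await publisher.publish(topic, message)
  // Disconnects the underlying producer.
  await publisher.close()
}
```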
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 3 passed checks
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@packages/kafkajs/lib/AbstractKafkaConsumer.ts`:
- Around line 194-211: The batch mode never commits offsets because
KafkaMessageBatchStream currently sends only DeserializedMessage to the batch
callback; update the flow so the batch handler can commit offsets after
processing: modify the eachMessage handler (and/or transformMessage) to pass
through the original payload metadata (topic, partition, offset) into the items
written to messageBatchStream (or change KafkaMessageBatchStream to accept a
wrapper type that includes payload metadata), then in the batch callback invoked
by messageBatchStream (the function that calls consume(...)), after await
this.consume(batch.topic, batch.messages) successfully resolves, compute the
highest offset per partition from the batch and call
this.commitOffset(payloadInfo) (or a new helper that commits the last offset+1
per partition) so offsets are explicitly committed in batchProcessingEnabled
mode; ensure commit logic uses the same commitOffset method used in non-batch
mode.
In `@packages/kafkajs/package.json`:
- Around line 59-63: The peer dependency for zod is too broad and allows v3
which lacks the v4 export used in lib/types.ts and
lib/handler-routing/KafkaHandlerConfig.ts; update the "zod" entry in
peerDependencies of package.json from ">=3.25.76 <5.0.0" to a 4.x-only range
(for example ">=4.0.0 <5.0.0" or "^4.0.0") so consumers are required to install
Zod v4 compatible with the zod/v4 imports.
In `@packages/kafkajs/README.md`:
- Around line 93-105: The README's KafkaConfig snippet uses placeholder types
for ssl and retry; update the type annotations to match the implementation by
replacing "boolean | TLSConfig" with KafkaJsConfig['ssl'] and "RetryOptions"
with KafkaJsConfig['retry'] in the KafkaConfig block so the documented
KafkaConfig matches the actual type usage (referencing KafkaJsConfig as used in
the codebase).
In `@packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts`:
- Around line 1-5: The test uses waitAndRetry with a predicate that waits for
errorSpy to have calls but then asserts expect(errorSpy).not.toHaveBeenCalled(),
which is contradictory; update the tests to use a deterministic delay (e.g.,
sleep or waitAndRetry with a fixed timeout function) to give the consumer time
to process instead of waiting for errorSpy to be called, then assert that
errorSpy was not called; specifically modify the usages of waitAndRetry and
predicates around errorSpy in PermissionBatchConsumer.spec (the blocks using
waitAndRetry(() => errorSpy.mock.calls.length > 0) and the similar block later)
so they wait a fixed short interval and then run
expect(errorSpy).not.toHaveBeenCalled().
In `@packages/kafkajs/test/consumer/PermissionBatchConsumer.ts`:
- Around line 12-18: The import of PERMISSION_TOPIC_MESSAGES_CONFIG is currently
type-only; remove the leading `type` so it is imported as a runtime value (e.g.,
change the import list to include PERMISSION_TOPIC_MESSAGES_CONFIG without
`type`) so it can be used with `typeof PERMISSION_TOPIC_MESSAGES_CONFIG` where
referenced; leave the other `type` imports (PermissionAdded, PermissionRemoved)
as type-only.
In `@packages/kafkajs/test/consumer/PermissionConsumer.ts`:
- Around line 12-18: The import for PERMISSION_TOPIC_MESSAGES_CONFIG is
currently type-only but is later used with typeof (in the usages around the
PERMISSION_TOPIC_MESSAGES_CONFIG references on lines where typeof is applied),
which fails at compile time; remove the leading type keyword so
PERMISSION_TOPIC_MESSAGES_CONFIG is imported as a runtime value (leave type-only
imports like PermissionAdded and PermissionRemoved as-is) so the typeof
PERMISSION_TOPIC_MESSAGES_CONFIG expressions resolve correctly.
🧹 Nitpick comments (5)
packages/kafkajs/lib/utils/safeJsonDeserializer.ts (1)
1-11: Return type may not reflect actual behavior. The declared return type is `object | undefined`, but `JSON.parse` can return primitives (e.g., `JSON.parse('"hello"')` returns `"hello"`, `JSON.parse('42')` returns `42`). If the intent is to only accept JSON objects, consider adding a type guard after parsing.

💡 Optional fix to enforce the object return type:

```diff
 export const safeJsonDeserializer = (data?: Buffer | string | null): object | undefined => {
   if (!data) return undefined

   // Checking to be safe
   if (!Buffer.isBuffer(data) && typeof data !== 'string') return undefined

   try {
-    return JSON.parse(data.toString('utf-8'))
+    const parsed = JSON.parse(data.toString('utf-8'))
+    if (typeof parsed !== 'object' || parsed === null) return undefined
+    return parsed
   } catch (_) {
     return undefined
   }
 }
```
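For a quick sanity check of why the guard matters, plain `JSON.parse` semantics (standard JavaScript, nothing package-specific) return primitives unchanged:

```typescript
// Standard JSON.parse behavior that the optional guard above filters out
console.log(JSON.parse('"hello"')) // "hello" (a string, not an object)
console.log(JSON.parse('42')) // 42 (a number)
console.log(JSON.parse('null')) // null; typeof null === 'object', hence the explicit null check
console.log(JSON.parse('{"a":1}')) // { a: 1 } passes the guard
```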
packages/kafkajs/lib/utils/KafkaMessageBatchStream.spec.ts (1)

6-51: Consider closing the stream to clear the long timeout. With `timeoutMilliseconds: 10000`, the last partial batch leaves a pending timer and a late flush. Ending the stream avoids open handles and late mutations.

🧹 Possible cleanup (updates expectation):

```diff
@@
-import { setTimeout } from 'node:timers/promises'
+import { once } from 'node:events'
+import { setTimeout } from 'node:timers/promises'
@@
   for (const message of messages) {
     batchStream.write(message)
   }
+  batchStream.end()
+  await once(batchStream, 'finish')
+  await dataFetchingPromise
@@
   expect(receivedBatches).toEqual([
     { topic, partition: 0, messages: [messages[0], messages[1], messages[2]] },
     { topic, partition: 0, messages: [messages[3], messages[4], messages[5]] },
     { topic, partition: 0, messages: [messages[6], messages[7], messages[8]] },
+    { topic, partition: 0, messages: [messages[9]] },
   ])
```
packages/kafkajs/lib/AbstractKafkaPublisher.ts (1)

26-85: Memoize the init() promise to avoid redundant concurrent producer.connect() calls. While KafkaJS's producer.connect() is idempotent and safe for concurrent calls, the current implementation with a boolean flag permits multiple concurrent invocations from concurrent publish() calls. Memoizing the in-flight init promise would eliminate unnecessary concurrent attempts and clear it on error or close.

♻️ Suggested refactor:

```diff
 export abstract class AbstractKafkaPublisher<
   TopicsConfig extends TopicConfig[],
 > extends AbstractKafkaService<TopicsConfig, KafkaPublisherOptions<TopicsConfig>> {
   private readonly schemaContainers: Record<string, MessageSchemaContainer<object>>
   private readonly kafka: Kafka
   private readonly producer: Producer
   private isInitiated: boolean
+  private initPromise?: Promise<void>
@@
   async init(): Promise<void> {
     if (this.isInitiated) return
-
-    try {
-      await this.producer.connect()
-      this.isInitiated = true
-    } catch (e) {
-      throw new InternalError({
-        message: 'Producer init failed',
-        errorCode: 'KAFKA_PRODUCER_INIT_ERROR',
-        cause: e,
-      })
-    }
+    if (!this.initPromise) {
+      this.initPromise = (async () => {
+        try {
+          await this.producer.connect()
+          this.isInitiated = true
+        } catch (e) {
+          this.initPromise = undefined
+          throw new InternalError({
+            message: 'Producer init failed',
+            errorCode: 'KAFKA_PRODUCER_INIT_ERROR',
+            cause: e,
+          })
+        }
+      })()
+    }
+    await this.initPromise
   }
@@
   async close(): Promise<void> {
     if (!this.isInitiated) return
     await this.producer.disconnect()
     this.isInitiated = false
+    this.initPromise = undefined
   }
```
packages/kafkajs/lib/AbstractKafkaConsumer.ts (2)

282-301: String-based error detection is fragile but may be necessary. The error handling relies on matching substrings in error messages ('rebalance', 'not a member', 'generation'). This is fragile, as error message text could change across KafkaJS versions. Consider extracting these patterns to a constant or helper function for easier maintenance, and add a comment explaining why string matching is used:

💡 Suggested improvement:

```diff
+// KafkaJS doesn't expose typed error codes for rebalance-related failures,
+// so we match on known error message patterns
+const REBALANCE_ERROR_PATTERNS = ['rebalance', 'not a member', 'generation'] as const
+
+function isRebalanceError(error: Error): boolean {
+  const message = error.message ?? ''
+  return REBALANCE_ERROR_PATTERNS.some((pattern) => message.includes(pattern))
+}
```
415-437: Consider longer backoff delays for retry resilience. The current exponential backoff produces very short delays:

- Retry 1: 1ms (2^0)
- Retry 2: 2ms (2^1)

These delays may be too short for transient failures (e.g., brief network hiccups, temporary resource contention) to recover. Consider using a larger base or multiplier:

💡 Suggested adjustment:

```diff
-      if (retries > 0) await setTimeout(Math.pow(2, retries - 1))
+      // Exponential backoff: 100ms, 200ms, 400ms...
+      if (retries > 0) await setTimeout(100 * Math.pow(2, retries - 1))
```

Given the TODO comment about implementing a proper retry mechanism with DLQ, this is acceptable for now as an interim solution.
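As a worked illustration of the suggested schedule, a tiny standalone helper (hypothetical, not part of the package) produces 100ms, 200ms, 400ms for the first three retries:

```typescript
import { setTimeout } from 'node:timers/promises'

// Hypothetical helper mirroring the suggested 100 * 2^(retries - 1) schedule
const backoffDelayMs = (retries: number, baseMs = 100): number =>
  baseMs * 2 ** (retries - 1)

async function withBackoff(attempts: number): Promise<void> {
  for (let retries = 1; retries <= attempts; retries++) {
    // Sleeps 100ms, 200ms, 400ms for retries 1..3
    await setTimeout(backoffDelayMs(retries))
  }
}
```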
Code under review (packages/kafkajs/lib/AbstractKafkaConsumer.ts, lines 194-211, per the comment below):

```typescript
    if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
      this.messageBatchStream = new KafkaMessageBatchStream<
        DeserializedMessage<SupportedMessageValues<TopicsConfig>>
      >(
        (batch) =>
          this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
        this.options.batchProcessingOptions,
      )

      await this.consumer.run({
        autoCommit: false,
        eachMessage: async (payload) => {
          const message = this.transformMessage(payload)
          if (message) {
            this.messageBatchStream!.write(message)
          }
        },
      })
```
🧩 Analysis chain

🏁 Scripts executed (output omitted):

```bash
# First, find KafkaMessageBatchStream implementation
find packages -name "*.ts" -type f | head -20

# Search for KafkaMessageBatchStream class definition
rg -n "class KafkaMessageBatchStream" --type=ts

# Search for KafkaMessageBatchStream to understand its location
rg -l "KafkaMessageBatchStream" --type=ts

# Look for commitOffset usage in batch context and the consume method
rg -n "commitOffset|this\.consume" packages/kafkajs/lib/AbstractKafkaConsumer.ts -B2 -A2

cat -n packages/kafkajs/lib/utils/KafkaMessageBatchStream.ts

# Also check the consume method to see if it handles commits
rg -n "async consume|private consume|protected consume" packages/kafkajs/lib/AbstractKafkaConsumer.ts -A15

rg -n "private async consume" packages/kafkajs/lib/AbstractKafkaConsumer.ts -A50

# Also check if there's any offset commit in batch context or elsewhere
rg -n "this\.commitOffset\|commitOffsets" packages/kafkajs/lib/AbstractKafkaConsumer.ts -B2 -A2

# Search for commitOffset without escaped pipes
rg -n "commitOffset" packages/kafkajs/lib/AbstractKafkaConsumer.ts

# Check the full context around line 219 in the non-batch mode to understand the commit flow
sed -n '210,230p' packages/kafkajs/lib/AbstractKafkaConsumer.ts

# Look at the entire batch processing vs non-batch processing section
sed -n '194,225p' packages/kafkajs/lib/AbstractKafkaConsumer.ts
```
Offset commits are missing in batch processing mode.
In non-batch mode (lines 212-223), commitOffset(payload) is explicitly called after consuming each message. In batch mode (lines 194-211), messages are written to messageBatchStream but neither the stream handler nor the batch callback invokes offset commits.
The consume() method doesn't commit offsets—it processes messages and returns. The KafkaMessageBatchStream itself is just a batching utility that collects messages and invokes the callback; it has no access to the payload information (topic, partition, offset) required by commitOffset(). This means offsets are never committed in batch processing with autoCommit: false, leading to message reprocessing after consumer restarts or rebalances.
The batch callback (line 198-199) or the eachMessage handler (lines 206-210) needs to explicitly commit offsets for the processed batch.
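One possible shape for the fix, as a hedged sketch: assume each item written to the stream is wrapped with its partition and offset, and after the batch handler resolves, commit the highest offset + 1 per partition. The wrapper type and helper name below are illustrative, not the package's actual types; `consumer.commitOffsets` taking `{ topic, partition, offset }` entries is the real KafkaJS API.

```typescript
import type { Consumer } from 'kafkajs'

// Illustrative wrapper: the deserialized message plus the payload metadata
// needed for committing (names are assumptions, not the package's types).
type OffsetAwareMessage<M> = {
  message: M
  partition: number
  offset: string
}

// Called from the batch callback after `this.consume(...)` resolves successfully.
async function commitBatchOffsets<M>(
  consumer: Consumer,
  topic: string,
  messages: OffsetAwareMessage<M>[],
): Promise<void> {
  // Track the highest offset seen per partition in this batch
  const highestPerPartition = new Map<number, bigint>()
  for (const { partition, offset } of messages) {
    const candidate = BigInt(offset)
    const current = highestPerPartition.get(partition)
    if (current === undefined || candidate > current) {
      highestPerPartition.set(partition, candidate)
    }
  }

  // Kafka convention: commit the last processed offset + 1
  await consumer.commitOffsets(
    [...highestPerPartition.entries()].map(([partition, offset]) => ({
      topic,
      partition,
      offset: (offset + 1n).toString(),
    })),
  )
}
```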
| "peerDependencies": { | ||
| "@message-queue-toolkit/core": ">=23.0.0", | ||
| "@message-queue-toolkit/schemas": ">=7.0.0", | ||
| "zod": ">=3.25.76 <5.0.0" | ||
| }, |
🧩 Analysis chain

🏁 Scripts executed (output omitted):

```bash
# Check for Zod 4-specific APIs in the kafkajs package
echo "=== Checking for Zod 4-specific API usage ==="
rg -n --type=ts 'z\.(enum|record|strictObject|looseObject|templateLiteral|transform)\s*\(' packages/kafkajs/ || echo "No matches found"
echo ""
echo "=== Checking all Zod usage in kafkajs ==="
rg -n 'from.*zod|import.*zod' packages/kafkajs/ --type=ts || echo "No imports found"
echo ""
echo "=== Listing TypeScript files in kafkajs/src ==="
find packages/kafkajs -name "*.ts" -type f | head -20

echo "=== Check for 'zod/v4' import details ==="
rg -B2 -A2 "from.*zod/v4" packages/kafkajs/ --type=ts | head -40
echo ""
echo "=== Check if there are any generic 'from zod' imports ==="
rg "from ['\"]zod['\"]" packages/kafkajs/ --type=ts || echo "No generic zod imports found"
echo ""
echo "=== Check package.json for any Zod compatibility notes ==="
cat packages/kafkajs/package.json | jq '.description, .keywords' 2>/dev/null || cat packages/kafkajs/package.json | grep -A5 -B5 zod
```
Narrow Zod peer dependency to require version 4.x only.
The code explicitly imports from zod/v4 (a versioned export path specific to Zod 4.x) in production files (lib/types.ts, lib/handler-routing/KafkaHandlerConfig.ts) and test files. The zod/v4 export does not exist in Zod 3.x, so users installing this package with Zod 3.x will encounter module resolution failures at runtime. Update the peer dependency from >=3.25.76 <5.0.0 to >=4.0.0 <5.0.0 (or ^4.0.0).
Documentation under review (packages/kafkajs/README.md, lines 93-105, per the comment below):

### KafkaConfig

```typescript
type KafkaConfig = {
  brokers: string[] // List of Kafka broker addresses
  clientId: string // Client identifier
  ssl?: boolean | TLSConfig // SSL configuration
  sasl?: SASLOptions // SASL authentication
  connectionTimeout?: number // Connection timeout in ms
  requestTimeout?: number // Request timeout in ms
  retry?: RetryOptions // Retry configuration
}
```
🧩 Analysis chain

🏁 Scripts executed (output omitted):

```bash
# Find the KafkaJS package structure
git ls-files packages/kafkajs/ | head -20

# Look for type definition files in the KafkaJS package
fd -e "ts" -e "d.ts" packages/kafkajs/ | grep -E "(type|config)" | head -20

# Search for TLSConfig definition
rg "type TLSConfig|interface TLSConfig|export.*TLSConfig" packages/kafkajs/

# Search for RetryOptions definition
rg "type RetryOptions|interface RetryOptions|export.*RetryOptions" packages/kafkajs/

# Search for KafkaConfig definition
rg "type KafkaConfig|interface KafkaConfig|export.*KafkaConfig" packages/kafkajs/

# Read the actual KafkaConfig type definition
cat -n packages/kafkajs/lib/types.ts

# Also check the test context to see how KafkaConfig is used
cat -n packages/kafkajs/test/utils/testContext.ts

# Verify the exact README content at lines 93-105
sed -n '93,105p' packages/kafkajs/README.md
```
Update KafkaConfig type annotations in README to match actual implementation.
The README currently documents ssl as boolean | TLSConfig and retry as RetryOptions, but the actual KafkaConfig type in packages/kafkajs/lib/types.ts uses KafkaJsConfig['ssl'] and KafkaJsConfig['retry'] (where KafkaJsConfig is imported from the 'kafkajs' package). The placeholder type names should be replaced with the actual indexed type access to avoid documentation drift.
✏️ Doc alignment tweak:

```diff
@@
-  ssl?: boolean | TLSConfig // SSL configuration
+  ssl?: KafkaJsConfig['ssl'] // SSL configuration (KafkaJS)
@@
-  retry?: RetryOptions // Retry configuration
+  retry?: KafkaJsConfig['retry'] // Retry configuration (KafkaJS)
```
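For reference, the README block after applying the tweak would read roughly as follows. The import line is an assumption about how KafkaJsConfig is brought in (the review only states that it comes from the kafkajs package):

```typescript
// Assumed import alias; the review states KafkaJsConfig comes from 'kafkajs'
import type { KafkaConfig as KafkaJsConfig, SASLOptions } from 'kafkajs'

type KafkaConfig = {
  brokers: string[] // List of Kafka broker addresses
  clientId: string // Client identifier
  ssl?: KafkaJsConfig['ssl'] // SSL configuration (KafkaJS)
  sasl?: SASLOptions // SASL authentication
  connectionTimeout?: number // Connection timeout in ms
  requestTimeout?: number // Request timeout in ms
  retry?: KafkaJsConfig['retry'] // Retry configuration (KafkaJS)
}
```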
Code under review (packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts, lines 1-5, per the comment below):

```typescript
import { randomUUID } from 'node:crypto'
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
import { Kafka } from 'kafkajs'
import { afterAll, expect, type MockInstance } from 'vitest'
import z from 'zod/v4'
```
🧩 Analysis chain

🏁 Scripts executed (output omitted):

```bash
# First, let's locate and read the test file
find . -name "PermissionBatchConsumer.spec.ts" -type f

# Read the kafkajs version of the test file to see the full context
cat -n ./packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts | head -100

# Continue reading the file to find the waitAndRetry usage mentioned in the review
cat -n ./packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts | sed -n '250,360p'

# Search for other uses of waitAndRetry in the test files
rg -n "waitAndRetry" ./packages/kafkajs/test/ -A 2 -B 2
```

🌐 Web query: @lokalise/universal-ts-utils waitAndRetry function documentation behavior

💡 Result: The waitAndRetry helper repeatedly calls a predicate (sync or async) until it returns a truthy value or the maximum retry count is reached. It sleeps between attempts (sleepTime) and resolves with whatever the last attempt returned (truthy value if succeeded, otherwise the last falsy value); callers should handle any thrown errors. [1][2]

🏁 Scripts executed (output omitted):

```bash
# Check the package.json to see what version is being used
grep -A 5 "@lokalise/universal-ts-utils" ./packages/kafkajs/package.json

# Look at the exact implementation or more documentation about waitAndRetry
npm info @lokalise/universal-ts-utils@4.8.0 2>/dev/null || echo "npm not available"

# Check if there's a similar issue or comment in the other test file
cat -n ./packages/kafkajs/test/consumer/PermissionConsumer.spec.ts | sed -n '280,295p'

# Check the github repo or source to understand waitAndRetry better
curl -s https://raw.githubusercontent.com/lokalise/shared-ts-libs/main/packages/universal-ts-utils/src/index.ts 2>/dev/null | grep -A 20 "waitAndRetry" | head -30

# Alternative: Check if there's any documentation or usage in this repo that clarifies the expected behavior
rg -n "waitAndRetry" . -B 5 -A 5 --type ts --max-count=3 | head -50

# Look at the GitHub repo source for waitAndRetry to be absolutely certain
curl -s https://raw.githubusercontent.com/lokalise/shared-ts-libs/main/packages/universal-ts-utils/src/sleep.ts 2>/dev/null

# Alternative: check the documentation in the package
python3 << 'EOF'
import json
import urllib.request
# Fetch the package info
try:
    with urllib.request.urlopen('https://registry.npmjs.org/@lokalise/universal-ts-utils/4.8.0') as response:
        data = json.loads(response.read().decode())
        # Look for description
        print("Package description:", data.get('description', 'N/A'))
        print("Repository:", data.get('repository', {}).get('url', 'N/A'))
except Exception as e:
    print(f"Error: {e}")
EOF

# Verify the logic one more time by looking at what the test is actually testing
cat -n ./packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts | sed -n '329,360p'

# Also check the similar test in PermissionConsumer.spec.ts
cat -n ./packages/kafkajs/test/consumer/PermissionConsumer.spec.ts | sed -n '270,293p'
```
waitAndRetry predicate contradicts the assertion.

The predicate () => errorSpy.mock.calls.length > 0 waits for the error to be reported, but the next line asserts expect(errorSpy).not.toHaveBeenCalled(). This is logically backwards: you're waiting for something you expect not to happen. Use a deterministic delay instead to give the consumer time to process, then assert no calls.

🔧 Proposed fix:

```diff
-import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
+// import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
@@
-    await waitAndRetry(() => errorSpy.mock.calls.length > 0, 10, 100)
-    expect(errorSpy).not.toHaveBeenCalled()
+    // Give the consumer a short window to process the bad message
+    await new Promise((resolve) => setTimeout(resolve, 300))
+    expect(errorSpy).not.toHaveBeenCalled()
```

Also applies to: 329-359
Code under review (packages/kafkajs/test/consumer/PermissionBatchConsumer.ts, lines 12-18, per the comment below):

```typescript
import {
  PERMISSION_ADDED_SCHEMA,
  PERMISSION_REMOVED_SCHEMA,
  type PERMISSION_TOPIC_MESSAGES_CONFIG,
  type PermissionAdded,
  type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
```
🧩 Analysis chain

🏁 Scripts executed (output omitted):

```bash
cat -n packages/kafkajs/test/consumer/PermissionBatchConsumer.ts

cat -n packages/kafkajs/test/utils/permissionSchemas.ts
```
Remove the type keyword from the PERMISSION_TOPIC_MESSAGES_CONFIG import.

PERMISSION_TOPIC_MESSAGES_CONFIG is a runtime const value (not just a type) but is imported as type-only on line 15. This prevents it from being used with typeof on lines 27, 39, and 58, causing TypeScript compilation errors. Import it as a regular value:

Fix:

```diff
 import {
   PERMISSION_ADDED_SCHEMA,
   PERMISSION_REMOVED_SCHEMA,
-  type PERMISSION_TOPIC_MESSAGES_CONFIG,
+  PERMISSION_TOPIC_MESSAGES_CONFIG,
   type PermissionAdded,
   type PermissionRemoved,
 } from '../utils/permissionSchemas.ts'
```
📝 Committable suggestion (review carefully before applying):

```typescript
import {
  PERMISSION_ADDED_SCHEMA,
  PERMISSION_REMOVED_SCHEMA,
  PERMISSION_TOPIC_MESSAGES_CONFIG,
  type PermissionAdded,
  type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
```
Code under review (packages/kafkajs/test/consumer/PermissionConsumer.ts, lines 12-18, per the comment below):

```typescript
import {
  PERMISSION_ADDED_SCHEMA,
  PERMISSION_REMOVED_SCHEMA,
  type PERMISSION_TOPIC_MESSAGES_CONFIG,
  type PermissionAdded,
  type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
```
🧩 Analysis chain

🏁 Scripts executed (output omitted):

```bash
# Check if the file exists and read the import section
head -30 packages/kafkajs/test/consumer/PermissionConsumer.ts

# Search for all uses of PERMISSION_TOPIC_MESSAGES_CONFIG in the file
rg "PERMISSION_TOPIC_MESSAGES_CONFIG" packages/kafkajs/test/consumer/PermissionConsumer.ts -n
```
Remove the type keyword from the PERMISSION_TOPIC_MESSAGES_CONFIG import.

PERMISSION_TOPIC_MESSAGES_CONFIG is imported as type-only but used with typeof on lines 27, 38, and 52. Type-only imports are erased at compile time and cannot be used with typeof. Remove the type keyword to make it a regular value import.

🐛 Proposed fix:

```diff
 import {
   PERMISSION_ADDED_SCHEMA,
   PERMISSION_REMOVED_SCHEMA,
-  type PERMISSION_TOPIC_MESSAGES_CONFIG,
+  PERMISSION_TOPIC_MESSAGES_CONFIG,
   type PermissionAdded,
   type PermissionRemoved,
 } from '../utils/permissionSchemas.ts'
```
📝 Committable suggestion (review carefully before applying):

```typescript
import {
  PERMISSION_ADDED_SCHEMA,
  PERMISSION_REMOVED_SCHEMA,
  PERMISSION_TOPIC_MESSAGES_CONFIG,
  type PermissionAdded,
  type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
```
Summary by CodeRabbit

- New Features
- Documentation
- Chores