Skip to content

Conversation

@kibertoad
Copy link
Owner

@kibertoad kibertoad commented Jan 21, 2026

Summary by CodeRabbit

  • New Features

    • Added a new KafkaJS integration package with support for publishers, consumers, and batch message processing capabilities.
    • Implemented handler routing with type-safe topic and schema configuration.
  • Documentation

    • Comprehensive guides added for both Kafka and KafkaJS packages covering installation, usage examples, batch processing, configuration, error handling, and testing.
  • Chores

    • CI/CD workflows updated to support the new KafkaJS package.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 21, 2026

📝 Walkthrough

Walkthrough

This PR introduces a complete new KafkaJS integration package for the message queue toolkit. It adds CI/CD workflow configuration, package setup, core publisher and consumer implementations with batch processing support, a strongly-typed handler routing framework, message batching utilities, comprehensive test suites, and documentation for both the existing Kafka package and the new KafkaJS package.

Changes

Cohort / File(s) Change Summary
CI/CD Workflows
.github/workflows/ci.yml, .github/workflows/publish.yml
Added kafkajs job to ci.yml with Node 22.x/24.x matrix; updated publish.yml with kafkajs package path filters and matrix entry
Package Configuration
packages/kafkajs/package.json, packages/kafkajs/tsconfig.json, packages/kafkajs/tsconfig.build.json, packages/kafkajs/biome.json, packages/kafkajs/vitest.config.ts
Package manifest, TypeScript configs, build config, linting config, and test runner configuration for KafkaJS package
Core Service Classes
packages/kafkajs/lib/AbstractKafkaService.ts, packages/kafkajs/lib/AbstractKafkaPublisher.ts, packages/kafkajs/lib/AbstractKafkaConsumer.ts
Base service with logging and metrics; publisher with schema validation and lazy init; consumer with subscription, message parsing, retry logic, and batch processing support
Type System and Routing
packages/kafkajs/lib/types.ts, packages/kafkajs/lib/handler-routing/KafkaHandlerConfig.ts, packages/kafkajs/lib/handler-routing/KafkaHandlerRoutingBuilder.ts, packages/kafkajs/lib/handler-routing/index.ts
Comprehensive Kafka-specific types, request context, message schemas, topic config, and strongly-typed handler routing builder with batch processing support
Utilities
packages/kafkajs/lib/utils/KafkaMessageBatchStream.ts, packages/kafkajs/lib/utils/safeJsonDeserializer.ts
Per-topic-partition message batching with configurable size and timeout; safe JSON deserialization with error handling
Module Exports
packages/kafkajs/lib/index.ts
Re-exports of all public entities (consumers, publishers, handler routing, types)
Test Utilities
packages/kafkajs/test/utils/testContext.ts, packages/kafkajs/test/utils/permissionSchemas.ts, packages/kafkajs/test/publisher/PermissionPublisher.ts, packages/kafkajs/test/consumer/PermissionConsumer.ts, packages/kafkajs/test/consumer/PermissionBatchConsumer.ts
DI container setup, test schemas with zod, and test fixture publishers/consumers for integration testing
Test Suites
packages/kafkajs/test/publisher/PermissionPublisher.spec.ts, packages/kafkajs/test/consumer/PermissionConsumer.spec.ts, packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts, packages/kafkajs/lib/utils/KafkaMessageBatchStream.spec.ts, packages/kafkajs/lib/utils/safeJsonDeserializer.spec.ts, packages/kafkajs/lib/handler-routing/KafkaHandlerRoutingBuilder.spec.ts
Comprehensive test coverage for lifecycle, initialization, message consumption, batch processing, error handling, observability hooks, and metrics recording
Documentation
packages/kafka/README.md, packages/kafkajs/README.md
Extensive updates to Kafka package docs with usage examples; complete documentation for KafkaJS package including installation, usage patterns, and configuration

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant Consumer as KafkaConsumer
    participant Handler as Handler/Router
    participant Kafka as Kafka Broker
    participant Metrics as MetricsManager
    participant Spy as HandlerSpy

    App->>Consumer: init()
    Consumer->>Kafka: connect admin & consumer
    Consumer->>Kafka: subscribe to topics
    Consumer->>Consumer: start message loop
    
    Kafka-->>Consumer: message received
    Consumer->>Consumer: transformMessage()
    Consumer->>Consumer: parseMessages() + schema validation
    Consumer->>Handler: resolve handler for topic
    
    alt Batch Processing Enabled
        Consumer->>Consumer: accumulate messages (KafkaMessageBatchStream)
        Consumer->>Handler: call handler(batch[], context, requestContext)
    else Single Message Mode
        Consumer->>Handler: call handler(message, context, requestContext)
    end
    
    Handler-->>Consumer: processing complete
    Consumer->>Spy: recordHandlerSuccess()
    Consumer->>Metrics: push processing metrics
    Consumer->>Kafka: commitOffset()
    
    alt Error During Processing
        Handler-->>Consumer: error thrown
        Consumer->>Consumer: retry with exponential backoff
        Consumer->>Spy: recordHandlerError()
        Consumer->>Metrics: push error metrics
    end
    
    App->>Consumer: close()
    Consumer->>Kafka: disconnect consumer & admin
Loading
sequenceDiagram
    participant App as Application
    participant Publisher as KafkaPublisher
    participant Schema as Schema Validator
    participant Kafka as Kafka Broker
    participant Metrics as MetricsManager
    participant Spy as HandlerSpy

    App->>Publisher: publish(topic, message)
    
    alt First Publish (Lazy Init)
        Publisher->>Kafka: lazy init - connect producer
        Publisher->>Kafka: create topics if autocreateTopics enabled
    end
    
    Publisher->>Schema: validateMessage(message)
    alt Schema Validation Fails
        Schema-->>Publisher: validation error
        Publisher-->>App: throw InternalError
    end
    
    Publisher->>Publisher: transformMessage()
    Publisher->>Publisher: add request context headers (x-request-id)
    Publisher->>Kafka: send(topic, message, headers)
    
    Kafka-->>Publisher: success
    Publisher->>Spy: recordPublished()
    Publisher->>Metrics: record metrics
    Publisher-->>App: return
    
    App->>Publisher: close()
    Publisher->>Kafka: disconnect producer
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Suggested reviewers

  • kjamrog
  • CarlosGamero

Poem

🐰 A KafkaJS warren springs to life,
With handlers routing without strife,
Batches flow like morning dew,
Strong types guide the message through,
From broker streams to metrics bright,
The toolkit hops with newfound might! ✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'KafkaJS adapter' accurately summarizes the main change—introducing a complete new KafkaJS adapter package (@message-queue-toolkit/kafkajs) with publisher, consumer, handler routing, and comprehensive test utilities.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Fix all issues with AI agents
In `@packages/kafkajs/lib/AbstractKafkaConsumer.ts`:
- Around line 194-211: The batch mode never commits offsets because
KafkaMessageBatchStream currently sends only DeserializedMessage to the batch
callback; update the flow so the batch handler can commit offsets after
processing: modify the eachMessage handler (and/or transformMessage) to pass
through the original payload metadata (topic, partition, offset) into the items
written to messageBatchStream (or change KafkaMessageBatchStream to accept a
wrapper type that includes payload metadata), then in the batch callback invoked
by messageBatchStream (the function that calls consume(...)), after await
this.consume(batch.topic, batch.messages) successfully resolves, compute the
highest offset per partition from the batch and call
this.commitOffset(payloadInfo) (or a new helper that commits the last offset+1
per partition) so offsets are explicitly committed in batchProcessingEnabled
mode; ensure commit logic uses the same commitOffset method used in non-batch
mode.

In `@packages/kafkajs/package.json`:
- Around line 59-63: The peer dependency for zod is too broad and allows v3
which lacks the v4 export used in lib/types.ts and
lib/handler-routing/KafkaHandlerConfig.ts; update the "zod" entry in
peerDependencies of package.json from ">=3.25.76 <5.0.0" to a 4.x-only range
(for example ">=4.0.0 <5.0.0" or "^4.0.0") so consumers are required to install
Zod v4 compatible with the zod/v4 imports.

In `@packages/kafkajs/README.md`:
- Around line 93-105: The README's KafkaConfig snippet uses placeholder types
for ssl and retry; update the type annotations to match the implementation by
replacing "boolean | TLSConfig" with KafkaJsConfig['ssl'] and "RetryOptions"
with KafkaJsConfig['retry'] in the KafkaConfig block so the documented
KafkaConfig matches the actual type usage (referencing KafkaJsConfig as used in
the codebase).

In `@packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts`:
- Around line 1-5: The test uses waitAndRetry with a predicate that waits for
errorSpy to have calls but then asserts expect(errorSpy).not.toHaveBeenCalled(),
which is contradictory; update the tests to use a deterministic delay (e.g.,
sleep or waitAndRetry with a fixed timeout function) to give the consumer time
to process instead of waiting for errorSpy to be called, then assert that
errorSpy was not called; specifically modify the usages of waitAndRetry and
predicates around errorSpy in PermissionBatchConsumer.spec (the blocks using
waitAndRetry(() => errorSpy.mock.calls.length > 0) and the similar block later)
so they wait a fixed short interval and then run
expect(errorSpy).not.toHaveBeenCalled().

In `@packages/kafkajs/test/consumer/PermissionBatchConsumer.ts`:
- Around line 12-18: The import of PERMISSION_TOPIC_MESSAGES_CONFIG is currently
type-only; remove the leading `type` so it is imported as a runtime value (e.g.,
change the import list to include PERMISSION_TOPIC_MESSAGES_CONFIG without
`type`) so it can be used with `typeof PERMISSION_TOPIC_MESSAGES_CONFIG` where
referenced; leave the other `type` imports (PermissionAdded, PermissionRemoved)
as type-only.

In `@packages/kafkajs/test/consumer/PermissionConsumer.ts`:
- Around line 12-18: The import for PERMISSION_TOPIC_MESSAGES_CONFIG is
currently type-only but is later used with typeof (in the usages around the
PERMISSION_TOPIC_MESSAGES_CONFIG references on lines where typeof is applied),
which fails at compile time; remove the leading type keyword so
PERMISSION_TOPIC_MESSAGES_CONFIG is imported as a runtime value (leave type-only
imports like PermissionAdded and PermissionRemoved as-is) so the typeof
PERMISSION_TOPIC_MESSAGES_CONFIG expressions resolve correctly.
🧹 Nitpick comments (5)
packages/kafkajs/lib/utils/safeJsonDeserializer.ts (1)

1-11: Return type may not reflect actual behavior.

The return type is object | undefined, but JSON.parse can return primitives (e.g., JSON.parse('"hello"') returns "hello", JSON.parse('42') returns 42). If the intent is to only accept JSON objects, consider adding a type guard after parsing.

💡 Optional fix to enforce object return type
 export const safeJsonDeserializer = (data?: Buffer | string | null): object | undefined => {
   if (!data) return undefined
   // Checking to be safe
   if (!Buffer.isBuffer(data) && typeof data !== 'string') return undefined
 
   try {
-    return JSON.parse(data.toString('utf-8'))
+    const parsed = JSON.parse(data.toString('utf-8'))
+    if (typeof parsed !== 'object' || parsed === null) return undefined
+    return parsed
   } catch (_) {
     return undefined
   }
 }
packages/kafkajs/lib/utils/KafkaMessageBatchStream.spec.ts (1)

6-51: Consider closing the stream to clear the long timeout.

With timeoutMilliseconds: 10000, the last partial batch leaves a pending timer and a late flush. Ending the stream avoids open handles and late mutations.

🧹 Possible cleanup (updates expectation)
@@
-import { setTimeout } from 'node:timers/promises'
+import { once } from 'node:events'
+import { setTimeout } from 'node:timers/promises'
@@
     for (const message of messages) {
       batchStream.write(message)
     }
 
+    batchStream.end()
+    await once(batchStream, 'finish')
+
     await dataFetchingPromise
@@
     expect(receivedBatches).toEqual([
       { topic, partition: 0, messages: [messages[0], messages[1], messages[2]] },
       { topic, partition: 0, messages: [messages[3], messages[4], messages[5]] },
       { topic, partition: 0, messages: [messages[6], messages[7], messages[8]] },
+      { topic, partition: 0, messages: [messages[9]] },
     ])
packages/kafkajs/lib/AbstractKafkaPublisher.ts (1)

26-85: Memoize init() promise to avoid redundant concurrent producer.connect() calls.

While KafkaJS's producer.connect() is idempotent and safe for concurrent calls, the current implementation with a boolean flag permits multiple concurrent invocations from concurrent publish() calls. Memoizing the in-flight init promise would eliminate unnecessary concurrent attempts and clear it on error or close.

♻️ Suggested refactor
 export abstract class AbstractKafkaPublisher<
   TopicsConfig extends TopicConfig[],
 > extends AbstractKafkaService<TopicsConfig, KafkaPublisherOptions<TopicsConfig>> {
   private readonly schemaContainers: Record<string, MessageSchemaContainer<object>>
   private readonly kafka: Kafka
   private readonly producer: Producer
   private isInitiated: boolean
+  private initPromise?: Promise<void>
@@
   async init(): Promise<void> {
     if (this.isInitiated) return
-
-    try {
-      await this.producer.connect()
-      this.isInitiated = true
-    } catch (e) {
-      throw new InternalError({
-        message: 'Producer init failed',
-        errorCode: 'KAFKA_PRODUCER_INIT_ERROR',
-        cause: e,
-      })
-    }
+    if (!this.initPromise) {
+      this.initPromise = (async () => {
+        try {
+          await this.producer.connect()
+          this.isInitiated = true
+        } catch (e) {
+          this.initPromise = undefined
+          throw new InternalError({
+            message: 'Producer init failed',
+            errorCode: 'KAFKA_PRODUCER_INIT_ERROR',
+            cause: e,
+          })
+        }
+      })()
+    }
+    await this.initPromise
   }
@@
   async close(): Promise<void> {
     if (!this.isInitiated) return
 
     await this.producer.disconnect()
     this.isInitiated = false
+    this.initPromise = undefined
   }
packages/kafkajs/lib/AbstractKafkaConsumer.ts (2)

282-301: String-based error detection is fragile but may be necessary.

The error handling relies on matching substrings in error messages ('rebalance', 'not a member', 'generation'). This is fragile as error message text could change across KafkaJS versions.

Consider extracting these patterns to a constant or helper function for easier maintenance, and add a comment explaining why string matching is used:

💡 Suggested improvement
+// KafkaJS doesn't expose typed error codes for rebalance-related failures,
+// so we match on known error message patterns
+const REBALANCE_ERROR_PATTERNS = ['rebalance', 'not a member', 'generation'] as const
+
+function isRebalanceError(error: Error): boolean {
+  const message = error.message ?? ''
+  return REBALANCE_ERROR_PATTERNS.some((pattern) => message.includes(pattern))
+}

415-437: Consider longer backoff delays for retry resilience.

The current exponential backoff produces very short delays:

  • Retry 1: 1ms (2^0)
  • Retry 2: 2ms (2^1)

These delays may be too short for transient failures (e.g., brief network hiccups, temporary resource contention) to recover. Consider using a larger base or multiplier:

💡 Suggested adjustment
-     if (retries > 0) await setTimeout(Math.pow(2, retries - 1))
+     // Exponential backoff: 100ms, 200ms, 400ms...
+     if (retries > 0) await setTimeout(100 * Math.pow(2, retries - 1))

Given the TODO comment about implementing a proper retry mechanism with DLQ, this is acceptable for now as an interim solution.

Comment on lines +194 to +211
if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
this.messageBatchStream = new KafkaMessageBatchStream<
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
>(
(batch) =>
this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
this.options.batchProcessingOptions,
)

await this.consumer.run({
autoCommit: false,
eachMessage: async (payload) => {
const message = this.transformMessage(payload)
if (message) {
this.messageBatchStream!.write(message)
}
},
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, find KafkaMessageBatchStream implementation
find packages -name "*.ts" -type f | head -20

Repository: kibertoad/message-queue-toolkit

Length of output: 997


🏁 Script executed:

# Search for KafkaMessageBatchStream class definition
rg -n "class KafkaMessageBatchStream" --type=ts

Repository: kibertoad/message-queue-toolkit

Length of output: 262


🏁 Script executed:

# Search for KafkaMessageBatchStream to understand its location
rg -l "KafkaMessageBatchStream" --type=ts

Repository: kibertoad/message-queue-toolkit

Length of output: 386


🏁 Script executed:

# Look for commitOffset usage in batch context and the consume method
rg -n "commitOffset|this\.consume" packages/kafkajs/lib/AbstractKafkaConsumer.ts -B2 -A2

Repository: kibertoad/message-queue-toolkit

Length of output: 2676


🏁 Script executed:

cat -n packages/kafkajs/lib/utils/KafkaMessageBatchStream.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 5535


🏁 Script executed:

# Also check the consume method to see if it handles commits
rg -n "async consume|private consume|protected consume" packages/kafkajs/lib/AbstractKafkaConsumer.ts -A15

Repository: kibertoad/message-queue-toolkit

Length of output: 721


🏁 Script executed:

rg -n "private async consume" packages/kafkajs/lib/AbstractKafkaConsumer.ts -A50

Repository: kibertoad/message-queue-toolkit

Length of output: 2104


🏁 Script executed:

# Also check if there's any offset commit in batch context or elsewhere
rg -n "this\.commitOffset\|commitOffsets" packages/kafkajs/lib/AbstractKafkaConsumer.ts -B2 -A2

Repository: kibertoad/message-queue-toolkit

Length of output: 57


🏁 Script executed:

# Search for commitOffset without escaped pipes
rg -n "commitOffset" packages/kafkajs/lib/AbstractKafkaConsumer.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 250


🏁 Script executed:

# Check the full context around line 219 in the non-batch mode to understand the commit flow
sed -n '210,230p' packages/kafkajs/lib/AbstractKafkaConsumer.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 648


🏁 Script executed:

# Look at the entire batch processing vs non-batch processing section
sed -n '194,225p' packages/kafkajs/lib/AbstractKafkaConsumer.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 1196


Offset commits are missing in batch processing mode.

In non-batch mode (lines 212-223), commitOffset(payload) is explicitly called after consuming each message. In batch mode (lines 194-211), messages are written to messageBatchStream but neither the stream handler nor the batch callback invokes offset commits.

The consume() method doesn't commit offsets—it processes messages and returns. The KafkaMessageBatchStream itself is just a batching utility that collects messages and invokes the callback; it has no access to the payload information (topic, partition, offset) required by commitOffset(). This means offsets are never committed in batch processing with autoCommit: false, leading to message reprocessing after consumer restarts or rebalances.

The batch callback (line 198-199) or the eachMessage handler (lines 206-210) needs to explicitly commit offsets for the processed batch.

🤖 Prompt for AI Agents
In `@packages/kafkajs/lib/AbstractKafkaConsumer.ts` around lines 194 - 211, The
batch mode never commits offsets because KafkaMessageBatchStream currently sends
only DeserializedMessage to the batch callback; update the flow so the batch
handler can commit offsets after processing: modify the eachMessage handler
(and/or transformMessage) to pass through the original payload metadata (topic,
partition, offset) into the items written to messageBatchStream (or change
KafkaMessageBatchStream to accept a wrapper type that includes payload
metadata), then in the batch callback invoked by messageBatchStream (the
function that calls consume(...)), after await this.consume(batch.topic,
batch.messages) successfully resolves, compute the highest offset per partition
from the batch and call this.commitOffset(payloadInfo) (or a new helper that
commits the last offset+1 per partition) so offsets are explicitly committed in
batchProcessingEnabled mode; ensure commit logic uses the same commitOffset
method used in non-batch mode.

Comment on lines +59 to +63
"peerDependencies": {
"@message-queue-toolkit/core": ">=23.0.0",
"@message-queue-toolkit/schemas": ">=7.0.0",
"zod": ">=3.25.76 <5.0.0"
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for Zod 4-specific APIs in the kafkajs package
echo "=== Checking for Zod 4-specific API usage ==="
rg -n --type=ts 'z\.(enum|record|strictObject|looseObject|templateLiteral|transform)\s*\(' packages/kafkajs/ || echo "No matches found"

echo ""
echo "=== Checking all Zod usage in kafkajs ==="
rg -n 'from.*zod|import.*zod' packages/kafkajs/ --type=ts || echo "No imports found"

echo ""
echo "=== Listing TypeScript files in kafkajs/src ==="
find packages/kafkajs -name "*.ts" -type f | head -20

Repository: kibertoad/message-queue-toolkit

Length of output: 1805


🏁 Script executed:

#!/bin/bash
echo "=== Check for 'zod/v4' import details ==="
rg -B2 -A2 "from.*zod/v4" packages/kafkajs/ --type=ts | head -40

echo ""
echo "=== Check if there are any generic 'from zod' imports ==="
rg "from ['\"]zod['\"]" packages/kafkajs/ --type=ts || echo "No generic zod imports found"

echo ""
echo "=== Check package.json for any Zod compatibility notes ==="
cat packages/kafkajs/package.json | jq '.description, .keywords' 2>/dev/null || cat packages/kafkajs/package.json | grep -A5 -B5 zod

Repository: kibertoad/message-queue-toolkit

Length of output: 2843


Narrow Zod peer dependency to require version 4.x only.

The code explicitly imports from zod/v4 (a versioned export path specific to Zod 4.x) in production files (lib/types.ts, lib/handler-routing/KafkaHandlerConfig.ts) and test files. The zod/v4 export does not exist in Zod 3.x, so users installing this package with Zod 3.x will encounter module resolution failures at runtime. Update the peer dependency from >=3.25.76 <5.0.0 to >=4.0.0 <5.0.0 (or ^4.0.0).

🤖 Prompt for AI Agents
In `@packages/kafkajs/package.json` around lines 59 - 63, The peer dependency for
zod is too broad and allows v3 which lacks the v4 export used in lib/types.ts
and lib/handler-routing/KafkaHandlerConfig.ts; update the "zod" entry in
peerDependencies of package.json from ">=3.25.76 <5.0.0" to a 4.x-only range
(for example ">=4.0.0 <5.0.0" or "^4.0.0") so consumers are required to install
Zod v4 compatible with the zod/v4 imports.

Comment on lines +93 to +105
### KafkaConfig

```typescript
type KafkaConfig = {
brokers: string[] // List of Kafka broker addresses
clientId: string // Client identifier
ssl?: boolean | TLSConfig // SSL configuration
sasl?: SASLOptions // SASL authentication
connectionTimeout?: number // Connection timeout in ms
requestTimeout?: number // Request timeout in ms
retry?: RetryOptions // Retry configuration
}
```
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find the KafkaJS package structure
git ls-files packages/kafkajs/ | head -20

Repository: kibertoad/message-queue-toolkit

Length of output: 1053


🏁 Script executed:

# Look for type definition files in the KafkaJS package
fd -e "ts" -e "d.ts" packages/kafkajs/ | grep -E "(type|config)" | head -20

Repository: kibertoad/message-queue-toolkit

Length of output: 448


🏁 Script executed:

# Search for TLSConfig definition
rg "type TLSConfig|interface TLSConfig|export.*TLSConfig" packages/kafkajs/

Repository: kibertoad/message-queue-toolkit

Length of output: 57


🏁 Script executed:

# Search for RetryOptions definition
rg "type RetryOptions|interface RetryOptions|export.*RetryOptions" packages/kafkajs/

Repository: kibertoad/message-queue-toolkit

Length of output: 57


🏁 Script executed:

# Search for KafkaConfig definition
rg "type KafkaConfig|interface KafkaConfig|export.*KafkaConfig" packages/kafkajs/

Repository: kibertoad/message-queue-toolkit

Length of output: 275


🏁 Script executed:

# Read the actual KafkaConfig type definition
cat -n packages/kafkajs/lib/types.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 2376


🏁 Script executed:

# Also check the test context to see how KafkaConfig is used
cat -n packages/kafkajs/test/utils/testContext.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 3503


🏁 Script executed:

# Verify the exact README content at lines 93-105
sed -n '93,105p' packages/kafkajs/README.md

Repository: kibertoad/message-queue-toolkit

Length of output: 521


Update KafkaConfig type annotations in README to match actual implementation.

The README currently documents ssl as boolean | TLSConfig and retry as RetryOptions, but the actual KafkaConfig type in packages/kafkajs/lib/types.ts uses KafkaJsConfig['ssl'] and KafkaJsConfig['retry'] (where KafkaJsConfig is imported from the 'kafkajs' package). The placeholder type names should be replaced with the actual indexed type access to avoid documentation drift.

✏️ Doc alignment tweak
@@
-  ssl?: boolean | TLSConfig   // SSL configuration
+  ssl?: KafkaJsConfig['ssl']  // SSL configuration (KafkaJS)
@@
-  retry?: RetryOptions        // Retry configuration
+  retry?: KafkaJsConfig['retry'] // Retry configuration (KafkaJS)
🤖 Prompt for AI Agents
In `@packages/kafkajs/README.md` around lines 93 - 105, The README's KafkaConfig
snippet uses placeholder types for ssl and retry; update the type annotations to
match the implementation by replacing "boolean | TLSConfig" with
KafkaJsConfig['ssl'] and "RetryOptions" with KafkaJsConfig['retry'] in the
KafkaConfig block so the documented KafkaConfig matches the actual type usage
(referencing KafkaJsConfig as used in the codebase).

Comment on lines +1 to +5
import { randomUUID } from 'node:crypto'
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
import { Kafka } from 'kafkajs'
import { afterAll, expect, type MockInstance } from 'vitest'
import z from 'zod/v4'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and read the test file
find . -name "PermissionBatchConsumer.spec.ts" -type f

Repository: kibertoad/message-queue-toolkit

Length of output: 202


🏁 Script executed:

# Read the kafkajs version of the test file to see the full context
cat -n ./packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts | head -100

Repository: kibertoad/message-queue-toolkit

Length of output: 3874


🏁 Script executed:

# Continue reading the file to find the waitAndRetry usage mentioned in the review
cat -n ./packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts | sed -n '250,360p'

Repository: kibertoad/message-queue-toolkit

Length of output: 4646


🏁 Script executed:

# Now let's understand what waitAndRetry does by searching for its documentation or implementation
web_search
What is the behavior of waitAndRetry from `@lokalise/universal-ts-utils/node`?

Repository: kibertoad/message-queue-toolkit

Length of output: 168


🏁 Script executed:

# Search for other uses of waitAndRetry in the test files
rg -n "waitAndRetry" ./packages/kafkajs/test/ -A 2 -B 2

Repository: kibertoad/message-queue-toolkit

Length of output: 1930


🌐 Web query:

@lokalise/universal-ts-utils waitAndRetry function documentation behavior

💡 Result:

The waitAndRetry helper repeatedly calls a predicate (sync or async) until it returns a truthy value or the maximum retry count is reached. It sleeps between attempts (sleepTime) and resolves with whatever the last attempt returned (truthy value if succeeded, otherwise the last falsy value); callers should handle any thrown errors. [1][2]


🏁 Script executed:

# Let's check the package.json to see what version is being used
grep -A 5 "@lokalise/universal-ts-utils" ./packages/kafkajs/package.json

Repository: kibertoad/message-queue-toolkit

Length of output: 289


🏁 Script executed:

# Let's look at the exact implementation or more documentation about waitAndRetry
npm info `@lokalise/universal-ts-utils`@4.8.0 2>/dev/null || echo "npm not available"

Repository: kibertoad/message-queue-toolkit

Length of output: 1441


🏁 Script executed:

# Check if there's a similar issue or comment in the other test file
cat -n ./packages/kafkajs/test/consumer/PermissionConsumer.spec.ts | sed -n '280,295p'

Repository: kibertoad/message-queue-toolkit

Length of output: 655


🏁 Script executed:

# Let's check the github repo or source to understand waitAndRetry better
curl -s https://raw.githubusercontent.com/lokalise/shared-ts-libs/main/packages/universal-ts-utils/src/index.ts 2>/dev/null | grep -A 20 "waitAndRetry" | head -30

Repository: kibertoad/message-queue-toolkit

Length of output: 57


🏁 Script executed:

# Alternative: Check if there's any documentation or usage in this repo that clarifies the expected behavior
rg -n "waitAndRetry" . -B 5 -A 5 --type ts --max-count=3 | head -50

Repository: kibertoad/message-queue-toolkit

Length of output: 4559


🏁 Script executed:

# Let's look at the GitHub repo source for waitAndRetry to be absolutely certain
curl -s https://raw.githubusercontent.com/lokalise/shared-ts-libs/main/packages/universal-ts-utils/src/sleep.ts 2>/dev/null

Repository: kibertoad/message-queue-toolkit

Length of output: 89


🏁 Script executed:

# Alternative: check the documentation in the package
python3 << 'EOF'
import json
import urllib.request

# Fetch the package info
try:
    with urllib.request.urlopen('https://registry.npmjs.org/@lokalise/universal-ts-utils/4.8.0') as response:
        data = json.loads(response.read().decode())
        # Look for description
        print("Package description:", data.get('description', 'N/A'))
        print("Repository:", data.get('repository', {}).get('url', 'N/A'))
except Exception as e:
    print(f"Error: {e}")
EOF

Repository: kibertoad/message-queue-toolkit

Length of output: 209


🏁 Script executed:

# Let me verify the logic one more time by looking at what the test is actually testing
cat -n ./packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts | sed -n '329,360p'

Repository: kibertoad/message-queue-toolkit

Length of output: 1236


🏁 Script executed:

# Also check the similar test in PermissionConsumer.spec.ts
cat -n ./packages/kafkajs/test/consumer/PermissionConsumer.spec.ts | sed -n '270,293p'

Repository: kibertoad/message-queue-toolkit

Length of output: 863


waitAndRetry predicate contradicts the assertion.

The predicate () => errorSpy.mock.calls.length > 0 waits for the error to be reported, but the next line asserts expect(errorSpy).not.toHaveBeenCalled(). This is logically backwards—you're waiting for something you expect not to happen. Use a deterministic delay instead to give the consumer time to process, then assert no calls.

🔧 Proposed fix
-import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
+// import { waitAndRetry } from '@lokalise/universal-ts-utils/node'

@@
-      await waitAndRetry(() => errorSpy.mock.calls.length > 0, 10, 100)
-      expect(errorSpy).not.toHaveBeenCalled()
+      // Give the consumer a short window to process the bad message
+      await new Promise((resolve) => setTimeout(resolve, 300))
+      expect(errorSpy).not.toHaveBeenCalled()

Also applies to: 329-359

🤖 Prompt for AI Agents
In `@packages/kafkajs/test/consumer/PermissionBatchConsumer.spec.ts` around lines
1 - 5, The test uses waitAndRetry with a predicate that waits for errorSpy to
have calls but then asserts expect(errorSpy).not.toHaveBeenCalled(), which is
contradictory; update the tests to use a deterministic delay (e.g., sleep or
waitAndRetry with a fixed timeout function) to give the consumer time to process
instead of waiting for errorSpy to be called, then assert that errorSpy was not
called; specifically modify the usages of waitAndRetry and predicates around
errorSpy in PermissionBatchConsumer.spec (the blocks using waitAndRetry(() =>
errorSpy.mock.calls.length > 0) and the similar block later) so they wait a
fixed short interval and then run expect(errorSpy).not.toHaveBeenCalled().

Comment on lines +12 to +18
import {
PERMISSION_ADDED_SCHEMA,
PERMISSION_REMOVED_SCHEMA,
type PERMISSION_TOPIC_MESSAGES_CONFIG,
type PermissionAdded,
type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cat -n packages/kafkajs/test/consumer/PermissionBatchConsumer.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 3712


🏁 Script executed:

cat -n packages/kafkajs/test/utils/permissionSchemas.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 1342


Remove type keyword from PERMISSION_TOPIC_MESSAGES_CONFIG import.

PERMISSION_TOPIC_MESSAGES_CONFIG is a runtime const value (not just a type) but imported as type-only on line 15. This prevents it from being used with typeof on lines 27, 39, and 58, causing TypeScript compilation errors. Import it as a regular value:

Fix
 import {
   PERMISSION_ADDED_SCHEMA,
   PERMISSION_REMOVED_SCHEMA,
-  type PERMISSION_TOPIC_MESSAGES_CONFIG,
+  PERMISSION_TOPIC_MESSAGES_CONFIG,
   type PermissionAdded,
   type PermissionRemoved,
 } from '../utils/permissionSchemas.ts'
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import {
PERMISSION_ADDED_SCHEMA,
PERMISSION_REMOVED_SCHEMA,
type PERMISSION_TOPIC_MESSAGES_CONFIG,
type PermissionAdded,
type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
import {
PERMISSION_ADDED_SCHEMA,
PERMISSION_REMOVED_SCHEMA,
PERMISSION_TOPIC_MESSAGES_CONFIG,
type PermissionAdded,
type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
🤖 Prompt for AI Agents
In `@packages/kafkajs/test/consumer/PermissionBatchConsumer.ts` around lines 12 -
18, The import of PERMISSION_TOPIC_MESSAGES_CONFIG is currently type-only;
remove the leading `type` so it is imported as a runtime value (e.g., change the
import list to include PERMISSION_TOPIC_MESSAGES_CONFIG without `type`) so it
can be used with `typeof PERMISSION_TOPIC_MESSAGES_CONFIG` where referenced;
leave the other `type` imports (PermissionAdded, PermissionRemoved) as
type-only.

Comment on lines +12 to +18
import {
PERMISSION_ADDED_SCHEMA,
PERMISSION_REMOVED_SCHEMA,
type PERMISSION_TOPIC_MESSAGES_CONFIG,
type PermissionAdded,
type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Check if the file exists and read the import section
head -30 packages/kafkajs/test/consumer/PermissionConsumer.ts

Repository: kibertoad/message-queue-toolkit

Length of output: 206


🏁 Script executed:

# Search for all uses of PERMISSION_TOPIC_MESSAGES_CONFIG in the file
rg "PERMISSION_TOPIC_MESSAGES_CONFIG" packages/kafkajs/test/consumer/PermissionConsumer.ts -n

Repository: kibertoad/message-queue-toolkit

Length of output: 315


Remove type keyword from PERMISSION_TOPIC_MESSAGES_CONFIG import.

PERMISSION_TOPIC_MESSAGES_CONFIG is imported as type-only but used with typeof on lines 27, 38, and 52. Type-only imports are erased at compile time and cannot be used with typeof. Remove the type keyword to make it a regular value import.

🐛 Proposed fix
 import {
   PERMISSION_ADDED_SCHEMA,
   PERMISSION_REMOVED_SCHEMA,
-  type PERMISSION_TOPIC_MESSAGES_CONFIG,
+  PERMISSION_TOPIC_MESSAGES_CONFIG,
   type PermissionAdded,
   type PermissionRemoved,
 } from '../utils/permissionSchemas.ts'
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import {
PERMISSION_ADDED_SCHEMA,
PERMISSION_REMOVED_SCHEMA,
type PERMISSION_TOPIC_MESSAGES_CONFIG,
type PermissionAdded,
type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
import {
PERMISSION_ADDED_SCHEMA,
PERMISSION_REMOVED_SCHEMA,
PERMISSION_TOPIC_MESSAGES_CONFIG,
type PermissionAdded,
type PermissionRemoved,
} from '../utils/permissionSchemas.ts'
🤖 Prompt for AI Agents
In `@packages/kafkajs/test/consumer/PermissionConsumer.ts` around lines 12 - 18,
The import for PERMISSION_TOPIC_MESSAGES_CONFIG is currently type-only but is
later used with typeof (in the usages around the
PERMISSION_TOPIC_MESSAGES_CONFIG references on lines where typeof is applied),
which fails at compile time; remove the leading type keyword so
PERMISSION_TOPIC_MESSAGES_CONFIG is imported as a runtime value (leave type-only
imports like PermissionAdded and PermissionRemoved as-is) so the typeof
PERMISSION_TOPIC_MESSAGES_CONFIG expressions resolve correctly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants