SolaceIO - fix for data loss during scaling/rebalancing (#36991) #38603
iht wants to merge 2 commits into
Introduced a sequential pending checkpoints tracking mechanism using a TreeMap in the reader. Created a JVM-global ActiveReadersRegistry using weak references to resolve serialized checkpoint marks back to their originating active reader. This enables reliable sequential acknowledgments of checkpoints, ensuring we only ack committed data, while allowing subsequent finalizations to catch up and prevent message leaks (stuckness) if intermediate finalizations are lost. Also synchronized received messages access for thread safety with minimal lock duration (network I/O done outside locks). Fixed initialization order by registering the reader post-construction.
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request addresses data loss issues in the SolaceIO connector during Dataflow scaling and rebalancing events. By moving away from premature message acknowledgments and implementing a sequential, thread-safe finalization mechanism, the changes ensure that messages are only acknowledged after being successfully committed by the runner. The solution includes a global reader registry and a self-healing checkpoint finalization logic that prevents message leaks and pipeline stalls.
Code Review
This pull request refactors the Solace checkpointing mechanism to support sequential acknowledgments and decouple the checkpoint mark from the reader's lifecycle using a global registry. Key changes include the introduction of the ActiveReadersRegistry to track active readers via UUIDs, updating SolaceCheckpointMark to store a reader UUID and checkpoint ID, and modifying UnboundedSolaceReader to manage pending checkpoints in a TreeMap. Review feedback identifies several improvement opportunities: using Guava's Cache with weak values for the registry to prevent potential memory leaks of keys, storing UUID objects directly in the checkpoint mark to avoid redundant string parsing, using private lock objects instead of synchronizing on this, reducing method visibility to package-private where appropriate, and restoring detailed message identifiers in error logs for better debugging.
- Use Guava Cache with weak values in ActiveReadersRegistry to prevent memory leaks.
- Standardize on String for readerUuid in SolaceCheckpointMark and UnboundedSolaceReader to avoid Avro serialization issues with UUID on JDK 17+, while still eliminating UUID.fromString() overhead.
- Use private lock object in UnboundedSolaceReader instead of synchronizing on 'this'.
- Reduce visibility of UnboundedSolaceReader.finalizeCheckpoint to package-private.
- Restore applicationMessageId and ackMessageId in failed ack logs.
Checks are failing. Will not request review until checks are succeeding.
/gemini review
Code Review
This pull request refactors the SolaceIO acknowledgment logic by introducing a global ActiveReadersRegistry and a sequence-based checkpointing system. SolaceCheckpointMark now uses a reader UUID and a checkpoint ID to trigger acknowledgments via the originating reader, allowing for sequential processing and catch-up of missed checkpoints. Feedback was provided regarding an unnecessary synchronization block in the advance() method, as the thread model in Apache Beam ensures that advance() and getCheckpointMark() are called by the same thread, making the lock on receivedMessages redundant.
    synchronized (lock) {
      receivedMessages.add(receivedXmlMessage);
    }
The synchronization here is unnecessary and can cause minor performance degradation. In Apache Beam, advance() and getCheckpointMark() are guaranteed to be called by the same reader thread. Since receivedMessages is only accessed by these two methods and not by the finalizer thread (which only touches pendingCheckpoints), there is no cross-thread contention on this specific queue. Removing this synchronized block avoids potential blocking of the critical reader thread when the finalizer thread holds the lock during its own (short) synchronized blocks.
Suggested change:
    - synchronized (lock) {
    -   receivedMessages.add(receivedXmlMessage);
    - }
    + solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
    + receivedMessages.add(receivedXmlMessage);
    + return true;
This PR may solve #36991.
Problem Statement
The SolaceIO connector was experiencing data loss during Dataflow scaling/rebalancing events.
finalizeCheckpoint) was lost or delayed, the associated messages remained unacknowledged indefinitely on the Solace broker, eventually hitting the max-delivered-unacked-msgs-per-flow limit and halting message delivery. That fix was subsequently reverted in Revert "SolaceIO data loss - remove message ack from close and advanc… #37162
Proposed Solution
This PR introduces a robust, thread-safe sequential finalization catch-up mechanism that ensures we only acknowledge committed messages without risking leaks or stuckness if finalizations are lost.
1. Active Reader Tracking (ActiveReadersRegistry)
We introduce a JVM-global ActiveReadersRegistry that tracks active UnboundedSolaceReader instances using WeakReferences.
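As a rough illustration of the idea (not the PR's actual code), a weak-reference registry could look like the sketch below. The names `FinalizableReader`, `ReaderRegistrySketch`, and `finalizeVia` are hypothetical stand-ins for `UnboundedSolaceReader`, `ActiveReadersRegistry`, and the checkpoint mark's finalization path; the assumption is that a checkpoint mark carries only a reader id string and a checkpoint id, and resolves the live reader at finalization time.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for UnboundedSolaceReader; only the callback matters here. */
interface FinalizableReader {
  void finalizeCheckpoint(long checkpointId);
}

/** Sketch of a JVM-global registry that holds readers weakly (illustrative names). */
final class ReaderRegistrySketch {
  private static final Map<String, WeakReference<FinalizableReader>> READERS =
      new ConcurrentHashMap<>();

  private ReaderRegistrySketch() {}

  /** Registers a fully constructed reader and returns the id to store in its checkpoint marks. */
  static String register(FinalizableReader reader) {
    String id = UUID.randomUUID().toString();
    READERS.put(id, new WeakReference<>(reader));
    return id;
  }

  /** Explicit removal, for example when the reader is closed. */
  static void unregister(String id) {
    READERS.remove(id);
  }

  /** Resolves an id to a live reader, or null; drops the entry if the reader was collected. */
  static FinalizableReader lookup(String id) {
    WeakReference<FinalizableReader> ref = READERS.get(id);
    if (ref == null) {
      return null;
    }
    FinalizableReader reader = ref.get();
    if (reader == null) {
      READERS.remove(id, ref); // stale entry: the referent is gone
    }
    return reader;
  }

  /** What a deserialized checkpoint mark might do; returns whether a reader was found. */
  static boolean finalizeVia(String readerId, long checkpointId) {
    FinalizableReader reader = lookup(readerId);
    if (reader == null) {
      return false; // reader no longer active in this JVM: nothing to acknowledge here
    }
    reader.finalizeCheckpoint(checkpointId);
    return true;
  }
}
```

Registering from a static factory after the constructor returns, rather than inside the constructor, keeps a partially initialized reader from becoming visible to the finalizer thread.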
2. Sequential Catch-Up Finalization
Instead of a shared queue, the reader now maintains a TreeMap<Long, List<BytesXMLMessage>> pendingCheckpoints to track messages per individual checkpoint ID.
3. Concurrency & Thread Safety
To avoid blocking the critical reader thread (calling advance and getCheckpointMark) during network operations in the finalizer thread:
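The sequential catch-up and the short-lock discipline described in sections 2 and 3 can be sketched together as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: `PendingCheckpointsSketch`, `onMessage`, `checkpoint`, and `pendingCount` are hypothetical names, the type parameter `M` stands in for `BytesXMLMessage`, the `ack` callback stands in for the broker acknowledgment, and handling of failed acks is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Sketch of per-checkpoint message tracking with catch-up finalization. */
final class PendingCheckpointsSketch<M> {
  private final Object lock = new Object(); // private lock instead of synchronizing on 'this'
  private final List<M> received = new ArrayList<>();
  private final TreeMap<Long, List<M>> pendingCheckpoints = new TreeMap<>();
  private long nextCheckpointId = 0;

  /** Reader thread: remember a message delivered since the last checkpoint. */
  void onMessage(M message) {
    synchronized (lock) {
      received.add(message);
    }
  }

  /** Reader thread: file the received messages under a new checkpoint id and return it. */
  long checkpoint() {
    synchronized (lock) {
      long id = nextCheckpointId++;
      pendingCheckpoints.put(id, new ArrayList<>(received));
      received.clear();
      return id;
    }
  }

  /**
   * Finalizer thread: acknowledge this checkpoint and every earlier one still pending,
   * so a lost finalization is caught up by the next one. Returns the number of messages acked.
   */
  int finalizeCheckpoint(long checkpointId, Consumer<M> ack) {
    List<M> toAck = new ArrayList<>();
    synchronized (lock) {
      // headMap is a live view: clearing it removes the entries from pendingCheckpoints.
      NavigableMap<Long, List<M>> upTo = pendingCheckpoints.headMap(checkpointId, true);
      for (List<M> batch : upTo.values()) {
        toAck.addAll(batch);
      }
      upTo.clear();
    }
    // The (potentially slow) acknowledgments happen outside the lock.
    for (M message : toAck) {
      ack.accept(message);
    }
    return toAck.size();
  }

  /** Number of checkpoints not yet finalized. */
  int pendingCount() {
    synchronized (lock) {
      return pendingCheckpoints.size();
    }
  }
}
```

Because the map is ordered by checkpoint ID, finalizing checkpoint N also drains any IDs below N whose own finalization never arrived, while messages belonging to later checkpoints stay unacknowledged until the runner commits them.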
Changes Made