Skip to content

SolaceIO - fix for data loss during scaling/rebalancing (#36991)#38603

Open
iht wants to merge 2 commits into
apache:masterfrom
iht:fix_solaceio
Open

SolaceIO - fix for data loss during scaling/rebalancing (#36991)#38603
iht wants to merge 2 commits into
apache:masterfrom
iht:fix_solaceio

Conversation

@iht
Copy link
Copy Markdown
Contributor

@iht iht commented May 22, 2026

This PR may solve #36991.

Problem Statement

The SolaceIO connector was experiencing data loss during Dataflow scaling/rebalancing events.

  • The original code acknowledged messages prematurely (inside advance() and close() ), before they were committed by Dataflow. If a work item failed after being read, those messages were already acked in Solace and could not be redelivered, leading to data loss.
  • A previous fix attempt (SolaceIO - fix for data loss during late finalization #37007) removed these premature acks but introduced a regression where the pipeline got "stuck." If a checkpoint finalization call (finalizeCheckpoint) was lost or delayed, the associated messages remained unacknowledged indefinitely on the Solace broker, eventually hitting the max-delivered-unacked-msgs-per-flow limit and halting message delivery. That fix was subsequently reverted in Revert "SolaceIO data loss - remove message ack from close and advanc… #37162

Proposed Solution

This PR introduces a robust, thread-safe sequential finalization catch-up mechanism that ensures we only acknowledge committed messages without risking leaks or stuckness if finalizations are lost.

1. Active Reader Tracking ( ActiveReadersRegistry )

We introduce a JVM-global ActiveReadersRegistry that tracks active UnboundedSolaceReader instances using WeakReferences.

  • When SolaceCheckpointMark is serialized and sent to the runner, it loses direct in-memory references.
  • Upon finalization, the mark uses a serializable readerUuid to resolve the active reader from the registry and delegate the acknowledgment.

2. Sequential Catch-Up Finalization

Instead of a shared queue, the reader now maintains a TreeMap<Long, List<BytesXMLMessage>> pendingCheckpoints to track messages per individual checkpoint ID.

  • Sequential Acks: When finalizeCheckpoint(checkpointId) is called, the reader acknowledges the messages for that specific checkpoint and any older pending checkpoints (e.g., pendingId <= checkpointId ).
  • Self-Healing: Since Dataflow (and other runners) commits state sequentially per split, if checkpoint T1 finalization is lost but T2 succeeds, the finalization of T2 will automatically "catch up" and acknowledge/clean up the messages for T1. This completely prevents unacknowledged message leaks on the broker.

3. Concurrency & Thread Safety

To avoid blocking the critical reader thread (calling advance and getCheckpointMark ) during network operations in the finalizer thread:

  • We use extremely short synchronized blocks to copy state (extracting pending messages to a local list).
  • All actual Solace network I/O ( msg.ackMessage() ) is performed outside the synchronized blocks, ensuring zero-blocking concurrency.

Changes Made

  1. ActiveReadersRegistry.java (New): Added a thread-safe, weak-reference-based global registry for active readers.
  2. UnboundedSolaceReader.java :
    • Removed safeToAckMessages queue and the premature finalizeReadyMessages() helper.
    • Added pendingCheckpoints ( TreeMap ) and sequential finalizeCheckpoint(long) logic with non-blocking thread safety.
    • Added synchronized protection around receivedMessages in advance() .
    • Registers with ActiveReadersRegistry post-construction and unregisters in close() .
  3. SolaceCheckpointMark.java :
    • Removed transient safeToAck queue.
    • Added serializable readerUuid (String) and checkpointId (long) fields.
    • Delegates finalizeCheckpoint() to the resolved reader via the registry.
  4. UnboundedSolaceSource.java :
    • Modified createReader to register the reader immediately after construction, avoiding Checker Framework initialization order warnings.
  5. SolaceIOReadTest.java :
    • Updated testCheckpointMarkAndFinalizeSeparately to align with the fact that advance() no longer triggers premature acks.
    • Added testLostCheckpointCatchUp : Explicitly simulates a lost finalization (T1 lost, T2 finalized) and verifies that both T1 and T2 messages (7 messages in total) are successfully acknowledged, confirming the catch-up behavior works.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Introduced a sequential pending checkpoints tracking mechanism using a TreeMap in the reader.
Created a JVM-global ActiveReadersRegistry using weak references to resolve serialized checkpoint marks back to their originating active reader.
This enables reliable sequential acknowledgments of checkpoints, ensuring we only ack committed data, while allowing subsequent finalizations to catch up and prevent message leaks (stuckness) if intermediate finalizations are lost.
Also synchronized received messages access for thread safety with minimal lock duration (network I/O done outside locks).
Fixed initialization order by registering the reader post-construction.
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses data loss issues in the SolaceIO connector during Dataflow scaling and rebalancing events. By moving away from premature message acknowledgments and implementing a robust, sequential, and thread-safe finalization mechanism, the changes ensure that messages are only acknowledged after being successfully committed by the runner. The solution includes a global reader registry and a self-healing checkpoint finalization logic that prevents message leaks and pipeline stalls.

Highlights

  • ActiveReadersRegistry: Introduced a JVM-global registry using WeakReferences to track active UnboundedSolaceReader instances, allowing serialized checkpoint marks to resolve their originating reader.
  • Sequential Catch-Up Finalization: Replaced the shared queue with a TreeMap-based pending checkpoints tracker. Finalization now acknowledges messages for the target checkpoint and all preceding pending checkpoints, ensuring self-healing if finalization calls are lost.
  • Concurrency Improvements: Moved Solace network I/O operations outside of synchronized blocks to prevent blocking the critical reader thread, while maintaining thread safety for state updates.
  • Test Coverage: Updated existing tests to reflect the new non-premature acknowledgment behavior and added a new test case to verify the catch-up mechanism during lost checkpoint scenarios.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the Solace checkpointing mechanism to support sequential acknowledgments and decouple the checkpoint mark from the reader's lifecycle using a global registry. Key changes include the introduction of the ActiveReadersRegistry to track active readers via UUIDs, updating SolaceCheckpointMark to store a reader UUID and checkpoint ID, and modifying UnboundedSolaceReader to manage pending checkpoints in a TreeMap. Review feedback identifies several improvement opportunities: using Guava's Cache with weak values for the registry to prevent potential memory leaks of keys, storing UUID objects directly in the checkpoint mark to avoid redundant string parsing, using private lock objects instead of synchronizing on this, reducing method visibility to package-private where appropriate, and restoring detailed message identifiers in error logs for better debugging.

- Use Guava Cache with weak values in ActiveReadersRegistry to prevent memory leaks.
- Standardize on String for readerUuid in SolaceCheckpointMark and UnboundedSolaceReader to avoid Avro serialization issues with UUID on JDK 17+, while still eliminating UUID.fromString() overhead.
- Use private lock object in UnboundedSolaceReader instead of synchronizing on 'this'.
- Reduce visibility of UnboundedSolaceReader.finalizeCheckpoint to package-private.
- Restore applicationMessageId and ackMessageId in failed ack logs.
@github-actions
Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@stankiewicz
Copy link
Copy Markdown
Contributor

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the SolaceIO acknowledgment logic by introducing a global ActiveReadersRegistry and a sequence-based checkpointing system. SolaceCheckpointMark now uses a reader UUID and a checkpoint ID to trigger acknowledgments via the originating reader, allowing for sequential processing and catch-up of missed checkpoints. Feedback was provided regarding an unnecessary synchronization block in the advance() method, as the thread model in Apache Beam ensures that advance() and getCheckpointMark() are called by the same thread, making the lock on receivedMessages redundant.

Comment on lines +159 to +161
synchronized (lock) {
receivedMessages.add(receivedXmlMessage);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The synchronization here is unnecessary and can cause minor performance degradation. In Apache Beam, advance() and getCheckpointMark() are guaranteed to be called by the same reader thread. Since receivedMessages is only accessed by these two methods and not by the finalizer thread (which only touches pendingCheckpoints), there is no cross-thread contention on this specific queue. Removing this synchronized block avoids potential blocking of the critical reader thread when the finalizer thread holds the lock during its own (short) synchronized blocks.

Suggested change
synchronized (lock) {
receivedMessages.add(receivedXmlMessage);
}
solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
receivedMessages.add(receivedXmlMessage);
return true;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants