Skip to content

fix(cosmos-spark): validate bounded change feed EOF against planned endLsn (#49380)#49390

Closed
xinlian12 wants to merge 4 commits into
Azure:mainfrom
xinlian12:fix/issue-49380-spark-bounded-changefeed-eof
Closed

fix(cosmos-spark): validate bounded change feed EOF against planned endLsn (#49380)#49390
xinlian12 wants to merge 4 commits into
Azure:mainfrom
xinlian12:fix/issue-49380-spark-bounded-changefeed-eof

Conversation

@xinlian12

Copy link
Copy Markdown
Member

Summary

Closes a silent data-loss path in the Cosmos DB Spark change feed connector. For bounded change feed reads, TransientIOErrorsRetryingIterator now validates that the latest continuation LSN has reached the planned endLsn before reporting end-of-stream, and throws a retryable OperationCancelledException otherwise so the existing transient-IO retry path can recover (or fail the Spark task instead of silently committing).

Issue

Fixes #49380

Root Cause

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala::hasNextInternalCore

When the underlying feedResponseIterator returns false from hasNext (publisher terminated), the iterator returned Some(false) unconditionally. validateNextLsn guards the upper edge for records (no _lsn > endLsn is returned), but no symmetric check exists for the lower edge of EOF: if the publisher terminates with the latest continuation LSN still below endLsn, Spark sees a clean end-of-stream and commits the planned end offset.

Triggers for the underlying premature publisher termination can include slow replicas / slow regions in the Cosmos service, paging, Reactor, networking, or cancellation interactions. The fix is independent of the trigger: the connector must defend against EOF below the planned endLsn.

Fix

Added a validateEofProgressOrThrow() helper invoked inside hasNextInternalCore immediately before returning Some(false) on underlying EOF:

  1. Unbounded path (endLsn = None): returns immediately. Behavior is unchanged.
  2. Page(s) consumed: parses the latest continuation token with SparkBridgeImplementationInternal.extractContinuationTokensFromChangeFeedStateJson and takes the minimum LSN across all ranges. EOF is allowed only when minLatestLsn >= endLsn. The min-across-ranges semantics matches the issue's validation table for multi-range continuations.
  3. No page consumed: EOF is allowed only when startLsn == endLsn (legitimate empty bounded interval). startLsn is plumbed in via a new additive constructor parameter on the iterator and derived consistently in ChangeFeedPartitionReader.
  4. Failure case: throws OperationCancelledException, which extends CosmosException with timeout status. Exceptions.canBeTransientFailure treats it as transient, so the iterator's existing executeWithRetry loop retries up to maxRetryCount and then propagates so the Spark task fails — preventing the silent checkpoint advance.

Alternatives considered (and rejected): a boolean-returning validator (would still need to convert to retryable exception); a Reactor-side sentinel wrapper (more invasive, risks altering prefetch/subscription); using only the current-continuation helper (misses multi-range lag).

How the Fix Works

underlying feedResponseIterator.hasNext == false
                │
                ▼
   validateEofProgressOrThrow()
                │
   endLsn isEmpty? ─── yes ──► return Some(false)   (unbounded, unchanged)
                │
                no
                │
                ▼
   lastContinuationToken null?
        │                       │
       yes                      no
        │                       │
        ▼                       ▼
  startLsn == endLsn?     parse min LSN across ranges
        │                       │
   yes/no                       │
        │                       ▼
        ▼                  minLatestLsn >= endLsn?
  complete / throw              │
                          yes ──┤
                                ▼
                          complete
                                │
                          no ──► throw OperationCancelledException
                                       │
                                       ▼
                               executeWithRetry retries
                                       │
                                       ▼
                               exhausted ─► Spark task fails
                                                (no silent commit)

Testing

  • Existing tests in TransientIOErrorsRetryingIteratorSpec continue to pass (unbounded path is unaffected).
  • Canonical repro test added: bounded read with endLsn=20, final continuation at LSN 15OperationCancelledException thrown (previously returned false silently).
  • Additional bounded-EOF validation cases (rows 2–6 from the issue's validation matrix) are added by the Test Planner in a follow-up commit in this same PR.

Risk Assessment

  • Breaking change: No. New constructor parameter has a default of None; default behavior for unbounded reads is unchanged.
  • Behavior change: Additive — bounded reads gain an EOF-progress validation step.
  • Performance impact: None on the hot path (validation runs only at EOF, once per iterator lifecycle).
  • Customer-visible consequence: Some workloads that today silently skip records on bounded change feed EOF will now visibly fail the Spark task after retries are exhausted (desired: at-least-once delivery preserved over silent data loss).

Cross-SDK Impact

Populated by the orchestrator after Phase 7.

…ndLsn

When TransientIOErrorsRetryingIterator reaches EOF on a bounded change feed read, validate that the latest continuation LSN (min across ranges) has reached the planned endLsn. If not, throw OperationCancelledException so the existing transient-IO retry path can recover or fail the task, preventing silent commits of unread bounded intervals.

Fixes Azure#49380

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@github-actions github-actions Bot added the Cosmos label Jun 5, 2026
xinlian12 and others added 3 commits June 5, 2026 09:40
Covers issue Azure#49380 validation table rows 2-6, plus retry-path and
row-level upper-bound regression checks for TransientIOErrorsRetryingIterator.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…er consolidation + diagnostics

- F1: catch NonFatal in extractLatestMinLsnFromContinuation, wrap as retryable OperationCancelledException
- F2: drop startLsn default; require callers to pass it explicitly
- F3: consolidate min-LSN-across-ranges parsing into SparkBridgeImplementationInternal
- F4: logWarning on first under-run with planned vs observed LSN
- F6: tighten getLatestContinuationToken to Option(lastContinuationToken.get())
- F7: logError before throw + clearer diagnostic context
- F9: render startLsn/endLsn without raw Option wrapper in exception text
- S2: verified continuation includes both child ranges after split (see code comment)
- S7: add cancellation-hint to exception message
- F8/S8 test additions: malformed continuation, multi-page progression, degenerate endLsn==startLsn with continuation

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Removed redundant logWarning before logError on EOF under-run path
  (deep reviewer F11; previously logged the same message twice and
  could double-count on log-based alerting).
- Inlined SparkBridgeImplementationInternal.extractMinLatestLsnFrom-
  ChangeFeedContinuationOrFallback at the call site and removed the
  one-line extractLatestMinLsnFromContinuation private wrapper
  (deep reviewer F12).
- Skeptic Lens NonFatal/InterruptedException concern verified
  factually incorrect: scala.util.control.NonFatal.apply explicitly
  excludes InterruptedException, so the wrap cannot convert
  cancellation into a retry loop.
- Caller-site audit verified: all 4 production + 2 test call sites
  pass startLsn explicitly. Shaded azure-cosmos-spark_4-1_2-13 uses
  generated target/shared-sources/ mirrors that sync automatically.

17/17 unit tests still pass.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@xinlian12

Copy link
Copy Markdown
Member Author

/azp run java - cosmos

@azure-pipelines

Copy link
Copy Markdown
Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12

Copy link
Copy Markdown
Member Author

Retrying CI: the two failing checks are unrelated to this PR (Spark connector / Scala only).

/azp run java - cosmos
/azp run java - cosmos - ci

@xinlian12 xinlian12 closed this Jun 5, 2026
@xinlian12 xinlian12 deleted the fix/issue-49380-spark-bounded-changefeed-eof branch June 5, 2026 19:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Cosmos Spark change feed can silently complete below the planned end LSN

1 participant