fix(cosmos-spark): validate bounded change feed EOF against planned endLsn (#49380)#49390
Closed
xinlian12 wants to merge 4 commits into
Closed
fix(cosmos-spark): validate bounded change feed EOF against planned endLsn (#49380)#49390xinlian12 wants to merge 4 commits into
xinlian12 wants to merge 4 commits into
Conversation
…ndLsn When TransientIOErrorsRetryingIterator reaches EOF on a bounded change feed read, validate that the latest continuation LSN (min across ranges) has reached the planned endLsn. If not, throw OperationCancelledException so the existing transient-IO retry path can recover or fail the task, preventing silent commits of unread bounded intervals. Fixes Azure#49380 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Covers issue Azure#49380 validation table rows 2-6, plus retry-path and row-level upper-bound regression checks for TransientIOErrorsRetryingIterator. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…er consolidation + diagnostics - F1: catch NonFatal in extractLatestMinLsnFromContinuation, wrap as retryable OperationCancelledException - F2: drop startLsn default; require callers to pass it explicitly - F3: consolidate min-LSN-across-ranges parsing into SparkBridgeImplementationInternal - F4: logWarning on first under-run with planned vs observed LSN - F6: tighten getLatestContinuationToken to Option(lastContinuationToken.get()) - F7: logError before throw + clearer diagnostic context - F9: render startLsn/endLsn without raw Option wrapper in exception text - S2: verified continuation includes both child ranges after split (see code comment) - S7: add cancellation-hint to exception message - F8/S8 test additions: malformed continuation, multi-page progression, degenerate endLsn==startLsn with continuation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Removed redundant logWarning before logError on EOF under-run path (deep reviewer F11; previously logged the same message twice and could double-count on log-based alerting). - Inlined SparkBridgeImplementationInternal.extractMinLatestLsnFrom- ChangeFeedContinuationOrFallback at the call site and removed the one-line extractLatestMinLsnFromContinuation private wrapper (deep reviewer F12). - Skeptic Lens NonFatal/InterruptedException concern verified factually incorrect: scala.util.control.NonFatal.apply explicitly excludes InterruptedException, so the wrap cannot convert cancellation into a retry loop. - Caller-site audit verified: all 4 production + 2 test call sites pass startLsn explicitly. Shaded azure-cosmos-spark_4-1_2-13 uses generated target/shared-sources/ mirrors that sync automatically. 17/17 unit tests still pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Member
Author
|
/azp run java - cosmos |
|
Azure Pipelines successfully started running 1 pipeline(s). |
Member
Author
|
Retrying CI: the two failing checks are unrelated to this PR (Spark connector / Scala only).
/azp run java - cosmos |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes a silent data-loss path in the Cosmos DB Spark change feed connector. For bounded change feed reads,
TransientIOErrorsRetryingIteratornow validates that the latest continuation LSN has reached the plannedendLsnbefore reporting end-of-stream, and throws a retryableOperationCancelledExceptionotherwise so the existing transient-IO retry path can recover (or fail the Spark task instead of silently committing).Issue
Fixes #49380
Root Cause
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala::hasNextInternalCoreWhen the underlying
feedResponseIteratorreturnsfalsefromhasNext(publisher terminated), the iterator returnedSome(false)unconditionally.validateNextLsnguards the upper edge for records (no_lsn > endLsnis returned), but no symmetric check exists for the lower edge of EOF: if the publisher terminates with the latest continuation LSN still belowendLsn, Spark sees a clean end-of-stream and commits the planned end offset.Triggers for the underlying premature publisher termination can include slow replicas / slow regions in the Cosmos service, paging, Reactor, networking, or cancellation interactions. The fix is independent of the trigger: the connector must defend against EOF below the planned
endLsn.Fix
Added a
validateEofProgressOrThrow()helper invoked insidehasNextInternalCoreimmediately before returningSome(false)on underlying EOF:endLsn = None): returns immediately. Behavior is unchanged.SparkBridgeImplementationInternal.extractContinuationTokensFromChangeFeedStateJsonand takes the minimum LSN across all ranges. EOF is allowed only whenminLatestLsn >= endLsn. The min-across-ranges semantics matches the issue's validation table for multi-range continuations.startLsn == endLsn(legitimate empty bounded interval).startLsnis plumbed in via a new additive constructor parameter on the iterator and derived consistently inChangeFeedPartitionReader.OperationCancelledException, which extendsCosmosExceptionwith timeout status.Exceptions.canBeTransientFailuretreats it as transient, so the iterator's existingexecuteWithRetryloop retries up tomaxRetryCountand then propagates so the Spark task fails — preventing the silent checkpoint advance.Alternatives considered (and rejected): a boolean-returning validator (would still need to convert to retryable exception); a Reactor-side sentinel wrapper (more invasive, risks altering prefetch/subscription); using only the current-continuation helper (misses multi-range lag).
How the Fix Works
Testing
TransientIOErrorsRetryingIteratorSpeccontinue to pass (unbounded path is unaffected).endLsn=20, final continuation at LSN15⇒OperationCancelledExceptionthrown (previously returnedfalsesilently).Risk Assessment
None; default behavior for unbounded reads is unchanged.Cross-SDK Impact
Populated by the orchestrator after Phase 7.