Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper

import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
import scala.collection.mutable
import scala.util.control.NonFatal

// scalastyle:off underscore.import
import com.azure.cosmos.implementation.feedranges._
Expand Down Expand Up @@ -147,6 +148,29 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
.toArray
}

def extractMinLatestLsnFromChangeFeedContinuationOrFallback(continuation: String): Long = {
try {
val continuationTokens = extractContinuationTokensFromChangeFeedStateJson(continuation)

if (continuationTokens.nonEmpty) {
// FeedRangeContinuation.handleFeedRangeGone expands split ranges by adding child tokens, so the
// minimum across the current token set represents the planned feed range after split handling.
continuationTokens.map(_._2).min
} else {
extractLsnFromChangeFeedContinuation(continuation)
}
} catch {
case NonFatal(rangeEnumerationFailure) =>
try {
extractLsnFromChangeFeedContinuation(continuation)
} catch {
case NonFatal(fallbackFailure) =>
rangeEnumerationFailure.addSuppressed(fallbackFailure)
throw rangeEnumerationFailure
}
}
}

private[cosmos] def rangeToNormalizedRange(rangeInput: Range[String]) = {
val range = FeedRangeInternal.normalizeRange(rangeInput)
assert(range != null, "Argument 'range' must not be null.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private case class ChangeFeedPartitionReader
}

private val containerTargetConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
log.logInfo(s"Reading from feed range ${partition.feedRange}, startLsn $getPartitionStartLsn, " +
log.logInfo(s"Reading from feed range ${partition.feedRange}, startLsn ${startLsn.map(_.toString).getOrElse("n/a")}, " +
s"endLsn ${partition.endLsn} of " +
s"container ${containerTargetConfig.database}.${containerTargetConfig.container}")
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
Expand Down Expand Up @@ -184,19 +184,17 @@ private case class ChangeFeedPartitionReader
}
}

private def getPartitionStartLsn: Long = {
if (partition.continuationState.isDefined) {
SparkBridgeImplementationInternal.extractLsnFromChangeFeedContinuation(this.partition.continuationState.get)
} else {
0
private def getPartitionStartLsn: Option[Long] = {
partition.continuationState.map { continuationState =>
SparkBridgeImplementationInternal.extractMinLatestLsnFromChangeFeedContinuationOrFallback(continuationState)
}
}

private val changeFeedRequestOptions = {

val startLsn = getPartitionStartLsn
val requestStartLsn = startLsn.map(_.toString).getOrElse("n/a")
log.logDebug(
s"Request options for Range '${partition.feedRange.min}-${partition.feedRange.max}' LSN '$startLsn'")
s"Request options for Range '${partition.feedRange.min}-${partition.feedRange.max}' LSN '$requestStartLsn'")

val options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(this.partition.continuationState.get)
Expand Down Expand Up @@ -263,7 +261,8 @@ private case class ChangeFeedPartitionReader
readConfig.maxItemCount,
readConfig.prefetchBufferSize,
operationContextAndListenerTuple,
this.partition.endLsn
this.partition.endLsn,
startLsn
)

override def next(): Boolean = {
Expand Down Expand Up @@ -294,16 +293,13 @@ private case class ChangeFeedPartitionReader
// for cases where the feed range spans multiple physical partitions
// pick the smallest lsn
Some(SparkBridgeImplementationInternal
.extractContinuationTokensFromChangeFeedStateJson(continuationToken)
.minBy(_._2)._2)
.extractMinLatestLsnFromChangeFeedContinuationOrFallback(continuationToken))
case None =>
// for change feed, we would only reach here before the first page got fetched
// fallback to use the continuation token from the partition instead
Some(SparkBridgeImplementationInternal
.extractContinuationTokensFromChangeFeedStateJson(partition.continuationState.get)
.minBy(_._2)._2)
startLsn
}

if (latestLsnOpt.isDefined) latestLsnOpt.get - startLsn else 0
latestLsnOpt.flatMap(latestLsn => startLsn.map(latestLsn - _)).getOrElse(0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ private case class ItemsPartitionReader
readConfig.maxItemCount,
readConfig.prefetchBufferSize,
operationContextAndListenerTuple,
None,
None
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.azure.cosmos.util.{CosmosPagedFlux, CosmosPagedIterable}
import java.util.concurrent.{ExecutorService, SynchronousQueue, ThreadPoolExecutor, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import scala.util.Random
import scala.util.control.Breaks
import scala.util.control.{Breaks, NonFatal}
import scala.concurrent.{Await, ExecutionContext, Future}
import com.azure.cosmos.implementation.{ChangeFeedSparkRowItem, OperationCancelledException, SparkBridgeImplementationInternal}

Expand Down Expand Up @@ -41,7 +41,8 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
val pageSize: Int,
val pagePrefetchBufferSize: Int,
val operationContextAndListener: Option[OperationContextAndListenerTuple],
val endLsn: Option[Long]
val endLsn: Option[Long],
val startLsn: Option[Long]
) extends BufferedIterator[TSparkRow] with BasicLoggingTrait with AutoCloseable {

private[spark] var maxRetryIntervalInMs = CosmosConstants.maxRetryIntervalForTransientFailuresInMs
Expand Down Expand Up @@ -73,11 +74,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
}

def getLatestContinuationToken: Option[String] = {
if (lastContinuationToken == null) {
None
} else {
Some(lastContinuationToken.get())
}
Option(lastContinuationToken.get())
}

override def hasNext: Boolean = {
Expand Down Expand Up @@ -177,6 +174,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
None
}
} else {
validateEofProgressOrThrow()
Some(false)
}
}
Expand Down Expand Up @@ -237,6 +235,49 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
}
}

private[this] def validateEofProgressOrThrow(): Unit = {
endLsn.foreach { targetEndLsn =>
val latestMinLsn = try {
Option(lastContinuationToken.get())
.map(SparkBridgeImplementationInternal.extractMinLatestLsnFromChangeFeedContinuationOrFallback)
} catch {
case NonFatal(parseFailure) =>
val message = s"Continuation token parse failure - treating EOF as inconclusive. " +
s"startLsn: ${formatLsn(startLsn)}, endLsn: $targetEndLsn, " +
s"totalChangesCnt: ${totalChangesCnt.get()}, Context: $operationContextString"
val exception = new OperationCancelledException(message, null)
exception.addSuppressed(parseFailure)
logError(message, exception)
throw exception
}

val isEofValid = latestMinLsn match {
case Some(observedLsn) => observedLsn >= targetEndLsn
case None => startLsn.contains(targetEndLsn)
}

if (!isEofValid) {
val observedLsnText = latestMinLsn.map(_.toString).getOrElse("no page consumed")
val message = s"Bounded change feed read reached EOF before planned endLsn. " +
s"startLsn: ${formatLsn(startLsn)}, endLsn: $targetEndLsn, " +
s"observed minLatestLsn: $observedLsnText, totalChangesCnt: ${totalChangesCnt.get()}, " +
s"Context: $operationContextString. If this occurred during Spark task cancellation/decommission, " +
s"expect the task to retry from the last committed checkpoint. Continuation tokens are expected " +
s"to preserve split child ranges; range-set shrinkage is undefined behavior."
val exception = new OperationCancelledException(message, null)

// TODO: Consider moving bounded change-feed EOF validation into a dedicated decorator and short-circuiting
// deterministic zero-progress retries once this policy is separated from transient I/O retry handling.
logError(message, exception)
throw exception
}
}
}

private[this] def formatLsn(lsn: Option[Long]): String = {
lsn.map(_.toString).getOrElse("n/a")
}

// Clean up iterator references - the underlying Reactor subscription
// from Flux.toIterable() will be cleaned up when the iterator is GC'd
override def close(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class TransientIOErrorsRetryingIteratorITest
2,
Queues.XS_BUFFER_SIZE,
None,
None,
None
)
retryingIterator.maxRetryIntervalInMs = 5
Expand Down Expand Up @@ -255,6 +256,7 @@ class TransientIOErrorsRetryingIteratorITest
2,
Queues.XS_BUFFER_SIZE,
None,
None,
None
)
retryingIterator.maxRetryIntervalInMs = 5
Expand Down
Loading
Loading