Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,19 @@ object State extends Logging {
private def reset(flow: ResolvedFlow, env: PipelineUpdateContext, graph: DataflowGraph): Unit = {
logInfo(log"Clearing out state for flow ${MDC(LogKeys.FLOW_NAME, flow.displayName)}")
val flowMetadata = FlowSystemMetadata(env, flow, graph)
flow match {
case f if flowMetadata.latestCheckpointLocationOpt().isEmpty =>
logInfo(
s"Skipping resetting flow ${f.identifier} since its destination not been previously" +
s"materialized and we can't find the checkpoint location."
)
case _ =>
val hadoopConf = env.spark.sessionState.newHadoopConf()
val hadoopConf = env.spark.sessionState.newHadoopConf()

// Write a new checkpoint folder if needed
val checkpointDir = new Path(flowMetadata.latestCheckpointLocation)
val fs1 = checkpointDir.getFileSystem(hadoopConf)
if (fs1.exists(checkpointDir)) {
val nextVersion = checkpointDir.getName.toInt + 1
val nextPath = new Path(checkpointDir.getParent, nextVersion.toString)
fs1.mkdirs(nextPath)
logInfo(
log"Created new checkpoint for stream ${MDC(LogKeys.FLOW_NAME, flow.displayName)} " +
log"at ${MDC(LogKeys.CHECKPOINT_PATH, nextPath.toString)}."
)
}
// Write a new checkpoint folder if needed
val checkpointDir = new Path(flowMetadata.latestCheckpointLocation)
val fs1 = checkpointDir.getFileSystem(hadoopConf)
if (fs1.exists(checkpointDir)) {
val nextVersion = checkpointDir.getName.toInt + 1
val nextPath = new Path(checkpointDir.getParent, nextVersion.toString)
fs1.mkdirs(nextPath)
logInfo(
log"Created new checkpoint for stream ${MDC(LogKeys.FLOW_NAME, flow.displayName)} " +
log"at ${MDC(LogKeys.CHECKPOINT_PATH, nextPath.toString)}."
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,33 @@ case class FlowSystemMetadata(
) extends SystemMetadata with Logging {

/**
* Returns the checkpoint root directory for a given flow
* which is storage/_checkpoints/flow_destination_table/flow_name.
* @return the checkpoint root directory for `flow`
* @return the checkpoint root directory for this flow
* (of the form storage/_checkpoints/flow_destination_table/flow_name)
* @throws IllegalArgumentException when the flow's destination is neither a table nor sink
*/
def flowCheckpointsDirOpt(): Option[Path] = {
Option(if (graph.table.contains(flow.destinationIdentifier) ||
graph.sink.contains(flow.destinationIdentifier)) {
val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
// Different tables in the pipeline can have flows with the same name, so we include
// the table's fully qualified identifier in the path to avoid collisions.
val flowTableId = tableIdentifierToPathString(flow.destinationIdentifier)
val flowName = flow.identifier.table
val checkpointDir = new Path(
new Path(checkpointRoot, flowTableId),
flowName
)
logInfo(
log"Flow ${MDC(LogKeys.FLOW_NAME, flowName)} using checkpoint " +
log"directory: ${MDC(LogKeys.CHECKPOINT_PATH, checkpointDir)}"
)
checkpointDir
} else {
def flowCheckpointsDir(): Path = {
def isTableOrSink(destination: TableIdentifier): Boolean = {
graph.table.contains(destination) || graph.sink.contains(destination)
}
if (!isTableOrSink(flow.destinationIdentifier)) {
throw new IllegalArgumentException(
s"Flow ${flow.identifier} does not have a valid destination for checkpoints."
)
})
}
val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
// Different tables in the pipeline can have flows with the same name, so we include
// the table's fully qualified identifier in the path to avoid collisions.
val flowTableId = tableIdentifierToPathString(flow.destinationIdentifier)
val flowName = flow.identifier.table
val checkpointDir = new Path(
new Path(checkpointRoot, flowTableId),
flowName
)
logInfo(
log"Flow ${MDC(LogKeys.FLOW_NAME, flowName)} using checkpoint " +
log"directory: ${MDC(LogKeys.CHECKPOINT_PATH, checkpointDir)}"
)
checkpointDir
}

/**
Expand All @@ -74,19 +75,9 @@ case class FlowSystemMetadata(

/** Returns the location for the most recent checkpoint of a given flow. */
def latestCheckpointLocation: String = {
val checkpointsDir = flowCheckpointsDirOpt().get
val checkpointsDir = flowCheckpointsDir()
SystemMetadata.getLatestCheckpointDir(checkpointsDir)
}

/**
* Same as [[latestCheckpointLocation]] but returns None if the flow checkpoints directory
* does not exist.
*/
def latestCheckpointLocationOpt(): Option[String] = {
flowCheckpointsDirOpt().map { flowCheckpointsDir =>
SystemMetadata.getLatestCheckpointDir(flowCheckpointsDir)
}
}
}

object SystemMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class SystemMetadataSuite
val stSystemMetadata = FlowSystemMetadata(updateContext, stFlow, graph)
val schema2StSystemMetadata = FlowSystemMetadata(updateContext, schema2StFlow, graph)
assert(
stSystemMetadata.flowCheckpointsDirOpt() != schema2StSystemMetadata.flowCheckpointsDirOpt()
stSystemMetadata.flowCheckpointsDir() != schema2StSystemMetadata.flowCheckpointsDir()
)
}
}
Expand Down