Commit fdd8c78

[SPARK-56316][SQL] Support static partition overwrite for V2 file tables

Implement SupportsOverwriteV2 for V2 file tables to support static partition
overwrite (INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...).

Key changes:
- FileTable: replace SupportsTruncate with SupportsOverwriteV2 on the WriteBuilder and implement overwrite(predicates)
- FileWrite: extend toBatch() to delete only the matching partition directory, ordered by partitionSchema
- FileTable.CAPABILITIES: add OVERWRITE_BY_FILTER
- All 6 format Write/Table classes: plumb the overwritePredicates parameter through

This is a prerequisite for SPARK-56304 (ifPartitionNotExists).
1 parent 90a506c

15 files changed

Lines changed: 103 additions & 22 deletions
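
For context, a minimal sketch of the behavior this commit enables (the table
name and values are illustrative; it assumes the file sources resolve to their
V2 implementations, e.g. "parquet" is absent from
spark.sql.sources.useV1SourceList):

// Static partition overwrite on a partitioned V2 file table.
spark.sql("CREATE TABLE t (id INT, p INT) USING parquet PARTITIONED BY (p)")
spark.sql("INSERT INTO t PARTITION (p = 1) VALUES (1), (2)")
spark.sql("INSERT INTO t PARTITION (p = 2) VALUES (3)")
// Deletes and rewrites only the p=1 directory; p=2 survives.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (p = 1) SELECT 10")
spark.sql("SELECT id, p FROM t ORDER BY p").show()
// Expected rows: (10, 1) and (3, 2).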


connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala

Lines changed: 2 additions & 2 deletions
@@ -44,9 +44,9 @@ case class AvroTable(
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     createFileWriteBuilder(info) {
-      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate) =>
+      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate, overPreds) =>
         AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, bSpec,
-          customLocs, dynamicOverwrite, truncate)
+          overPreds, customLocs, dynamicOverwrite, truncate)
     }
   }
 

connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@ import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.sql.avro.AvroUtils
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.OutputWriterFactory
 import org.apache.spark.sql.execution.datasources.v2.FileWrite
@@ -33,6 +34,7 @@ case class AvroWrite(
     info: LogicalWriteInfo,
     partitionSchema: StructType,
     override val bucketSpec: Option[BucketSpec] = None,
+    override val overwritePredicates: Option[Array[Predicate]] = None,
     override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
     override val dynamicPartitionOverwrite: Boolean,
     override val isTruncate: Boolean) extends FileWrite {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 17 additions & 8 deletions
@@ -31,9 +31,10 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
   Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysTrue, Predicate}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
   LogicalWriteInfoImpl, SupportsDynamicOverwrite,
-  SupportsTruncate, Write, WriteBuilder}
+  SupportsOverwriteV2, Write, WriteBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
@@ -308,19 +309,23 @@ abstract class FileTable(
       buildWrite: (LogicalWriteInfo, StructType,
         Option[BucketSpec],
         Map[Map[String, String], String],
-        Boolean, Boolean) => Write
+        Boolean, Boolean,
+        Option[Array[Predicate]]) => Write
   ): WriteBuilder = {
-    new WriteBuilder with SupportsDynamicOverwrite with SupportsTruncate {
+    new WriteBuilder with SupportsDynamicOverwrite
+      with SupportsOverwriteV2 {
       private var isDynamicOverwrite = false
-      private var isTruncate = false
+      private var overwritePredicates
+        : Option[Array[Predicate]] = None
 
       override def overwriteDynamicPartitions(): WriteBuilder = {
         isDynamicOverwrite = true
         this
       }
 
-      override def truncate(): WriteBuilder = {
-        isTruncate = true
+      override def overwrite(
+          predicates: Array[Predicate]): WriteBuilder = {
+        overwritePredicates = Some(predicates)
         this
       }
 
@@ -362,10 +367,13 @@ abstract class FileTable(
           .getOrElse(fromIndex)
       }
       val bSpec = catalogTable.flatMap(_.bucketSpec)
+      val isTruncate = overwritePredicates.exists(
+        _.exists(_.isInstanceOf[AlwaysTrue]))
       val customLocs = getCustomPartitionLocations(
         partSchema)
       buildWrite(merged, partSchema, bSpec,
-        customLocs, isDynamicOverwrite, isTruncate)
+        customLocs, isDynamicOverwrite, isTruncate,
+        overwritePredicates)
     }
   }
 }
@@ -577,7 +585,8 @@ abstract class FileTable(
 
 object FileTable {
   private val CAPABILITIES = util.EnumSet.of(
-    BATCH_READ, BATCH_WRITE, TRUNCATE, OVERWRITE_DYNAMIC)
+    BATCH_READ, BATCH_WRITE, TRUNCATE,
+    OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC)
 
   /** Option key for injecting stored row count from ANALYZE TABLE into FileScan. */
   val NUM_ROWS_KEY: String = "__numRows"
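
To make the builder contract above concrete: with SupportsTruncate replaced by
SupportsOverwriteV2, a plain INSERT OVERWRITE arrives as
overwrite(Array(AlwaysTrue)) and a static partition overwrite arrives as
equality predicates, so isTruncate is now derived from the predicates instead
of being set by truncate(). A standalone sketch (the predicate construction
below is illustrative, not the planner's own code):

import org.apache.spark.sql.connector.expressions.{Expression, Expressions, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysTrue, Predicate}
import org.apache.spark.sql.types.IntegerType

// INSERT OVERWRITE TABLE t SELECT ...            => full truncate
val full: Array[Predicate] = Array(new AlwaysTrue)

// INSERT OVERWRITE TABLE t PARTITION (p = 1) ... => null-safe equality on p
val static: Array[Predicate] = Array(new Predicate("<=>",
  Array[Expression](Expressions.column("p"), LiteralValue(1, IntegerType))))

// Mirrors the isTruncate derivation in the builder above.
def isTruncate(preds: Option[Array[Predicate]]): Boolean =
  preds.exists(_.exists(_.isInstanceOf[AlwaysTrue]))

assert(isTruncate(Some(full)) && !isTruncate(Some(static)))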

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala

Lines changed: 48 additions & 2 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection}
 import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, RequiresDistributionAndOrdering, Write}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, V1WritesUtils, WriteJobDescription}
@@ -52,6 +53,7 @@ trait FileWrite extends Write
   def info: LogicalWriteInfo
   def partitionSchema: StructType
   def bucketSpec: Option[BucketSpec] = None
+  def overwritePredicates: Option[Array[Predicate]] = None
   def customPartitionLocations: Map[Map[String, String], String] = Map.empty
   def dynamicPartitionOverwrite: Boolean = false
   def isTruncate: Boolean = false
@@ -93,15 +95,31 @@ trait FileWrite extends Write
       fs.mkdirs(qualifiedPath)
     }
 
-    // For truncate (full overwrite), delete existing data before writing.
     if (isTruncate && fs.exists(qualifiedPath)) {
+      // Full overwrite: delete all non-hidden data
       fs.listStatus(qualifiedPath).foreach { status =>
-        // Preserve hidden files/dirs (e.g., _SUCCESS, .spark-staging-*)
         if (!status.getPath.getName.startsWith("_") &&
             !status.getPath.getName.startsWith(".")) {
           fs.delete(status.getPath, true)
         }
       }
+    } else if (overwritePredicates.exists(_.nonEmpty) &&
+        fs.exists(qualifiedPath)) {
+      // Static partition overwrite: delete only matching partition dir.
+      // Extract partition spec from predicates and order by
+      // partitionSchema to match the directory structure.
+      val specMap = overwritePredicates.get
+        .flatMap(FileWrite.predicateToPartitionSpec)
+        .toMap
+      if (specMap.nonEmpty) {
+        val partPath = partitionSchema.fieldNames
+          .flatMap(col => specMap.get(col).map(v => s"$col=$v"))
+          .mkString("/")
+        val targetPath = new Path(qualifiedPath, partPath)
+        if (fs.exists(targetPath)) {
+          fs.delete(targetPath, true)
+        }
+      }
     }
 
     val job = getJobInstance(hadoopConf, path)
@@ -219,3 +237,31 @@ trait FileWrite extends Write
   }
 }
 
+private[v2] object FileWrite {
+  /**
+   * Extract a (column, value) pair from a V2 equality
+   * predicate (e.g., `p <=> 1` => `("p", "1")`).
+   */
+  def predicateToPartitionSpec(
+      predicate: Predicate): Option[(String, String)] = {
+    if (predicate.name() == "=" || predicate.name() == "<=>") {
+      val children = predicate.children()
+      if (children.length == 2) {
+        val name = children(0) match {
+          case ref: org.apache.spark.sql.connector.expressions.NamedReference =>
+            Some(ref.fieldNames().head)
+          case _ => None
+        }
+        val value = children(1) match {
+          case lit: org.apache.spark.sql.connector.expressions.Literal[_] =>
+            Some(lit.value().toString)
+          case _ => None
+        }
+        for (n <- name; v <- value) yield (n, v)
+      } else None
+    } else None
+  }
+}
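
A hedged usage sketch of the helper above, showing how predicates become the
partition path that toBatch() deletes (the two-level partitioning and values
are illustrative, and the call assumes code in the same
org.apache.spark.sql.execution.datasources.v2 package, since
predicateToPartitionSpec is private[v2]):

import org.apache.spark.sql.connector.expressions.{Expression, Expressions, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.types.IntegerType

def eqPred(col: String, v: Int): Predicate = new Predicate("<=>",
  Array[Expression](Expressions.column(col), LiteralValue(v, IntegerType)))

// Predicates may arrive in any order...
val specMap = Array(eqPred("q", 2), eqPred("p", 1))
  .flatMap(FileWrite.predicateToPartitionSpec) // ("q", "2"), ("p", "1")
  .toMap

// ...but ordering by partitionSchema.fieldNames (here p, then q)
// reproduces the on-disk directory layout.
val partPath = Seq("p", "q")
  .flatMap(col => specMap.get(col).map(v => s"$col=$v"))
  .mkString("/")
assert(partPath == "p=1/q=2")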

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala

Lines changed: 2 additions & 2 deletions
@@ -52,9 +52,9 @@ case class CSVTable(
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     createFileWriteBuilder(info) {
-      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate) =>
+      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate, overPreds) =>
         CSVWrite(paths, formatName, supportsWriteDataType, mergedInfo, partSchema, bSpec,
-          customLocs, dynamicOverwrite, truncate)
+          overPreds, customLocs, dynamicOverwrite, truncate)
     }
   }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala

Lines changed: 2 additions & 0 deletions
@@ -21,6 +21,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter
@@ -35,6 +36,7 @@ case class CSVWrite(
     info: LogicalWriteInfo,
     partitionSchema: StructType,
     override val bucketSpec: Option[BucketSpec] = None,
+    override val overwritePredicates: Option[Array[Predicate]] = None,
     override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
     override val dynamicPartitionOverwrite: Boolean,
     override val isTruncate: Boolean) extends FileWrite {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala

Lines changed: 2 additions & 2 deletions
@@ -51,9 +51,9 @@ case class JsonTable(
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     createFileWriteBuilder(info) {
-      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate) =>
+      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate, overPreds) =>
         JsonWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, bSpec,
-          customLocs, dynamicOverwrite, truncate)
+          overPreds, customLocs, dynamicOverwrite, truncate)
     }
   }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala

Lines changed: 2 additions & 0 deletions
@@ -21,6 +21,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
@@ -35,6 +36,7 @@ case class JsonWrite(
     info: LogicalWriteInfo,
     partitionSchema: StructType,
     override val bucketSpec: Option[BucketSpec] = None,
+    override val overwritePredicates: Option[Array[Predicate]] = None,
     override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
     override val dynamicPartitionOverwrite: Boolean,
     override val isTruncate: Boolean) extends FileWrite {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala

Lines changed: 2 additions & 2 deletions
@@ -45,9 +45,9 @@ case class OrcTable(
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     createFileWriteBuilder(info) {
-      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate) =>
+      (mergedInfo, partSchema, bSpec, customLocs, dynamicOverwrite, truncate, overPreds) =>
         OrcWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, bSpec,
-          customLocs, dynamicOverwrite, truncate)
+          overPreds, customLocs, dynamicOverwrite, truncate)
     }
   }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala

Lines changed: 2 additions & 0 deletions
@@ -22,6 +22,7 @@ import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
 import org.apache.orc.mapred.OrcStruct
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcOutputWriter, OrcUtils}
@@ -36,6 +37,7 @@ case class OrcWrite(
     info: LogicalWriteInfo,
     partitionSchema: StructType,
     override val bucketSpec: Option[BucketSpec] = None,
+    override val overwritePredicates: Option[Array[Predicate]] = None,
     override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
     override val dynamicPartitionOverwrite: Boolean,
     override val isTruncate: Boolean) extends FileWrite {
