Skip to content

Commit 3d46dbd

Browse files
committed
[SPARK-56304][SQL] V2 ifPartitionNotExists support for file table INSERT INTO
1 parent fdd8c78 commit 3d46dbd

3 files changed

Lines changed: 70 additions & 7 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,14 +1159,16 @@ class Analyzer(
11591159
throw QueryCompilationErrors.unsupportedInsertReplaceOnOrUsing(
11601160
i.table.asInstanceOf[DataSourceV2Relation].table.name())
11611161

1162-
case i: InsertIntoStatement
1163-
if i.table.isInstanceOf[DataSourceV2Relation] &&
1164-
i.query.resolved &&
1165-
i.replaceCriteriaOpt.isEmpty =>
1166-
val r = i.table.asInstanceOf[DataSourceV2Relation]
1167-
// ifPartitionNotExists is append with validation, but validation is not supported
1162+
// Resolution rule for INSERT INTO a DSv2 relation (no REPLACE criteria).
// SPARK-56304: instead of always rejecting IF NOT EXISTS on partitions,
// allow it when the target table supports partition management and
// overwrite-by-filter; otherwise keep raising the existing error.
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, _, _)
    // NOTE(review): the extractor binds 9 positional fields — confirm this
    // matches InsertIntoStatement's current arity in this branch.
    if i.query.resolved && i.replaceCriteriaOpt.isEmpty =>
  // SPARK-56304: allow ifPartitionNotExists for tables that
  // support partition management and overwrite-by-filter
  if (i.ifPartitionNotExists) {
    val caps = r.table.capabilities
    // NOTE(review): FileTable does not implement SupportsPartitionManagement,
    // so this guard appears to reject the very file-source tables the new
    // DataSourceV2Strategy path in this commit is meant to handle — confirm
    // the intended capability check (capability-only vs. interface check).
    if (!caps.contains(TableCapability.OVERWRITE_BY_FILTER) ||
        !r.table.isInstanceOf[SupportsPartitionManagement]) {
      throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
    }
  }
11711173

11721174
// Create a project if this is an INSERT INTO BY NAME query.
@@ -1209,17 +1211,27 @@ class Analyzer(
12091211
withSchemaEvolution = i.withSchemaEvolution)
12101212
}
12111213
} else {
1214+
val extraOpts = if (i.ifPartitionNotExists) {
1215+
Map("ifPartitionNotExists" -> "true") ++
1216+
staticPartitions.map { case (k, v) =>
1217+
s"__staticPartition.$k" -> v
1218+
}
1219+
} else {
1220+
Map.empty[String, String]
1221+
}
12121222
if (isByName) {
12131223
OverwriteByExpression.byName(
12141224
table = r,
12151225
df = query,
12161226
deleteExpr = staticDeleteExpression(r, staticPartitions),
1227+
writeOptions = extraOpts,
12171228
withSchemaEvolution = i.withSchemaEvolution)
12181229
} else {
12191230
OverwriteByExpression.byPosition(
12201231
table = r,
12211232
query = query,
12221233
deleteExpr = staticDeleteExpression(r, staticPartitions),
1234+
writeOptions = extraOpts,
12231235
withSchemaEvolution = i.withSchemaEvolution)
12241236
}
12251237
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,31 @@ class DataSourceV2Strategy(session: SparkSession)
409409
v1, v2Write.getClass.getName, classOf[V1Write].getName)
410410
}
411411

412+
// SPARK-56304: skip write if target partition already exists.
// Planner case for INSERT OVERWRITE ... IF NOT EXISTS on file-based V2
// tables: the analyzer smuggles the static partition spec through write
// options ("__staticPartition.<col>" -> value); here we probe the
// filesystem and plan a no-op scan when the partition directory exists.
// NOTE: this case must stay ABOVE the generic OverwriteByExpression case
// below, or it will never match.
case OverwriteByExpression(
    r: DataSourceV2Relation, _, query, writeOptions, _, _, Some(write), _)
    if writeOptions.getOrElse("ifPartitionNotExists", "false") == "true"
      && r.table.isInstanceOf[FileTable] =>
  val ft = r.table.asInstanceOf[FileTable]
  // Recover the static partition spec encoded by the analyzer.
  val prefix = "__staticPartition."
  val staticSpec = writeOptions
    .filter(_._1.startsWith(prefix))
    .map { case (k, v) => k.stripPrefix(prefix) -> v }
  // Check filesystem for partition existence.
  // Build "col1=v1/col2=v2/..." in partition-schema order; columns without
  // a static value are dropped, so a partially-static spec only checks the
  // static prefix of the path.
  // NOTE(review): values are inserted unescaped — characters like '/', '=',
  // or '%' in a partition value would produce a wrong path; confirm whether
  // the catalog's partition-path escaping helper should be used here.
  val partPath = ft.partitionSchema().fieldNames
    .flatMap(col => staticSpec.get(col).map(v => s"$col=$v"))
    .mkString("/")
  // NOTE(review): assumes a single root path; confirm behavior for tables
  // backed by multiple root paths.
  val rootPath = ft.fileIndex.rootPaths.head
  val hadoopConf = session.sessionState.newHadoopConf()
  val fs = rootPath.getFileSystem(hadoopConf)
  val targetPath = new Path(rootPath, partPath)
  // NOTE(review): existence is checked at planning time, so this races with
  // concurrent writers creating the partition before execution — confirm
  // that best-effort semantics are acceptable here.
  if (partPath.nonEmpty && fs.exists(targetPath)) {
    // Partition already present: plan an empty local scan as a no-op.
    // NOTE(review): confirm LocalTableScanExec's constructor arity (the
    // third `stream` argument) against the current Spark version.
    LocalTableScanExec(Nil, Nil, None) :: Nil
  } else {
    OverwriteByExpressionExec(
      planLater(query), refreshCache(r), write) :: Nil
  }
436+
412437
case OverwriteByExpression(
413438
r: DataSourceV2Relation, _, query, _, _, _, Some(write), _) =>
414439
OverwriteByExpressionExec(planLater(query), refreshCache(r), write) :: Nil

sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2WriteSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,32 @@ class FileDataSourceV2WriteSuite extends QueryTest with SharedSparkSession {
723723
}
724724
}
725725

726+
test("SPARK-56304: INSERT OVERWRITE IF NOT EXISTS skips when partition exists") {
  withTable("t") {
    // Snapshot of the table, ordered by the partition column, for assertions.
    def currentRows() = sql("SELECT * FROM t ORDER BY part")

    sql("CREATE TABLE t (id BIGINT, part INT) USING parquet PARTITIONED BY (part)")
    sql("INSERT INTO t VALUES (1, 1), (2, 2)")

    // Partition part=1 already exists, so IF NOT EXISTS makes this a no-op.
    sql("INSERT OVERWRITE TABLE t PARTITION(part=1) IF NOT EXISTS SELECT 999")
    checkAnswer(currentRows(), Seq(Row(1, 1), Row(2, 2)))

    // Without IF NOT EXISTS the existing partition is replaced.
    sql("INSERT OVERWRITE TABLE t PARTITION(part=1) SELECT 999")
    checkAnswer(currentRows(), Seq(Row(999, 1), Row(2, 2)))

    // Partition part=3 does not exist yet, so IF NOT EXISTS still writes.
    sql("INSERT OVERWRITE TABLE t PARTITION(part=3) IF NOT EXISTS SELECT 300")
    checkAnswer(currentRows(), Seq(Row(999, 1), Row(2, 2), Row(300, 3)))
  }
}
751+
726752
test("SELECT FROM format.path uses V2 path") {
727753
Seq("parquet", "orc", "json").foreach { format =>
728754
withTempPath { path =>

0 commit comments

Comments (0)