Skip to content

Commit 34fa7e4

Browse files
committed
add_support_partition_overwrite_mode
1 parent 23f9194 commit 34fa7e4

3 files changed

Lines changed: 36 additions & 6 deletions

File tree

native/proto/src/proto/operator.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ message ParquetWriter {
245245
optional string job_id = 6;
246246
// Task attempt ID for this specific task
247247
optional int32 task_attempt_id = 7;
248+
// set of partition columns
249+
repeated string partition_columns = 8;
248250
}
249251

250252
enum AggregateMode {

spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec
2929
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec}
3030
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
3131
import org.apache.spark.sql.internal.SQLConf
32+
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
3233

3334
import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
3435
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -62,7 +63,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
6263
}
6364

6465
if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
65-
return Unsupported(Some("Partitioned writes are not supported"))
66+
return Incompatible(Some("Partitioned writes are not supported"))
6667
}
6768

6869
if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
@@ -167,6 +168,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
167168
other
168169
}
169170

171+
val isDynamicOverWriteMode = cmd.partitionColumns.nonEmpty &&
172+
SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
173+
170174
// Create FileCommitProtocol for atomic writes
171175
val jobId = java.util.UUID.randomUUID().toString
172176
val committer =
@@ -178,11 +182,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
178182
committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean])
179183
Some(
180184
constructor
181-
.newInstance(
182-
jobId,
183-
outputPath,
184-
java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
185-
)
185+
.newInstance(jobId, outputPath, isDynamicOverWriteMode)
186186
.asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
187187
} catch {
188188
case e: Exception =>

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,32 @@ class CometParquetWriterSuite extends CometTestBase {
228228
}
229229
}
230230
}
231+
232+
test("parquet write with mode overwrite") {
233+
withTempPath { dir =>
234+
val outputPath = new File(dir, "output.parquet").getAbsolutePath
235+
236+
withTempPath { inputDir =>
237+
val inputPath = createTestData(inputDir)
238+
239+
withSQLConf(
240+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
241+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
242+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
243+
CometConf.COMET_EXEC_ENABLED.key -> "true") {
244+
245+
val df = spark.read.parquet(inputPath)
246+
247+
// First write
248+
df.repartition(2).write.parquet(outputPath)
249+
// verifyWrittenFile(outputPath)
250+
// Second write (with overwrite mode and a different record count to make sure we are not reading the same data)
251+
df.limit(500).repartition(2).write.mode("overwrite").parquet(outputPath)
252+
// Verify the data was written
253+
val resultDf = spark.read.parquet(outputPath)
254+
assert(resultDf.count() == 500, "Expected 500 rows after overwrite")
255+
}
256+
}
257+
}
258+
}
231259
}

0 commit comments

Comments
 (0)