[SPARK-56304][SQL] Support IF NOT EXISTS for V2 file table INSERT OVERWRITE#55154
Draft
LuciferYang wants to merge 8 commits intoapache:masterfrom
Draft
[SPARK-56304][SQL] Support IF NOT EXISTS for V2 file table INSERT OVERWRITE#55154LuciferYang wants to merge 8 commits intoapache:masterfrom
LuciferYang wants to merge 8 commits intoapache:masterfrom
Conversation
…Frame API writes and delete FallBackFileSourceV2 Key changes: - FileWrite: added partitionSchema, customPartitionLocations, dynamicPartitionOverwrite, isTruncate; path creation and truncate logic; dynamic partition overwrite via FileCommitProtocol - FileTable: createFileWriteBuilder with SupportsDynamicOverwrite and SupportsTruncate; capabilities now include TRUNCATE and OVERWRITE_DYNAMIC; fileIndex skips file existence checks when userSpecifiedSchema is provided (write path) - All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use createFileWriteBuilder with partition/truncate/overwrite support - DataFrameWriter.lookupV2Provider: enabled FileDataSourceV2 for non-partitioned Append and Overwrite via df.write.save(path) - DataFrameWriter.insertInto: V1 fallback for file sources (TODO: SPARK-56175) - DataFrameWriter.saveAsTable: V1 fallback for file sources (TODO: SPARK-56230, needs StagingTableCatalog) - DataSourceV2Utils.getTableProvider: V1 fallback for file sources (TODO: SPARK-56175) - Removed FallBackFileSourceV2 rule - V2SessionCatalog.createTable: V1 FileFormat data type validation
Contributor
Author
|
The actual change for the current patch is db67614, which is the 8th patch in SPARK-56170 |
…catalog table loading, and gate removal Key changes: - FileTable extends SupportsPartitionManagement with createPartition, dropPartition, listPartitionIdentifiers, partitionSchema - Partition operations sync to catalog metastore (best-effort) - V2SessionCatalog.loadTable returns FileTable instead of V1Table, sets catalogTable and useCatalogFileIndex on FileTable - V2SessionCatalog.getDataSourceOptions includes storage.properties for proper option propagation (header, ORC bloom filter, etc.) - V2SessionCatalog.createTable validates data types via FileTable - FileTable.columns() restores NOT NULL constraints from catalogTable - FileTable.partitioning() falls back to userSpecifiedPartitioning or catalog partition columns - FileTable.fileIndex uses CatalogFileIndex when catalog has registered partitions (custom partition locations) - FileTable.schema checks column name duplication for non-catalog tables only - DataSourceV2Utils.getTableProvider: removed FileDataSourceV2 gate - DataFrameWriter.insertInto: enabled V2 for file sources - DataFrameWriter.saveAsTable: V1 fallback (TODO: SPARK-56230) - ResolveSessionCatalog: V1 fallback for FileTable-backed commands (AnalyzeTable, AnalyzeColumn, TruncateTable, TruncatePartition, ShowPartitions, RecoverPartitions, AddPartitions, RenamePartitions, DropPartitions, SetTableLocation, CREATE TABLE validation, REPLACE TABLE blocking) - FindDataSourceTable: streaming V1 fallback for FileTable (TODO: SPARK-56233) - DataSource.planForWritingFileFormat: graceful V2 handling
Enable bucketed writes for V2 file tables via catalog BucketSpec. Key changes: - FileWrite: add bucketSpec field, use V1WritesUtils.getWriterBucketSpec() instead of hardcoded None - FileTable: createFileWriteBuilder passes catalogTable.bucketSpec to the write pipeline - FileDataSourceV2: getTable uses collect to skip BucketTransform (handled via catalogTable.bucketSpec instead) - FileWriterFactory: use DynamicPartitionDataConcurrentWriter for bucketed writes since V2's RequiresDistributionAndOrdering cannot express hash-based ordering - All 6 format Write/Table classes updated with BucketSpec parameter Note: bucket pruning and bucket join (read-path optimization) are not included in this patch (tracked under SPARK-56231).
Add RepairTableExec to sync filesystem partition directories with catalog metastore for V2 file tables. Key changes: - New RepairTableExec: scans filesystem partitions via FileTable.listPartitionIdentifiers(), compares with catalog, registers missing partitions and drops orphaned entries - DataSourceV2Strategy: route RepairTable and RecoverPartitions for FileTable to new V2 exec node
Implement SupportsOverwriteV2 for V2 file tables to support static partition overwrite (INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...). Key changes: - FileTable: replace SupportsTruncate with SupportsOverwriteV2 on WriteBuilder, implement overwrite(predicates) - FileWrite: extend toBatch() to delete only the matching partition directory, ordered by partitionSchema - FileTable.CAPABILITIES: add OVERWRITE_BY_FILTER - All 6 format Write/Table classes: plumb overwritePredicates parameter This is a prerequisite for SPARK-56304 (ifPartitionNotExists).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This PR adds
IF NOT EXISTSsupport forINSERT OVERWRITE ... PARTITION(...) IF NOT EXISTSon V2 file tables.Previously,
INSERT OVERWRITEwithIF NOT EXISTSalways threwunsupportedIfNotExistsErrorfor all V2 tables. This PR lifts that restriction for tables that implement bothSupportsPartitionManagementandOVERWRITE_BY_FILTER(i.e., V2 file tables like Parquet, ORC, etc.).Key changes:
Analyzer.ResolveInsertInto: Instead of unconditionally rejectingifPartitionNotExists, allow it when the V2 table supports bothOVERWRITE_BY_FILTERandSupportsPartitionManagement. Pass the flag and static partition spec as write options toOverwriteByExpression.DataSourceV2Strategy: Add a new plan rule that interceptsOverwriteByExpressionwithifPartitionNotExists=trueonFileTable. It checks the filesystem for the target partition path — if the partition already exists, the write is skipped (LocalTableScanExec); otherwise, a normalOverwriteByExpressionExecproceeds.Why are the changes needed?
IF NOT EXISTSis a commonly used Hive/SQL idiom for idempotent partition loads. Without this, migrating workloads from V1 file sources to V2 file tables requires rewriting allINSERT OVERWRITE ... IF NOT EXISTSstatements, which is a barrier to V2 adoption.Does this PR introduce any user-facing change?
Yes.
INSERT OVERWRITE TABLE t PARTITION(p=1) IF NOT EXISTS SELECT ...now works on V2 file tables. Previously it threwUnsupportedOperationException.How was this patch tested?
New test in
FileDataSourceV2WriteSuiteWas this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code 4.6