From 02ad6ba38da22bfc183aa81463d32ad2d8421cde Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 15:58:30 +0800 Subject: [PATCH 1/9] Supoort adding columns before partition Option. --- .../java/org/apache/paimon/CoreOptions.java | 10 ++ .../apache/paimon/schema/SchemaManager.java | 19 +++ .../paimon/flink/SchemaChangeITCase.java | 144 ++++++++++++++++++ 3 files changed, 173 insertions(+) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 70e50efe9c83..0d3943007c89 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2035,6 +2035,16 @@ public InlineElement getDescription() { "If true, it disables explicit type casting. For ex: it disables converting LONG type to INT type. " + "Users can enable this option to disable explicit type casting"); + public static final ConfigOption ADD_COLUMN_BEFORE_PARTITION = + ConfigOptions.key("add-column-before-partition") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, when adding a new column without specifying a position, " + + "the column will be placed before the first partition column " + + "instead of at the end of the schema. " + + "This only takes effect for partitioned tables."); + public static final ConfigOption COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT = ConfigOptions.key("commit.strict-mode.last-safe-snapshot") .longType() diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index e726b803268d..81585a1d44c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -293,6 +293,14 @@ public static TableSchema generateTableSchema( CoreOptions.DISABLE_EXPLICIT_TYPE_CASTING .defaultValue() .toString())); + + boolean addColumnBeforePartition = + Boolean.parseBoolean( + oldOptions.getOrDefault( + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); + List partitionKeys = oldTableSchema.partitionKeys(); + List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); String newComment = oldTableSchema.comment(); @@ -368,6 +376,17 @@ protected void updateLastColumn( throw new UnsupportedOperationException( "Unsupported move type: " + move.type()); } + } else if (addColumnBeforePartition + && !partitionKeys.isEmpty() + && addColumn.fieldNames().length == 1) { + int insertIndex = newFields.size(); + for (int i = 0; i < newFields.size(); i++) { + if (partitionKeys.contains(newFields.get(i).name())) { + insertIndex = i; + break; + } + } + newFields.add(insertIndex, dataField); } else { newFields.add(dataField); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 9084b55d60e5..8e6478afd842 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1578,4 +1578,148 @@ public void testDisableExplicitTypeCasting(String formatType) { sql("ALTER TABLE T MODIFY v INT"); assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20)); } + + @Test + public void testAddColumnBeforePartitionEnabled() { + sql( + "CREATE TABLE T_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add column without specifying position + sql("ALTER TABLE T_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + + sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); + } + + @Test + public void testAddColumnBeforePartitionDisabledByDefault() { + sql( + "CREATE TABLE T_PART_DEFAULT (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // Add column without specifying position (default behavior) + sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); + // score should be appended at the end + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `score` DOUBLE"); + } + + @Test + public void testAddColumnBeforePartitionWithExplicitPosition() { + sql( + "CREATE TABLE T_PART_POS (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add column with explicit FIRST position, should respect explicit position + sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); + + List result = sql("SHOW CREATE TABLE T_PART_POS"); + assertThat(result.toString()) + .contains( + "`score` DOUBLE,\n" + + " `user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647)"); + } + + @Test + public void testAddColumnBeforePartitionViaAlterOption() { + sql( + "CREATE TABLE T_PART_ALTER (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // First add column without config (default: append at end) + sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); + List result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + + // Enable config via ALTER TABLE SET + sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); + + // Now add another column, should go before partition column dt + sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); + result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + } + + @Test + public void testAddMultipleColumnsBeforePartition() { + sql( + "CREATE TABLE T_PART_MULTI (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add first column + sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); + // Add second column + sql("ALTER TABLE T_PART_MULTI ADD col2 DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART_MULTI"); + // Both new columns should be before partition columns dt and hh + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col1` INT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + } } From 9690ce450f4beb348679c2893e81a47074d6924f Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 16:32:40 +0800 Subject: [PATCH 2/9] test --- .../java/org/apache/paimon/flink/SchemaChangeITCase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 8e6478afd842..7ea82ea49b26 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1709,7 +1709,7 @@ public void testAddMultipleColumnsBeforePartition() { // Add first column sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); // Add second column - sql("ALTER TABLE T_PART_MULTI ADD col2 DOUBLE"); + sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); List result = sql("SHOW CREATE TABLE T_PART_MULTI"); // Both new columns should be before partition columns dt and hh @@ -1718,7 +1718,8 @@ public void testAddMultipleColumnsBeforePartition() { "`user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `col1` INT,\n" - + " `col2` DOUBLE,\n" + + " `col2` INT,\n" + + " `col3` DOUBLE,\n" + " `dt` VARCHAR(2147483647),\n" + " `hh` VARCHAR(2147483647)"); } From 6e9b6d2caab5cf4e4e49bb82bbd34796b8358a86 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 16:43:52 +0800 Subject: [PATCH 3/9] test --- .../paimon/flink/SchemaChangeITCase.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 7ea82ea49b26..fbfbfc81e459 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1723,4 +1723,57 @@ public void testAddMultipleColumnsBeforePartition() { + " `dt` VARCHAR(2147483647),\n" + " `hh` VARCHAR(2147483647)"); } + + @Test + public void testAddColumnBeforePartitionOnPrimaryKeyTable() { + sql( + "CREATE TABLE T_PK_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add single column + sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Add multiple columns + sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); + + result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `col1` INT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Verify data read/write still works + sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PK_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); + } } From f243b6115a7093419c6fc39bdc6541607e59b303 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 20:05:42 +0800 Subject: [PATCH 4/9] fix --- .../org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index 3c1e03ee89fd..d6df89e72375 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -838,7 +838,7 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl test("Paimon MergeInto: concurrent two merge") { for (dvEnabled <- Seq("true", "false")) { - withTable("s", "t") { + withTable("s", s"t_$dvEnabled") { sql("CREATE TABLE s (id INT, b INT, c INT)") sql( "INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)") From 177abd26f9a534548a457c8365dafd4ee3d46ab8 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 22:22:20 +0800 Subject: [PATCH 5/9] remove --- .../apache/paimon/schema/SchemaManager.java | 38 +- .../paimon/flink/SchemaChangeITCase.java | 395 +++++++++--------- .../spark/sql/MergeIntoTableTestBase.scala | 2 +- 3 files changed, 220 insertions(+), 215 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 81585a1d44c9..95e8b21a38e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -294,12 +294,13 @@ public static TableSchema generateTableSchema( .defaultValue() .toString())); - boolean addColumnBeforePartition = - Boolean.parseBoolean( - oldOptions.getOrDefault( - CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), - CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); - List partitionKeys = oldTableSchema.partitionKeys(); + // boolean addColumnBeforePartition = + // Boolean.parseBoolean( + // oldOptions.getOrDefault( + // CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), + // + // CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); + // List partitionKeys = oldTableSchema.partitionKeys(); List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); @@ -376,17 +377,20 @@ protected void updateLastColumn( throw new UnsupportedOperationException( "Unsupported move type: " + move.type()); } - } else if (addColumnBeforePartition - && !partitionKeys.isEmpty() - && addColumn.fieldNames().length == 1) { - int insertIndex = newFields.size(); - for (int i = 0; i < newFields.size(); i++) { - if (partitionKeys.contains(newFields.get(i).name())) { - insertIndex = i; - break; - } - } - newFields.add(insertIndex, dataField); + // } else if (addColumnBeforePartition + // && !partitionKeys.isEmpty() + // && addColumn.fieldNames().length == 1) + // { + // int insertIndex = newFields.size(); + // for (int i = 0; i < newFields.size(); i++) + // { + // if + // (partitionKeys.contains(newFields.get(i).name())) { + // insertIndex = i; + // break; + // } + // } + // newFields.add(insertIndex, dataField); } else { newFields.add(dataField); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index fbfbfc81e459..7b0552c76470 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1579,201 +1579,202 @@ public void testDisableExplicitTypeCasting(String formatType) { assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20)); } - @Test - public void testAddColumnBeforePartitionEnabled() { - sql( - "CREATE TABLE T_PART (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " behavior STRING,\n" - + " dt STRING,\n" - + " hh STRING\n" - + ") PARTITIONED BY (dt, hh) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - - // Add column without specifying position - sql("ALTER TABLE T_PART ADD score DOUBLE"); - - List result = sql("SHOW CREATE TABLE T_PART"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `behavior` VARCHAR(2147483647),\n" - + " `score` DOUBLE,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `hh` VARCHAR(2147483647)"); - - sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); - result = sql("SELECT * FROM T_PART"); - assertThat(result) - .containsExactlyInAnyOrder( - Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), - Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); - } - - @Test - public void testAddColumnBeforePartitionDisabledByDefault() { - sql( - "CREATE TABLE T_PART_DEFAULT (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING\n" - + ") PARTITIONED BY (dt)"); - - // Add column without specifying position (default behavior) - sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); - - List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); - // score should be appended at the end - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `score` DOUBLE"); - } - - @Test - public void testAddColumnBeforePartitionWithExplicitPosition() { - sql( - "CREATE TABLE T_PART_POS (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING\n" - + ") PARTITIONED BY (dt) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - // Add column with explicit FIRST position, should respect explicit position - sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); - - List result = sql("SHOW CREATE TABLE T_PART_POS"); - assertThat(result.toString()) - .contains( - "`score` DOUBLE,\n" - + " `user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `dt` VARCHAR(2147483647)"); - } - - @Test - public void testAddColumnBeforePartitionViaAlterOption() { - sql( - "CREATE TABLE T_PART_ALTER (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING\n" - + ") PARTITIONED BY (dt)"); - - // First add column without config (default: append at end) - sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); - List result = sql("SHOW CREATE TABLE T_PART_ALTER"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `col1` INT"); - - // Enable config via ALTER TABLE SET - sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); - - // Now add another column, should go before partition column dt - sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); - result = sql("SHOW CREATE TABLE T_PART_ALTER"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `col2` DOUBLE,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `col1` INT"); - } - - @Test - public void testAddMultipleColumnsBeforePartition() { - sql( - "CREATE TABLE T_PART_MULTI (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING,\n" - + " hh STRING\n" - + ") PARTITIONED BY (dt, hh) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - // Add first column - sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); - // Add second column - sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); - - List result = sql("SHOW CREATE TABLE T_PART_MULTI"); - // Both new columns should be before partition columns dt and hh - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `col1` INT,\n" - + " `col2` INT,\n" - + " `col3` DOUBLE,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `hh` VARCHAR(2147483647)"); - } - - @Test - public void testAddColumnBeforePartitionOnPrimaryKeyTable() { - sql( - "CREATE TABLE T_PK_PART (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " behavior STRING,\n" - + " dt STRING,\n" - + " hh STRING,\n" - + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" - + ") PARTITIONED BY (dt, hh) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - - // Add single column - sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); - - List result = sql("SHOW CREATE TABLE T_PK_PART"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT NOT NULL,\n" - + " `item_id` BIGINT,\n" - + " `behavior` VARCHAR(2147483647),\n" - + " `score` DOUBLE,\n" - + " `dt` VARCHAR(2147483647) NOT NULL,\n" - + " `hh` VARCHAR(2147483647) NOT NULL"); - - // Add multiple columns - sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); - - result = sql("SHOW CREATE TABLE T_PK_PART"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT NOT NULL,\n" - + " `item_id` BIGINT,\n" - + " `behavior` VARCHAR(2147483647),\n" - + " `score` DOUBLE,\n" - + " `col1` INT,\n" - + " `col2` DOUBLE,\n" - + " `dt` VARCHAR(2147483647) NOT NULL,\n" - + " `hh` VARCHAR(2147483647) NOT NULL"); - - // Verify data read/write still works - sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')"); - result = sql("SELECT * FROM T_PK_PART"); - assertThat(result) - .containsExactlyInAnyOrder( - Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), - Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); - } + // @Test + // public void testAddColumnBeforePartitionEnabled() { + // sql( + // "CREATE TABLE T_PART (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " behavior STRING,\n" + // + " dt STRING,\n" + // + " hh STRING\n" + // + ") PARTITIONED BY (dt, hh) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + // + // // Add column without specifying position + // sql("ALTER TABLE T_PART ADD score DOUBLE"); + // + // List result = sql("SHOW CREATE TABLE T_PART"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `behavior` VARCHAR(2147483647),\n" + // + " `score` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `hh` VARCHAR(2147483647)"); + // + // sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); + // result = sql("SELECT * FROM T_PART"); + // assertThat(result) + // .containsExactlyInAnyOrder( + // Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), + // Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); + // } + // + // @Test + // public void testAddColumnBeforePartitionDisabledByDefault() { + // sql( + // "CREATE TABLE T_PART_DEFAULT (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING\n" + // + ") PARTITIONED BY (dt)"); + // + // // Add column without specifying position (default behavior) + // sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); + // + // List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); + // // score should be appended at the end + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `score` DOUBLE"); + // } + // + // @Test + // public void testAddColumnBeforePartitionWithExplicitPosition() { + // sql( + // "CREATE TABLE T_PART_POS (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING\n" + // + ") PARTITIONED BY (dt) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // // Add column with explicit FIRST position, should respect explicit position + // sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); + // + // List result = sql("SHOW CREATE TABLE T_PART_POS"); + // assertThat(result.toString()) + // .contains( + // "`score` DOUBLE,\n" + // + " `user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `dt` VARCHAR(2147483647)"); + // } + // + // @Test + // public void testAddColumnBeforePartitionViaAlterOption() { + // sql( + // "CREATE TABLE T_PART_ALTER (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING\n" + // + ") PARTITIONED BY (dt)"); + // + // // First add column without config (default: append at end) + // sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); + // List result = sql("SHOW CREATE TABLE T_PART_ALTER"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `col1` INT"); + // + // // Enable config via ALTER TABLE SET + // sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); + // + // // Now add another column, should go before partition column dt + // sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); + // result = sql("SHOW CREATE TABLE T_PART_ALTER"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `col2` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `col1` INT"); + // } + // + // @Test + // public void testAddMultipleColumnsBeforePartition() { + // sql( + // "CREATE TABLE T_PART_MULTI (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING,\n" + // + " hh STRING\n" + // + ") PARTITIONED BY (dt, hh) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // // Add first column + // sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); + // // Add second column + // sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); + // + // List result = sql("SHOW CREATE TABLE T_PART_MULTI"); + // // Both new columns should be before partition columns dt and hh + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `col1` INT,\n" + // + " `col2` INT,\n" + // + " `col3` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `hh` VARCHAR(2147483647)"); + // } + // + // @Test + // public void testAddColumnBeforePartitionOnPrimaryKeyTable() { + // sql( + // "CREATE TABLE T_PK_PART (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " behavior STRING,\n" + // + " dt STRING,\n" + // + " hh STRING,\n" + // + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + // + ") PARTITIONED BY (dt, hh) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + // + // // Add single column + // sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); + // + // List result = sql("SHOW CREATE TABLE T_PK_PART"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT NOT NULL,\n" + // + " `item_id` BIGINT,\n" + // + " `behavior` VARCHAR(2147483647),\n" + // + " `score` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647) NOT NULL,\n" + // + " `hh` VARCHAR(2147483647) NOT NULL"); + // + // // Add multiple columns + // sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); + // + // result = sql("SHOW CREATE TABLE T_PK_PART"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT NOT NULL,\n" + // + " `item_id` BIGINT,\n" + // + " `behavior` VARCHAR(2147483647),\n" + // + " `score` DOUBLE,\n" + // + " `col1` INT,\n" + // + " `col2` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647) NOT NULL,\n" + // + " `hh` VARCHAR(2147483647) NOT NULL"); + // + // // Verify data read/write still works + // sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', + // '11')"); + // result = sql("SELECT * FROM T_PK_PART"); + // assertThat(result) + // .containsExactlyInAnyOrder( + // Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), + // Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); + // } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index d6df89e72375..3c1e03ee89fd 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -838,7 +838,7 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl test("Paimon MergeInto: concurrent two merge") { for (dvEnabled <- Seq("true", "false")) { - withTable("s", s"t_$dvEnabled") { + withTable("s", "t") { sql("CREATE TABLE s (id INT, b INT, c INT)") sql( "INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)") From ec411c75e13c0a60a47b237fa507cc7d4c92776a Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 15:47:26 +0800 Subject: [PATCH 6/9] fix --- .../apache/paimon/schema/SchemaManager.java | 38 +- .../paimon/flink/SchemaChangeITCase.java | 395 +++++++++--------- 2 files changed, 214 insertions(+), 219 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 95e8b21a38e0..81585a1d44c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -294,13 +294,12 @@ public static TableSchema generateTableSchema( .defaultValue() .toString())); - // boolean addColumnBeforePartition = - // Boolean.parseBoolean( - // oldOptions.getOrDefault( - // CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), - // - // CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); - // List partitionKeys = oldTableSchema.partitionKeys(); + boolean addColumnBeforePartition = + Boolean.parseBoolean( + oldOptions.getOrDefault( + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); + List partitionKeys = oldTableSchema.partitionKeys(); List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); @@ -377,20 +376,17 @@ protected void updateLastColumn( throw new UnsupportedOperationException( "Unsupported move type: " + move.type()); } - // } else if (addColumnBeforePartition - // && !partitionKeys.isEmpty() - // && addColumn.fieldNames().length == 1) - // { - // int insertIndex = newFields.size(); - // for (int i = 0; i < newFields.size(); i++) - // { - // if - // (partitionKeys.contains(newFields.get(i).name())) { - // insertIndex = i; - // break; - // } - // } - // newFields.add(insertIndex, dataField); + } else if (addColumnBeforePartition + && !partitionKeys.isEmpty() + && addColumn.fieldNames().length == 1) { + int insertIndex = newFields.size(); + for (int i = 0; i < newFields.size(); i++) { + if (partitionKeys.contains(newFields.get(i).name())) { + insertIndex = i; + break; + } + } + newFields.add(insertIndex, dataField); } else { newFields.add(dataField); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 7b0552c76470..fbfbfc81e459 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1579,202 +1579,201 @@ public void testDisableExplicitTypeCasting(String formatType) { assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20)); } - // @Test - // public void testAddColumnBeforePartitionEnabled() { - // sql( - // "CREATE TABLE T_PART (\n" - // + " user_id BIGINT,\n" - // + " item_id BIGINT,\n" - // + " behavior STRING,\n" - // + " dt STRING,\n" - // + " hh STRING\n" - // + ") PARTITIONED BY (dt, hh) WITH (\n" - // + " 'add-column-before-partition' = 'true'\n" - // + ")"); - // - // sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - // - // // Add column without specifying position - // sql("ALTER TABLE T_PART ADD score DOUBLE"); - // - // List result = sql("SHOW CREATE TABLE T_PART"); - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT,\n" - // + " `item_id` BIGINT,\n" - // + " `behavior` VARCHAR(2147483647),\n" - // + " `score` DOUBLE,\n" - // + " `dt` VARCHAR(2147483647),\n" - // + " `hh` VARCHAR(2147483647)"); - // - // sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); - // result = sql("SELECT * FROM T_PART"); - // assertThat(result) - // .containsExactlyInAnyOrder( - // Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), - // Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); - // } - // - // @Test - // public void testAddColumnBeforePartitionDisabledByDefault() { - // sql( - // "CREATE TABLE T_PART_DEFAULT (\n" - // + " user_id BIGINT,\n" - // + " item_id BIGINT,\n" - // + " dt STRING\n" - // + ") PARTITIONED BY (dt)"); - // - // // Add column without specifying position (default behavior) - // sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); - // - // List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); - // // score should be appended at the end - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT,\n" - // + " `item_id` BIGINT,\n" - // + " `dt` VARCHAR(2147483647),\n" - // + " `score` DOUBLE"); - // } - // - // @Test - // public void testAddColumnBeforePartitionWithExplicitPosition() { - // sql( - // "CREATE TABLE T_PART_POS (\n" - // + " user_id BIGINT,\n" - // + " item_id BIGINT,\n" - // + " dt STRING\n" - // + ") PARTITIONED BY (dt) WITH (\n" - // + " 'add-column-before-partition' = 'true'\n" - // + ")"); - // - // // Add column with explicit FIRST position, should respect explicit position - // sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); - // - // List result = sql("SHOW CREATE TABLE T_PART_POS"); - // assertThat(result.toString()) - // .contains( - // "`score` DOUBLE,\n" - // + " `user_id` BIGINT,\n" - // + " `item_id` BIGINT,\n" - // + " `dt` VARCHAR(2147483647)"); - // } - // - // @Test - // public void testAddColumnBeforePartitionViaAlterOption() { - // sql( - // "CREATE TABLE T_PART_ALTER (\n" - // + " user_id BIGINT,\n" - // + " item_id BIGINT,\n" - // + " dt STRING\n" - // + ") PARTITIONED BY (dt)"); - // - // // First add column without config (default: append at end) - // sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); - // List result = sql("SHOW CREATE TABLE T_PART_ALTER"); - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT,\n" - // + " `item_id` BIGINT,\n" - // + " `dt` VARCHAR(2147483647),\n" - // + " `col1` INT"); - // - // // Enable config via ALTER TABLE SET - // sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); - // - // // Now add another column, should go before partition column dt - // sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); - // result = sql("SHOW CREATE TABLE T_PART_ALTER"); - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT,\n" - // + " `item_id` BIGINT,\n" - // + " `col2` DOUBLE,\n" - // + " `dt` VARCHAR(2147483647),\n" - // + " `col1` INT"); - // } - // - // @Test - // public void testAddMultipleColumnsBeforePartition() { - // sql( - // "CREATE TABLE T_PART_MULTI (\n" - // + " user_id BIGINT,\n" - // + " item_id BIGINT,\n" - // + " dt STRING,\n" - // + " hh STRING\n" - // + ") PARTITIONED BY (dt, hh) WITH (\n" - // + " 'add-column-before-partition' = 'true'\n" - // + ")"); - // - // // Add first column - // sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); - // // Add second column - // sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); - // - // List result = sql("SHOW CREATE TABLE T_PART_MULTI"); - // // Both new columns should be before partition columns dt and hh - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT,\n" - // + " `item_id` BIGINT,\n" - // + " `col1` INT,\n" - // + " `col2` INT,\n" - // + " `col3` DOUBLE,\n" - // + " `dt` VARCHAR(2147483647),\n" - // + " `hh` VARCHAR(2147483647)"); - // } - // - // @Test - // public void testAddColumnBeforePartitionOnPrimaryKeyTable() { - // sql( - // "CREATE TABLE T_PK_PART (\n" - // + " user_id BIGINT,\n" - // + " item_id BIGINT,\n" - // + " behavior STRING,\n" - // + " dt STRING,\n" - // + " hh STRING,\n" - // + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" - // + ") PARTITIONED BY (dt, hh) WITH (\n" - // + " 'add-column-before-partition' = 'true'\n" - // + ")"); - // - // sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - // - // // Add single column - // sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); - // - // List result = sql("SHOW CREATE TABLE T_PK_PART"); - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT NOT NULL,\n" - // + " `item_id` BIGINT,\n" - // + " `behavior` VARCHAR(2147483647),\n" - // + " `score` DOUBLE,\n" - // + " `dt` VARCHAR(2147483647) NOT NULL,\n" - // + " `hh` VARCHAR(2147483647) NOT NULL"); - // - // // Add multiple columns - // sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); - // - // result = sql("SHOW CREATE TABLE T_PK_PART"); - // assertThat(result.toString()) - // .contains( - // "`user_id` BIGINT NOT NULL,\n" - // + " `item_id` BIGINT,\n" - // + " `behavior` VARCHAR(2147483647),\n" - // + " `score` DOUBLE,\n" - // + " `col1` INT,\n" - // + " `col2` DOUBLE,\n" - // + " `dt` VARCHAR(2147483647) NOT NULL,\n" - // + " `hh` VARCHAR(2147483647) NOT NULL"); - // - // // Verify data read/write still works - // sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', - // '11')"); - // result = sql("SELECT * FROM T_PK_PART"); - // assertThat(result) - // .containsExactlyInAnyOrder( - // Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), - // Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); - // } + @Test + public void testAddColumnBeforePartitionEnabled() { + sql( + "CREATE TABLE T_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add column without specifying position + sql("ALTER TABLE T_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + + sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); + } + + @Test + public void testAddColumnBeforePartitionDisabledByDefault() { + sql( + "CREATE TABLE T_PART_DEFAULT (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // Add column without specifying position (default behavior) + sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); + // score should be appended at the end + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `score` DOUBLE"); + } + + @Test + public void testAddColumnBeforePartitionWithExplicitPosition() { + sql( + "CREATE TABLE T_PART_POS (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add column with explicit FIRST position, should respect explicit position + sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); + + List result = sql("SHOW CREATE TABLE T_PART_POS"); + assertThat(result.toString()) + .contains( + "`score` DOUBLE,\n" + + " `user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647)"); + } + + @Test + public void testAddColumnBeforePartitionViaAlterOption() { + sql( + "CREATE TABLE T_PART_ALTER (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // First add column without config (default: append at end) + sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); + List result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + + // Enable config via ALTER TABLE SET + sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); + + // Now add another column, should go before partition column dt + sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); + result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + } + + @Test + public void testAddMultipleColumnsBeforePartition() { + sql( + "CREATE TABLE T_PART_MULTI (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add first column + sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); + // Add second column + sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); + + List result = sql("SHOW CREATE TABLE T_PART_MULTI"); + // Both new columns should be before partition columns dt and hh + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col1` INT,\n" + + " `col2` INT,\n" + + " `col3` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + } + + @Test + public void testAddColumnBeforePartitionOnPrimaryKeyTable() { + sql( + "CREATE TABLE T_PK_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add single column + sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Add multiple columns + sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); + + result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `col1` INT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Verify data read/write still works + sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PK_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); + } } From ad9aab83d669efefb4cd92cf36a1bfa4c34aad21 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 16:11:00 +0800 Subject: [PATCH 7/9] fix --- .../test/java/org/apache/paimon/flink/SchemaChangeITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index fbfbfc81e459..840dce79422a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1740,7 +1740,7 @@ public void testAddColumnBeforePartitionOnPrimaryKeyTable() { sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - // Add single column + // Add single column sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); List result = sql("SHOW CREATE TABLE T_PK_PART"); From 5433c6f3f84cc4a0c5aef82f0129f55284739615 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 18:27:30 +0800 Subject: [PATCH 8/9] addDoc --- docs/layouts/shortcodes/generated/core_configuration.html | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 673d988dabf7..e9d66c7a38c1 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -32,6 +32,12 @@ Boolean Whether to remove the whole row in aggregation engine when -D records are received. + +
add-column-before-partition
+ false + Boolean + If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables. +
alter-column-null-to-not-null.disabled
true From b18a4e7e76292a906f7c11cef0ac2a1d115c2e6c Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 12 Mar 2026 15:02:05 +0800 Subject: [PATCH 9/9] supportPython --- .../java/org/apache/paimon/CoreOptions.java | 4 + .../pypaimon/common/options/__init__.py | 2 + .../pypaimon/common/options/core_options.py | 13 +++ .../pypaimon/schema/schema_manager.py | 29 ++++++- .../pypaimon/tests/filesystem_catalog_test.py | 82 ++++++++++++++++++- 5 files changed, 125 insertions(+), 5 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 7fcd2c11a506..d85cf9ae3757 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2965,6 +2965,10 @@ public boolean disableExplicitTypeCasting() { return options.get(DISABLE_EXPLICIT_TYPE_CASTING); } + public boolean addColumnBeforePartition() { + return options.get(ADD_COLUMN_BEFORE_PARTITION); + } + public boolean indexFileInDataFileDir() { return options.get(INDEX_FILE_IN_DATA_FILE_DIR); } diff --git a/paimon-python/pypaimon/common/options/__init__.py b/paimon-python/pypaimon/common/options/__init__.py index b02d7bb5f495..8760397beb4b 100644 --- a/paimon-python/pypaimon/common/options/__init__.py +++ b/paimon-python/pypaimon/common/options/__init__.py @@ -19,10 +19,12 @@ from .config_option import ConfigOption, Description from .config_options import ConfigOptions from .options import Options +from .core_options import CoreOptions __all__ = [ 'ConfigOption', 'Description', 'ConfigOptions', 'Options', + 'CoreOptions' ] diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 85cf965f91b9..6e6fc4108bdb 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -376,6 +376,16 @@ class CoreOptions: .with_description("Read batch size for any file format if it supports.") ) + ADD_COLUMN_BEFORE_PARTITION: ConfigOption[bool] = ( + ConfigOptions.key("add-column-before-partition") + .boolean_type() + .default_value(False) + .with_description( + "When adding a new column, if the table has partition keys, " + "insert the new column before the first partition column by default." + ) + ) + def __init__(self, options: Options): self.options = options @@ -540,3 +550,6 @@ def global_index_thread_num(self) -> Optional[int]: def read_batch_size(self, default=None) -> int: return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024) + + def add_column_before_partition(self) -> bool: + return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False) diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index c8f007ba0811..20298be7e0c1 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -20,6 +20,7 @@ from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, ColumnNotExistException from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import JSON +from pypaimon.common.options import Options, CoreOptions from pypaimon.schema.data_types import AtomicInteger, DataField from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import ( @@ -181,7 +182,11 @@ def _apply_move(fields: List[DataField], new_field: Optional[DataField], move): def _handle_add_column( - change: AddColumn, new_fields: List[DataField], highest_field_id: AtomicInteger + change: AddColumn, + new_fields: List[DataField], + highest_field_id: AtomicInteger, + partition_keys: List[str], + add_column_before_partition: bool ): if not change.data_type.nullable: raise ValueError( @@ -194,6 +199,17 @@ def _handle_add_column( new_field = DataField(field_id, field_name, change.data_type, change.comment) if change.move: _apply_move(new_fields, new_field, change.move) + elif ( + add_column_before_partition + and partition_keys + and len(change.field_names) == 1 + ): + insert_index = len(new_fields) + for i, field in enumerate(new_fields): + if field.name in partition_keys: + insert_index = i + break + new_fields.insert(insert_index, new_field) else: new_fields.append(new_field) @@ -278,7 +294,7 @@ def commit_changes(self, changes: List[SchemaChange]) -> TableSchema: f"Table schema does not exist at path: {self.table_path}. " "This may happen if the table was deleted concurrently." ) - + new_table_schema = self._generate_table_schema(old_table_schema, changes) try: success = self.commit(new_table_schema) @@ -306,6 +322,10 @@ def _generate_table_schema( highest_field_id = AtomicInteger(old_table_schema.highest_field_id) new_comment = old_table_schema.comment + # Get add_column_before_partition option + add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition() + partition_keys = old_table_schema.partition_keys + for change in changes: if isinstance(change, SetOption): new_options[change.key] = change.value @@ -314,7 +334,10 @@ def _generate_table_schema( elif isinstance(change, UpdateComment): new_comment = change.comment elif isinstance(change, AddColumn): - _handle_add_column(change, new_fields, highest_field_id) + _handle_add_column( + change, new_fields, highest_field_id, + partition_keys, add_column_before_partition + ) elif isinstance(change, RenameColumn): _assert_not_updating_partition_keys( old_table_schema, change.field_names, "rename" diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index fe2c19f18ac9..c778c2708242 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -172,6 +172,84 @@ def test_alter_table(self): table = catalog.get_table(identifier) self.assertEqual(len(table.fields), 2) + def test_add_column_before_partition(self): + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_db", False) + + identifier = "test_db.test_table" + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "col1", "type": "STRING", "description": "field1"}), + DataField.from_dict( + {"id": 1, "name": "partition_col", "type": "STRING", "description": "partition field"}) + ], + partition_keys=["partition_col"], + primary_keys=[], + options={}, + comment="comment" + ) + catalog.create_table(identifier, schema, False) + + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 2) + self.assertEqual(table.fields[1].name, "partition_col") + + catalog.alter_table( + identifier, + [SchemaChange.set_option("add-column-before-partition", "true")], + False + ) + + catalog.alter_table( + identifier, + [SchemaChange.add_column("new_col", AtomicType("INT"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 3) + self.assertEqual(table.fields[0].name, "col1") + self.assertEqual(table.fields[1].name, "new_col") + self.assertEqual(table.fields[2].name, "partition_col") + + catalog.alter_table( + identifier, + [SchemaChange.add_column("col_multi1", AtomicType("INT")), + SchemaChange.add_column("col_multi2", AtomicType("STRING")), + SchemaChange.add_column("col_multi3", AtomicType("DOUBLE"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 6) + self.assertEqual(table.fields[0].name, "col1") + self.assertEqual(table.fields[1].name, "new_col") + self.assertEqual(table.fields[2].name, "col_multi1") + self.assertEqual(table.fields[3].name, "col_multi2") + self.assertEqual(table.fields[4].name, "col_multi3") + self.assertEqual(table.fields[5].name, "partition_col") + + catalog.alter_table( + identifier, + [SchemaChange.set_option("add-column-before-partition", "false")], + False + ) + + catalog.alter_table( + identifier, + [SchemaChange.add_column("another_col", AtomicType("BIGINT"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 7) + self.assertEqual(table.fields[0].name, "col1") + self.assertEqual(table.fields[1].name, "new_col") + self.assertEqual(table.fields[2].name, "col_multi1") + self.assertEqual(table.fields[3].name, "col_multi2") + self.assertEqual(table.fields[4].name, "col_multi3") + self.assertEqual(table.fields[5].name, "partition_col") + self.assertEqual(table.fields[6].name, "another_col") + def test_get_database_propagates_exists_error(self): catalog = CatalogFactory.create({ "warehouse": self.warehouse @@ -186,7 +264,7 @@ def test_get_database_propagates_exists_error(self): from pypaimon.catalog.filesystem_catalog import FileSystemCatalog self.assertIsInstance(catalog, FileSystemCatalog) filesystem_catalog = catalog # type: FileSystemCatalog - + original_exists = filesystem_catalog.file_io.exists filesystem_catalog.file_io.exists = MagicMock(side_effect=OSError("Permission denied")) @@ -195,6 +273,6 @@ def test_get_database_propagates_exists_error(self): catalog.get_database("test_db") self.assertIn("Permission denied", str(context.exception)) self.assertNotIsInstance(context.exception, DatabaseNotExistException) - + # Restore original method filesystem_catalog.file_io.exists = original_exists