diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 673d988dabf7..e9d66c7a38c1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,6 +32,12 @@
Boolean |
Whether to remove the whole row in aggregation engine when -D records are received. |
+        <tr>
+            <td><h5>add-column-before-partition</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
+        </tr>
alter-column-null-to-not-null.disabled |
true |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d5934b65dbfb..d85cf9ae3757 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2040,6 +2040,16 @@ public InlineElement getDescription() {
"If true, it disables explicit type casting. For ex: it disables converting LONG type to INT type. "
+ "Users can enable this option to disable explicit type casting");
+ public static final ConfigOption<Boolean> ADD_COLUMN_BEFORE_PARTITION =
+ ConfigOptions.key("add-column-before-partition")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, when adding a new column without specifying a position, "
+ + "the column will be placed before the first partition column "
+ + "instead of at the end of the schema. "
+ + "This only takes effect for partitioned tables.");
+
public static final ConfigOption<Long> COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT =
ConfigOptions.key("commit.strict-mode.last-safe-snapshot")
.longType()
@@ -2955,6 +2965,10 @@ public boolean disableExplicitTypeCasting() {
return options.get(DISABLE_EXPLICIT_TYPE_CASTING);
}
+ public boolean addColumnBeforePartition() {
+ return options.get(ADD_COLUMN_BEFORE_PARTITION);
+ }
+
public boolean indexFileInDataFileDir() {
return options.get(INDEX_FILE_IN_DATA_FILE_DIR);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index e726b803268d..81585a1d44c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -293,6 +293,14 @@ public static TableSchema generateTableSchema(
CoreOptions.DISABLE_EXPLICIT_TYPE_CASTING
.defaultValue()
.toString()));
+
+ boolean addColumnBeforePartition =
+ Boolean.parseBoolean(
+ oldOptions.getOrDefault(
+ CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(),
+ CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString()));
+ List<String> partitionKeys = oldTableSchema.partitionKeys();
+
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
@@ -368,6 +376,17 @@ protected void updateLastColumn(
throw new UnsupportedOperationException(
"Unsupported move type: " + move.type());
}
+ } else if (addColumnBeforePartition
+ && !partitionKeys.isEmpty()
+ && addColumn.fieldNames().length == 1) {
+ int insertIndex = newFields.size();
+ for (int i = 0; i < newFields.size(); i++) {
+ if (partitionKeys.contains(newFields.get(i).name())) {
+ insertIndex = i;
+ break;
+ }
+ }
+ newFields.add(insertIndex, dataField);
} else {
newFields.add(dataField);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 9084b55d60e5..840dce79422a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1578,4 +1578,202 @@ public void testDisableExplicitTypeCasting(String formatType) {
sql("ALTER TABLE T MODIFY v INT");
assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20));
}
+
+ @Test
+ public void testAddColumnBeforePartitionEnabled() {
+ sql(
+ "CREATE TABLE T_PART (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING\n"
+ + ") PARTITIONED BY (dt, hh) WITH (\n"
+ + " 'add-column-before-partition' = 'true'\n"
+ + ")");
+
+ sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')");
+
+ // Add column without specifying position
+ sql("ALTER TABLE T_PART ADD score DOUBLE");
+
+ List<Row> result = sql("SHOW CREATE TABLE T_PART");
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT,\n"
+ + " `item_id` BIGINT,\n"
+ + " `behavior` VARCHAR(2147483647),\n"
+ + " `score` DOUBLE,\n"
+ + " `dt` VARCHAR(2147483647),\n"
+ + " `hh` VARCHAR(2147483647)");
+
+ sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')");
+ result = sql("SELECT * FROM T_PART");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1L, 100L, "buy", null, "2024-01-01", "10"),
+ Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11"));
+ }
+
+ @Test
+ public void testAddColumnBeforePartitionDisabledByDefault() {
+ sql(
+ "CREATE TABLE T_PART_DEFAULT (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " dt STRING\n"
+ + ") PARTITIONED BY (dt)");
+
+ // Add column without specifying position (default behavior)
+ sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE");
+
+ List<Row> result = sql("SHOW CREATE TABLE T_PART_DEFAULT");
+ // score should be appended at the end
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT,\n"
+ + " `item_id` BIGINT,\n"
+ + " `dt` VARCHAR(2147483647),\n"
+ + " `score` DOUBLE");
+ }
+
+ @Test
+ public void testAddColumnBeforePartitionWithExplicitPosition() {
+ sql(
+ "CREATE TABLE T_PART_POS (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " dt STRING\n"
+ + ") PARTITIONED BY (dt) WITH (\n"
+ + " 'add-column-before-partition' = 'true'\n"
+ + ")");
+
+ // Add column with explicit FIRST position, should respect explicit position
+ sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST");
+
+ List<Row> result = sql("SHOW CREATE TABLE T_PART_POS");
+ assertThat(result.toString())
+ .contains(
+ "`score` DOUBLE,\n"
+ + " `user_id` BIGINT,\n"
+ + " `item_id` BIGINT,\n"
+ + " `dt` VARCHAR(2147483647)");
+ }
+
+ @Test
+ public void testAddColumnBeforePartitionViaAlterOption() {
+ sql(
+ "CREATE TABLE T_PART_ALTER (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " dt STRING\n"
+ + ") PARTITIONED BY (dt)");
+
+ // First add column without config (default: append at end)
+ sql("ALTER TABLE T_PART_ALTER ADD col1 INT");
+ List<Row> result = sql("SHOW CREATE TABLE T_PART_ALTER");
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT,\n"
+ + " `item_id` BIGINT,\n"
+ + " `dt` VARCHAR(2147483647),\n"
+ + " `col1` INT");
+
+ // Enable config via ALTER TABLE SET
+ sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')");
+
+ // Now add another column, should go before partition column dt
+ sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE");
+ result = sql("SHOW CREATE TABLE T_PART_ALTER");
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT,\n"
+ + " `item_id` BIGINT,\n"
+ + " `col2` DOUBLE,\n"
+ + " `dt` VARCHAR(2147483647),\n"
+ + " `col1` INT");
+ }
+
+ @Test
+ public void testAddMultipleColumnsBeforePartition() {
+ sql(
+ "CREATE TABLE T_PART_MULTI (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " dt STRING,\n"
+ + " hh STRING\n"
+ + ") PARTITIONED BY (dt, hh) WITH (\n"
+ + " 'add-column-before-partition' = 'true'\n"
+ + ")");
+
+ // Add first column
+ sql("ALTER TABLE T_PART_MULTI ADD col1 INT");
+ // Add second column
+ sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )");
+
+ List<Row> result = sql("SHOW CREATE TABLE T_PART_MULTI");
+ // Both new columns should be before partition columns dt and hh
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT,\n"
+ + " `item_id` BIGINT,\n"
+ + " `col1` INT,\n"
+ + " `col2` INT,\n"
+ + " `col3` DOUBLE,\n"
+ + " `dt` VARCHAR(2147483647),\n"
+ + " `hh` VARCHAR(2147483647)");
+ }
+
+ @Test
+ public void testAddColumnBeforePartitionOnPrimaryKeyTable() {
+ sql(
+ "CREATE TABLE T_PK_PART (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING,\n"
+ + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt, hh) WITH (\n"
+ + " 'add-column-before-partition' = 'true'\n"
+ + ")");
+
+ sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')");
+
+ // Add single column
+ sql("ALTER TABLE T_PK_PART ADD score DOUBLE");
+
+ List<Row> result = sql("SHOW CREATE TABLE T_PK_PART");
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT NOT NULL,\n"
+ + " `item_id` BIGINT,\n"
+ + " `behavior` VARCHAR(2147483647),\n"
+ + " `score` DOUBLE,\n"
+ + " `dt` VARCHAR(2147483647) NOT NULL,\n"
+ + " `hh` VARCHAR(2147483647) NOT NULL");
+
+ // Add multiple columns
+ sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )");
+
+ result = sql("SHOW CREATE TABLE T_PK_PART");
+ assertThat(result.toString())
+ .contains(
+ "`user_id` BIGINT NOT NULL,\n"
+ + " `item_id` BIGINT,\n"
+ + " `behavior` VARCHAR(2147483647),\n"
+ + " `score` DOUBLE,\n"
+ + " `col1` INT,\n"
+ + " `col2` DOUBLE,\n"
+ + " `dt` VARCHAR(2147483647) NOT NULL,\n"
+ + " `hh` VARCHAR(2147483647) NOT NULL");
+
+ // Verify data read/write still works
+ sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')");
+ result = sql("SELECT * FROM T_PK_PART");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"),
+ Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11"));
+ }
}
diff --git a/paimon-python/pypaimon/common/options/__init__.py b/paimon-python/pypaimon/common/options/__init__.py
index b02d7bb5f495..8760397beb4b 100644
--- a/paimon-python/pypaimon/common/options/__init__.py
+++ b/paimon-python/pypaimon/common/options/__init__.py
@@ -19,10 +19,12 @@
from .config_option import ConfigOption, Description
from .config_options import ConfigOptions
from .options import Options
+from .core_options import CoreOptions
__all__ = [
'ConfigOption',
'Description',
'ConfigOptions',
'Options',
+ 'CoreOptions'
]
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 85cf965f91b9..6e6fc4108bdb 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -376,6 +376,16 @@ class CoreOptions:
.with_description("Read batch size for any file format if it supports.")
)
+ ADD_COLUMN_BEFORE_PARTITION: ConfigOption[bool] = (
+ ConfigOptions.key("add-column-before-partition")
+ .boolean_type()
+ .default_value(False)
+ .with_description(
+ "When adding a new column, if the table has partition keys, "
+ "insert the new column before the first partition column by default."
+ )
+ )
+
def __init__(self, options: Options):
self.options = options
@@ -540,3 +550,6 @@ def global_index_thread_num(self) -> Optional[int]:
def read_batch_size(self, default=None) -> int:
return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
+
+ def add_column_before_partition(self) -> bool:
+ return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index c8f007ba0811..20298be7e0c1 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -20,6 +20,7 @@
from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, ColumnNotExistException
from pypaimon.common.file_io import FileIO
from pypaimon.common.json_util import JSON
+from pypaimon.common.options import Options, CoreOptions
from pypaimon.schema.data_types import AtomicInteger, DataField
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import (
@@ -181,7 +182,11 @@ def _apply_move(fields: List[DataField], new_field: Optional[DataField], move):
def _handle_add_column(
- change: AddColumn, new_fields: List[DataField], highest_field_id: AtomicInteger
+ change: AddColumn,
+ new_fields: List[DataField],
+ highest_field_id: AtomicInteger,
+ partition_keys: List[str],
+ add_column_before_partition: bool
):
if not change.data_type.nullable:
raise ValueError(
@@ -194,6 +199,17 @@ def _handle_add_column(
new_field = DataField(field_id, field_name, change.data_type, change.comment)
if change.move:
_apply_move(new_fields, new_field, change.move)
+ elif (
+ add_column_before_partition
+ and partition_keys
+ and len(change.field_names) == 1
+ ):
+ insert_index = len(new_fields)
+ for i, field in enumerate(new_fields):
+ if field.name in partition_keys:
+ insert_index = i
+ break
+ new_fields.insert(insert_index, new_field)
else:
new_fields.append(new_field)
@@ -278,7 +294,7 @@ def commit_changes(self, changes: List[SchemaChange]) -> TableSchema:
f"Table schema does not exist at path: {self.table_path}. "
"This may happen if the table was deleted concurrently."
)
-
+
new_table_schema = self._generate_table_schema(old_table_schema, changes)
try:
success = self.commit(new_table_schema)
@@ -306,6 +322,10 @@ def _generate_table_schema(
highest_field_id = AtomicInteger(old_table_schema.highest_field_id)
new_comment = old_table_schema.comment
+ # Get add_column_before_partition option
+ add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition()
+ partition_keys = old_table_schema.partition_keys
+
for change in changes:
if isinstance(change, SetOption):
new_options[change.key] = change.value
@@ -314,7 +334,10 @@ def _generate_table_schema(
elif isinstance(change, UpdateComment):
new_comment = change.comment
elif isinstance(change, AddColumn):
- _handle_add_column(change, new_fields, highest_field_id)
+ _handle_add_column(
+ change, new_fields, highest_field_id,
+ partition_keys, add_column_before_partition
+ )
elif isinstance(change, RenameColumn):
_assert_not_updating_partition_keys(
old_table_schema, change.field_names, "rename"
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index fe2c19f18ac9..c778c2708242 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -172,6 +172,84 @@ def test_alter_table(self):
table = catalog.get_table(identifier)
self.assertEqual(len(table.fields), 2)
+ def test_add_column_before_partition(self):
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_db", False)
+
+ identifier = "test_db.test_table"
+ schema = Schema(
+ fields=[
+ DataField.from_dict({"id": 0, "name": "col1", "type": "STRING", "description": "field1"}),
+ DataField.from_dict(
+ {"id": 1, "name": "partition_col", "type": "STRING", "description": "partition field"})
+ ],
+ partition_keys=["partition_col"],
+ primary_keys=[],
+ options={},
+ comment="comment"
+ )
+ catalog.create_table(identifier, schema, False)
+
+ table = catalog.get_table(identifier)
+ self.assertEqual(len(table.fields), 2)
+ self.assertEqual(table.fields[1].name, "partition_col")
+
+ catalog.alter_table(
+ identifier,
+ [SchemaChange.set_option("add-column-before-partition", "true")],
+ False
+ )
+
+ catalog.alter_table(
+ identifier,
+ [SchemaChange.add_column("new_col", AtomicType("INT"))],
+ False
+ )
+ table = catalog.get_table(identifier)
+ self.assertEqual(len(table.fields), 3)
+ self.assertEqual(table.fields[0].name, "col1")
+ self.assertEqual(table.fields[1].name, "new_col")
+ self.assertEqual(table.fields[2].name, "partition_col")
+
+ catalog.alter_table(
+ identifier,
+ [SchemaChange.add_column("col_multi1", AtomicType("INT")),
+ SchemaChange.add_column("col_multi2", AtomicType("STRING")),
+ SchemaChange.add_column("col_multi3", AtomicType("DOUBLE"))],
+ False
+ )
+ table = catalog.get_table(identifier)
+ self.assertEqual(len(table.fields), 6)
+ self.assertEqual(table.fields[0].name, "col1")
+ self.assertEqual(table.fields[1].name, "new_col")
+ self.assertEqual(table.fields[2].name, "col_multi1")
+ self.assertEqual(table.fields[3].name, "col_multi2")
+ self.assertEqual(table.fields[4].name, "col_multi3")
+ self.assertEqual(table.fields[5].name, "partition_col")
+
+ catalog.alter_table(
+ identifier,
+ [SchemaChange.set_option("add-column-before-partition", "false")],
+ False
+ )
+
+ catalog.alter_table(
+ identifier,
+ [SchemaChange.add_column("another_col", AtomicType("BIGINT"))],
+ False
+ )
+ table = catalog.get_table(identifier)
+ self.assertEqual(len(table.fields), 7)
+ self.assertEqual(table.fields[0].name, "col1")
+ self.assertEqual(table.fields[1].name, "new_col")
+ self.assertEqual(table.fields[2].name, "col_multi1")
+ self.assertEqual(table.fields[3].name, "col_multi2")
+ self.assertEqual(table.fields[4].name, "col_multi3")
+ self.assertEqual(table.fields[5].name, "partition_col")
+ self.assertEqual(table.fields[6].name, "another_col")
+
def test_get_database_propagates_exists_error(self):
catalog = CatalogFactory.create({
"warehouse": self.warehouse
@@ -186,7 +264,7 @@ def test_get_database_propagates_exists_error(self):
from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
self.assertIsInstance(catalog, FileSystemCatalog)
filesystem_catalog = catalog # type: FileSystemCatalog
-
+
original_exists = filesystem_catalog.file_io.exists
filesystem_catalog.file_io.exists = MagicMock(side_effect=OSError("Permission denied"))
@@ -195,6 +273,6 @@ def test_get_database_propagates_exists_error(self):
catalog.get_database("test_db")
self.assertIn("Permission denied", str(context.exception))
self.assertNotIsInstance(context.exception, DatabaseNotExistException)
-
+
# Restore original method
filesystem_catalog.file_io.exists = original_exists