diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 673d988dabf7..e9d66c7a38c1 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -32,6 +32,12 @@ Boolean Whether to remove the whole row in aggregation engine when -D records are received. + +
add-column-before-partition
+ false + Boolean + If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables. +
alter-column-null-to-not-null.disabled
true diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d5934b65dbfb..d85cf9ae3757 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2040,6 +2040,16 @@ public InlineElement getDescription() { "If true, it disables explicit type casting. For ex: it disables converting LONG type to INT type. " + "Users can enable this option to disable explicit type casting"); + public static final ConfigOption ADD_COLUMN_BEFORE_PARTITION = + ConfigOptions.key("add-column-before-partition") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, when adding a new column without specifying a position, " + + "the column will be placed before the first partition column " + + "instead of at the end of the schema. " + + "This only takes effect for partitioned tables."); + public static final ConfigOption COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT = ConfigOptions.key("commit.strict-mode.last-safe-snapshot") .longType() @@ -2955,6 +2965,10 @@ public boolean disableExplicitTypeCasting() { return options.get(DISABLE_EXPLICIT_TYPE_CASTING); } + public boolean addColumnBeforePartition() { + return options.get(ADD_COLUMN_BEFORE_PARTITION); + } + public boolean indexFileInDataFileDir() { return options.get(INDEX_FILE_IN_DATA_FILE_DIR); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index e726b803268d..81585a1d44c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -293,6 +293,14 @@ public static TableSchema generateTableSchema( CoreOptions.DISABLE_EXPLICIT_TYPE_CASTING .defaultValue() .toString())); + + boolean addColumnBeforePartition = + Boolean.parseBoolean( + oldOptions.getOrDefault( + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); + List partitionKeys = oldTableSchema.partitionKeys(); + List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); String newComment = oldTableSchema.comment(); @@ -368,6 +376,17 @@ protected void updateLastColumn( throw new UnsupportedOperationException( "Unsupported move type: " + move.type()); } + } else if (addColumnBeforePartition + && !partitionKeys.isEmpty() + && addColumn.fieldNames().length == 1) { + int insertIndex = newFields.size(); + for (int i = 0; i < newFields.size(); i++) { + if (partitionKeys.contains(newFields.get(i).name())) { + insertIndex = i; + break; + } + } + newFields.add(insertIndex, dataField); } else { newFields.add(dataField); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 9084b55d60e5..840dce79422a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1578,4 +1578,202 @@ public void testDisableExplicitTypeCasting(String formatType) { sql("ALTER TABLE T MODIFY v INT"); assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20)); } + + @Test + public void testAddColumnBeforePartitionEnabled() { + sql( + "CREATE TABLE T_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add column without specifying position + sql("ALTER TABLE T_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + + sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); + } + + @Test + public void testAddColumnBeforePartitionDisabledByDefault() { + sql( + "CREATE TABLE T_PART_DEFAULT (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // Add column without specifying position (default behavior) + sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); + // score should be appended at the end + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `score` DOUBLE"); + } + + @Test + public void testAddColumnBeforePartitionWithExplicitPosition() { + sql( + "CREATE TABLE T_PART_POS (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add column with explicit FIRST position, should respect explicit position + sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); + + List result = sql("SHOW CREATE TABLE T_PART_POS"); + assertThat(result.toString()) + .contains( + "`score` DOUBLE,\n" + + " `user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647)"); + } + + @Test + public void testAddColumnBeforePartitionViaAlterOption() { + sql( + "CREATE TABLE T_PART_ALTER (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // First add column without config (default: append at end) + sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); + List result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + + // Enable config via ALTER TABLE SET + sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); + + // Now add another column, should go before partition column dt + sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); + result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + } + + @Test + public void testAddMultipleColumnsBeforePartition() { + sql( + "CREATE TABLE T_PART_MULTI (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add first column + sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); + // Add second column + sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); + + List result = sql("SHOW CREATE TABLE T_PART_MULTI"); + // Both new columns should be before partition columns dt and hh + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col1` INT,\n" + + " `col2` INT,\n" + + " `col3` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + } + + @Test + public void testAddColumnBeforePartitionOnPrimaryKeyTable() { + sql( + "CREATE TABLE T_PK_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add single column + sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Add multiple columns + sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); + + result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `col1` INT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Verify data read/write still works + sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PK_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); + } } diff --git a/paimon-python/pypaimon/common/options/__init__.py b/paimon-python/pypaimon/common/options/__init__.py index b02d7bb5f495..8760397beb4b 100644 --- a/paimon-python/pypaimon/common/options/__init__.py +++ b/paimon-python/pypaimon/common/options/__init__.py @@ -19,10 +19,12 @@ from .config_option import ConfigOption, Description from .config_options import ConfigOptions from .options import Options +from .core_options import CoreOptions __all__ = [ 'ConfigOption', 'Description', 'ConfigOptions', 'Options', + 'CoreOptions' ] diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 85cf965f91b9..6e6fc4108bdb 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -376,6 +376,16 @@ class CoreOptions: .with_description("Read batch size for any file format if it supports.") ) + ADD_COLUMN_BEFORE_PARTITION: ConfigOption[bool] = ( + ConfigOptions.key("add-column-before-partition") + .boolean_type() + .default_value(False) + .with_description( + "When adding a new column, if the table has partition keys, " + "insert the new column before the first partition column by default." + ) + ) + def __init__(self, options: Options): self.options = options @@ -540,3 +550,6 @@ def global_index_thread_num(self) -> Optional[int]: def read_batch_size(self, default=None) -> int: return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024) + + def add_column_before_partition(self) -> bool: + return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False) diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index c8f007ba0811..20298be7e0c1 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -20,6 +20,7 @@ from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, ColumnNotExistException from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import JSON +from pypaimon.common.options import Options, CoreOptions from pypaimon.schema.data_types import AtomicInteger, DataField from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import ( @@ -181,7 +182,11 @@ def _apply_move(fields: List[DataField], new_field: Optional[DataField], move): def _handle_add_column( - change: AddColumn, new_fields: List[DataField], highest_field_id: AtomicInteger + change: AddColumn, + new_fields: List[DataField], + highest_field_id: AtomicInteger, + partition_keys: List[str], + add_column_before_partition: bool ): if not change.data_type.nullable: raise ValueError( @@ -194,6 +199,17 @@ def _handle_add_column( new_field = DataField(field_id, field_name, change.data_type, change.comment) if change.move: _apply_move(new_fields, new_field, change.move) + elif ( + add_column_before_partition + and partition_keys + and len(change.field_names) == 1 + ): + insert_index = len(new_fields) + for i, field in enumerate(new_fields): + if field.name in partition_keys: + insert_index = i + break + new_fields.insert(insert_index, new_field) else: new_fields.append(new_field) @@ -278,7 +294,7 @@ def commit_changes(self, changes: List[SchemaChange]) -> TableSchema: f"Table schema does not exist at path: {self.table_path}. " "This may happen if the table was deleted concurrently." ) - + new_table_schema = self._generate_table_schema(old_table_schema, changes) try: success = self.commit(new_table_schema) @@ -306,6 +322,10 @@ def _generate_table_schema( highest_field_id = AtomicInteger(old_table_schema.highest_field_id) new_comment = old_table_schema.comment + # Get add_column_before_partition option + add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition() + partition_keys = old_table_schema.partition_keys + for change in changes: if isinstance(change, SetOption): new_options[change.key] = change.value @@ -314,7 +334,10 @@ def _generate_table_schema( elif isinstance(change, UpdateComment): new_comment = change.comment elif isinstance(change, AddColumn): - _handle_add_column(change, new_fields, highest_field_id) + _handle_add_column( + change, new_fields, highest_field_id, + partition_keys, add_column_before_partition + ) elif isinstance(change, RenameColumn): _assert_not_updating_partition_keys( old_table_schema, change.field_names, "rename" diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index fe2c19f18ac9..c778c2708242 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -172,6 +172,84 @@ def test_alter_table(self): table = catalog.get_table(identifier) self.assertEqual(len(table.fields), 2) + def test_add_column_before_partition(self): + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_db", False) + + identifier = "test_db.test_table" + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "col1", "type": "STRING", "description": "field1"}), + DataField.from_dict( + {"id": 1, "name": "partition_col", "type": "STRING", "description": "partition field"}) + ], + partition_keys=["partition_col"], + primary_keys=[], + options={}, + comment="comment" + ) + catalog.create_table(identifier, schema, False) + + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 2) + self.assertEqual(table.fields[1].name, "partition_col") + + catalog.alter_table( + identifier, + [SchemaChange.set_option("add-column-before-partition", "true")], + False + ) + + catalog.alter_table( + identifier, + [SchemaChange.add_column("new_col", AtomicType("INT"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 3) + self.assertEqual(table.fields[0].name, "col1") + self.assertEqual(table.fields[1].name, "new_col") + self.assertEqual(table.fields[2].name, "partition_col") + + catalog.alter_table( + identifier, + [SchemaChange.add_column("col_multi1", AtomicType("INT")), + SchemaChange.add_column("col_multi2", AtomicType("STRING")), + SchemaChange.add_column("col_multi3", AtomicType("DOUBLE"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 6) + self.assertEqual(table.fields[0].name, "col1") + self.assertEqual(table.fields[1].name, "new_col") + self.assertEqual(table.fields[2].name, "col_multi1") + self.assertEqual(table.fields[3].name, "col_multi2") + self.assertEqual(table.fields[4].name, "col_multi3") + self.assertEqual(table.fields[5].name, "partition_col") + + catalog.alter_table( + identifier, + [SchemaChange.set_option("add-column-before-partition", "false")], + False + ) + + catalog.alter_table( + identifier, + [SchemaChange.add_column("another_col", AtomicType("BIGINT"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 7) + self.assertEqual(table.fields[0].name, "col1") + self.assertEqual(table.fields[1].name, "new_col") + self.assertEqual(table.fields[2].name, "col_multi1") + self.assertEqual(table.fields[3].name, "col_multi2") + self.assertEqual(table.fields[4].name, "col_multi3") + self.assertEqual(table.fields[5].name, "partition_col") + self.assertEqual(table.fields[6].name, "another_col") + def test_get_database_propagates_exists_error(self): catalog = CatalogFactory.create({ "warehouse": self.warehouse @@ -186,7 +264,7 @@ def test_get_database_propagates_exists_error(self): from pypaimon.catalog.filesystem_catalog import FileSystemCatalog self.assertIsInstance(catalog, FileSystemCatalog) filesystem_catalog = catalog # type: FileSystemCatalog - + original_exists = filesystem_catalog.file_io.exists filesystem_catalog.file_io.exists = MagicMock(side_effect=OSError("Permission denied")) @@ -195,6 +273,6 @@ def test_get_database_propagates_exists_error(self): catalog.get_database("test_db") self.assertIn("Permission denied", str(context.exception)) self.assertNotIsInstance(context.exception, DatabaseNotExistException) - + # Restore original method filesystem_catalog.file_io.exists = original_exists