|
105 | 105 | import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
106 | 106 | import org.apache.iceberg.transforms.Transform; |
107 | 107 | import org.apache.iceberg.types.Conversions; |
| 108 | +import org.apache.iceberg.types.Type; |
108 | 109 | import org.apache.iceberg.types.Types; |
109 | 110 | import org.apache.iceberg.util.ByteBuffers; |
110 | 111 | import org.apache.iceberg.util.Pair; |
@@ -440,64 +441,113 @@ public static PartitionData toPartitionData(StructLike key, Types.StructType key |
440 | 441 |
|
441 | 442 | public static PartitionData toPartitionData(StructLike sourceKey, Types.StructType sourceKeyType, |
442 | 443 | Types.StructType targetKeyType) { |
443 | | - StructProjection projection = StructProjection.create(sourceKeyType, targetKeyType).wrap(sourceKey); |
| 444 | + StructProjection projection = StructProjection.create(sourceKeyType, targetKeyType) |
| 445 | + .wrap(sourceKey); |
444 | 446 | return toPartitionData(projection, targetKeyType); |
445 | 447 | } |
446 | 448 |
|
447 | | - public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec, |
| 449 | + public static Expression generateExprFromIdentityPartitionSpec(Table table, Map<String, String> partitionSpec, |
448 | 450 | boolean latestSpecOnly) throws SemanticException { |
449 | 451 |
|
450 | | - return generateExpressionFromPartitionSpec( |
451 | | - table, partitionSpec, partitionFieldsBySourceColumn(table, latestSpecOnly)); |
| 452 | + Map<String, PartitionField> partitionFields = getPartitionFields(table, latestSpecOnly).stream() |
| 453 | + .collect(Collectors.toMap(PartitionField::name, Function.identity())); |
| 454 | + |
| 455 | + return generateExprFromIdentityPartitionSpec(table, partitionSpec, partitionFields); |
| 456 | + } |
| 457 | + |
| 458 | + public static Expression generateExprFromIdentityPartitionSpec(Table table, Map<String, String> partitionSpec, |
| 459 | + Map<String, PartitionField> partitionFields) throws SemanticException { |
| 460 | + |
| 461 | + return buildPartitionExpression( |
| 462 | + partitionSpec, |
| 463 | + (column, value) -> |
| 464 | + buildIdentityPartitionPredicate(table, column, value, partitionFields.get(column)), |
| 465 | + partitionFields::containsKey |
| 466 | + ); |
452 | 467 | } |
453 | 468 |
|
454 | | - public static Map<String, List<PartitionField>> partitionFieldsBySourceColumn(Table table, boolean latestSpecOnly) { |
| 469 | + public static Expression generateExprFromPartitionSpec(Table table, Map<String, String> partitionSpec, |
| 470 | + boolean latestSpecOnly) throws SemanticException { |
| 471 | + |
455 | 472 | // Group partition fields by source column name to handle partition evolution |
456 | 473 | // where the same source column may have multiple transforms across different specs |
457 | | - return getPartitionFields(table, latestSpecOnly).stream() |
458 | | - .collect(Collectors.groupingBy( |
459 | | - partitionField -> table.schema().findColumnName(partitionField.sourceId())) |
460 | | - ); |
| 474 | + Map<String, List<PartitionField>> partitionFieldsBySourceColumn = |
| 475 | + getPartitionFields(table, latestSpecOnly).stream() |
| 476 | + .collect(Collectors.groupingBy( |
| 477 | + pf -> table.schema().findColumnName(pf.sourceId())) |
| 478 | + ); |
| 479 | + |
| 480 | + return buildPartitionExpression( |
| 481 | + partitionSpec, |
| 482 | + (column, value) -> |
| 483 | + buildTransformPartitionPredicate(table, value, partitionFieldsBySourceColumn.get(column)), |
| 484 | + partitionFieldsBySourceColumn::containsKey |
| 485 | + ); |
| 486 | + } |
| 487 | + |
| 488 | + @FunctionalInterface |
| 489 | + private interface PartitionPredicateBuilder { |
| 490 | + Expression build(String partitionColumn, String partitionValue) throws SemanticException; |
461 | 491 | } |
462 | 492 |
|
463 | | - public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec, |
464 | | - Map<String, List<PartitionField>> partitionFieldsBySourceColumn) throws SemanticException { |
| 493 | + private static Expression buildPartitionExpression( |
| 494 | + Map<String, String> partitionSpec, |
| 495 | + PartitionPredicateBuilder predicateBuilder, |
| 496 | + Predicate<String> fieldValidator) throws SemanticException { |
465 | 497 |
|
466 | 498 | Expression predicate = Expressions.alwaysTrue(); |
467 | 499 |
|
468 | 500 | for (Map.Entry<String, String> entry : partitionSpec.entrySet()) { |
469 | 501 | String partitionColumn = entry.getKey(); |
470 | | - List<PartitionField> partitionFields = partitionFieldsBySourceColumn.get(partitionColumn); |
471 | 502 |
|
472 | | - if (partitionFields == null) { |
| 503 | + // Validate field exists |
| 504 | + if (!fieldValidator.test(partitionColumn)) { |
473 | 505 | throw new SemanticException(String.format("No partition column by the name: %s", partitionColumn)); |
474 | 506 | } |
| 507 | + Expression columnPredicate = predicateBuilder.build(partitionColumn, entry.getValue()); |
| 508 | + predicate = Expressions.and(predicate, columnPredicate); |
| 509 | + } |
475 | 510 |
|
476 | | - // When there are multiple partition fields for the same source column (due to partition evolution), |
477 | | - // create an OR expression that matches any of the transforms |
478 | | - Types.NestedField sourceField = table.schema().findField( |
479 | | - partitionFields.getFirst().sourceId()); |
480 | | - Object sourceValue = Conversions.fromPartitionString(sourceField.type(), entry.getValue()); |
| 511 | + return predicate; |
| 512 | + } |
481 | 513 |
|
482 | | - Expression columnPredicate = Expressions.alwaysFalse(); |
| 514 | + private static Expression buildIdentityPartitionPredicate(Table table, String partitionColumn, |
| 515 | + String partitionValue, PartitionField partitionField) throws SemanticException { |
483 | 516 |
|
484 | | - for (PartitionField partitionField : partitionFields) { |
485 | | - // Apply the transform to the source value |
486 | | - @SuppressWarnings("unchecked") |
487 | | - Transform<Object, Object> transform = (Transform<Object, Object>) partitionField.transform(); |
488 | | - Object transformedValue = transform.bind(sourceField.type()).apply(sourceValue); |
| 517 | + if (!partitionField.transform().isIdentity()) { |
| 518 | + throw new SemanticException(String.format("Partition transforms are not supported here: %s", partitionColumn)); |
| 519 | + } |
| 520 | + Type columnType = table.schema().findField(partitionField.sourceId()).type(); |
| 521 | + Object columnValue = Conversions.fromPartitionString(columnType, partitionValue); |
489 | 522 |
|
490 | | - TransformSpec transformSpec = TransformSpec.fromString(transform.toString().toUpperCase(), sourceField.name()); |
491 | | - UnboundTerm<Object> term = SchemaUtils.toTerm(transformSpec); |
| 523 | + return Expressions.equal(partitionColumn, columnValue); |
| 524 | + } |
492 | 525 |
|
493 | | - columnPredicate = Expressions.or( |
494 | | - columnPredicate, Expressions.equal(term, transformedValue)); |
495 | | - } |
| 526 | + private static Expression buildTransformPartitionPredicate(Table table, String partitionValue, |
| 527 | + List<PartitionField> partitionFields) { |
496 | 528 |
|
497 | | - predicate = Expressions.and(predicate, columnPredicate); |
| 529 | + // Get source field type from first partition field (all share same source) |
| 530 | + Types.NestedField sourceField = table.schema().findField( |
| 531 | + partitionFields.getFirst().sourceId()); |
| 532 | + Object sourceValue = Conversions.fromPartitionString(sourceField.type(), partitionValue); |
| 533 | + |
| 534 | + Expression columnPredicate = Expressions.alwaysFalse(); |
| 535 | + |
| 536 | + // Create OR expression for each transform on this source column |
| 537 | + for (PartitionField partitionField : partitionFields) { |
| 538 | + // Apply the transform to the source value |
| 539 | + @SuppressWarnings("unchecked") |
| 540 | + Transform<Object, Object> transform = (Transform<Object, Object>) partitionField.transform(); |
| 541 | + Object transformedValue = transform.bind(sourceField.type()).apply(sourceValue); |
| 542 | + |
| 543 | + TransformSpec transformSpec = TransformSpec.fromString(transform.toString().toUpperCase(), sourceField.name()); |
| 544 | + UnboundTerm<Object> term = SchemaUtils.toTerm(transformSpec); |
| 545 | + |
| 546 | + columnPredicate = Expressions.or( |
| 547 | + columnPredicate, Expressions.equal(term, transformedValue)); |
498 | 548 | } |
499 | 549 |
|
500 | | - return predicate; |
| 550 | + return columnPredicate; |
501 | 551 | } |
502 | 552 |
|
503 | 553 | public static List<PartitionField> getPartitionFields(Table table, boolean latestSpecOnly) { |
@@ -575,7 +625,7 @@ private static List<String> getPartitionNames(Configuration conf, |
575 | 625 | Comparator<Map.Entry<String, Integer>> specIdComparator) throws SemanticException { |
576 | 626 | Table icebergTable = getTable(conf, table.getTTable()); |
577 | 627 |
|
578 | | - Expression partitionExpr = IcebergTableUtil.generateExpressionFromPartitionSpec( |
| 628 | + Expression partitionExpr = IcebergTableUtil.generateExprFromPartitionSpec( |
579 | 629 | icebergTable, partitionSpec, latestSpecOnly); |
580 | 630 |
|
581 | 631 | int latestSpecId = icebergTable.spec().specId(); |
|
0 commit comments