|
18 | 18 |
|
19 | 19 | package org.apache.paimon.table.source; |
20 | 20 |
|
| 21 | +import org.apache.paimon.manifest.PartitionEntry; |
| 22 | +import org.apache.paimon.options.Options; |
21 | 23 | import org.apache.paimon.predicate.FieldRef; |
22 | 24 | import org.apache.paimon.predicate.Predicate; |
23 | 25 | import org.apache.paimon.predicate.PredicateBuilder; |
|
36 | 38 |
|
37 | 39 | import java.util.Arrays; |
38 | 40 | import java.util.Collections; |
| 41 | +import java.util.HashMap; |
39 | 42 | import java.util.List; |
| 43 | +import java.util.Map; |
40 | 44 |
|
41 | 45 | import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST; |
42 | 46 | import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; |
@@ -539,4 +543,330 @@ public void testPushDownTopNOnlyNull() throws Exception { |
539 | 543 | assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions)) |
540 | 544 | .isNull(); |
541 | 545 | } |
| 546 | + |
| 547 | + @Test |
| 548 | + public void testPartitionFilter() throws Exception { |
| 549 | + // Test partition filter functionality |
| 550 | + StreamTableWrite write = table.newWrite(commitUser); |
| 551 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 552 | + |
| 553 | + // Write data to multiple partitions |
| 554 | + write.write(rowData(1, 10, 100L)); // partition pt=1 |
| 555 | + write.write(rowData(1, 20, 200L)); |
| 556 | + commit.commit(0, write.prepareCommit(true, 0)); |
| 557 | + |
| 558 | + write.write(rowData(2, 30, 300L)); // partition pt=2 |
| 559 | + write.write(rowData(2, 40, 400L)); |
| 560 | + commit.commit(1, write.prepareCommit(true, 1)); |
| 561 | + |
| 562 | + write.write(rowData(3, 50, 500L)); // partition pt=3 |
| 563 | + commit.commit(2, write.prepareCommit(true, 2)); |
| 564 | + |
| 565 | + // Without partition filter - should return all data |
| 566 | + TableScan.Plan planAll = table.newScan().plan(); |
| 567 | + List<String> resultAll = getResult(table.newRead(), planAll.splits()); |
| 568 | + assertThat(resultAll.size()).isEqualTo(5); |
| 569 | + |
| 570 | + // Specify partition filter using Map |
| 571 | + Map<String, String> partitionSpec = new HashMap<>(); |
| 572 | + partitionSpec.put("pt", "1"); |
| 573 | + TableScan.Plan plan1 = table.newScan().withPartitionFilter(partitionSpec).plan(); |
| 574 | + List<String> result1 = getResult(table.newRead(), plan1.splits()); |
| 575 | + assertThat(result1.size()).isEqualTo(2); |
| 576 | + assertThat(result1).allMatch(s -> s.contains("1|")); |
| 577 | + |
| 578 | + // Specify partition filter using BinaryRow |
| 579 | + TableScan.Plan plan2 = |
| 580 | + table.newScan().withPartitionFilter(Collections.singletonList(binaryRow(2))).plan(); |
| 581 | + List<String> result2 = getResult(table.newRead(), plan2.splits()); |
| 582 | + assertThat(result2.size()).isEqualTo(2); |
| 583 | + assertThat(result2).allMatch(s -> s.contains("2|")); |
| 584 | + |
| 585 | + write.close(); |
| 586 | + commit.close(); |
| 587 | + } |
| 588 | + |
| 589 | + @Test |
| 590 | + public void testBucketFilter() throws Exception { |
| 591 | + // Create append-only table with multiple buckets directly |
| 592 | + Options conf = new Options(); |
| 593 | + conf.set(org.apache.paimon.CoreOptions.BUCKET, 3); |
| 594 | + conf.set(org.apache.paimon.CoreOptions.BUCKET_KEY, "a"); |
| 595 | + |
| 596 | + // Use a new path to avoid schema conflict with the default primary key table |
| 597 | + java.nio.file.Path newTempDir = java.nio.file.Files.createTempDirectory("junit"); |
| 598 | + tablePath = new org.apache.paimon.fs.Path( |
| 599 | + org.apache.paimon.utils.TraceableFileIO.SCHEME + "://" + newTempDir.toString()); |
| 600 | + fileIO = org.apache.paimon.fs.FileIOFinder.find(tablePath); |
| 601 | + table = createFileStoreTable(false, conf, tablePath); |
| 602 | + |
| 603 | + StreamTableWrite write = table.newWrite(commitUser); |
| 604 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 605 | + |
| 606 | + // Write data to different buckets |
| 607 | + for (int i = 0; i < 10; i++) { |
| 608 | + write.write(rowData(1, i, (long) i * 100)); |
| 609 | + commit.commit(i, write.prepareCommit(true, i)); |
| 610 | + } |
| 611 | + |
| 612 | + // Without bucket filter - should return all data |
| 613 | + TableScan.Plan planAll = table.newScan().plan(); |
| 614 | + assertThat(planAll.splits().size()).isEqualTo(10); |
| 615 | + |
| 616 | + // Use bucket filter - only return data from specified bucket |
| 617 | + TableScan.Plan planBucket0 = table.newScan().withBucket(0).plan(); |
| 618 | + assertThat(planBucket0.splits()).allMatch(split -> ((DataSplit) split).bucket() == 0); |
| 619 | + |
| 620 | + // Use bucketFilter - filter out specific buckets |
| 621 | + TableScan.Plan planBucketFilter = |
| 622 | + table.newScan().withBucketFilter(bucket -> bucket == 1 || bucket == 2).plan(); |
| 623 | + assertThat(planBucketFilter.splits()) |
| 624 | + .allMatch( |
| 625 | + split -> { |
| 626 | + int bucket = ((DataSplit) split).bucket(); |
| 627 | + return bucket == 1 || bucket == 2; |
| 628 | + }); |
| 629 | + |
| 630 | + write.close(); |
| 631 | + commit.close(); |
| 632 | + } |
| 633 | + |
| 634 | + @Test |
| 635 | + public void testLevelFilter() throws Exception { |
| 636 | + // Test level filter for primary key table |
| 637 | + StreamTableWrite write = table.newWrite(commitUser); |
| 638 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 639 | + |
| 640 | + // Write data to trigger compaction and produce files at different levels |
| 641 | + for (int i = 0; i < 10; i++) { |
| 642 | + write.write(rowData(1, i, (long) i * 100)); |
| 643 | + commit.commit(i, write.prepareCommit(true, i)); |
| 644 | + } |
| 645 | + |
| 646 | + // Without level filter |
| 647 | + TableScan.Plan planAll = table.newScan().plan(); |
| 648 | + assertThat(planAll.splits().size()).isGreaterThan(0); |
| 649 | + |
| 650 | + // Use level filter - only return level 0 data |
| 651 | + TableScan.Plan planLevel0 = table.newScan().withLevelFilter(level -> level == 0).plan(); |
| 652 | + for (Split split : planLevel0.splits()) { |
| 653 | + DataSplit dataSplit = (DataSplit) split; |
| 654 | + assertThat(dataSplit.dataFiles()).allMatch(file -> file.level() == 0); |
| 655 | + } |
| 656 | + |
| 657 | + write.close(); |
| 658 | + commit.close(); |
| 659 | + } |
| 660 | + |
| 661 | + @Test |
| 662 | + public void testListPartitionEntries() throws Exception { |
| 663 | + StreamTableWrite write = table.newWrite(commitUser); |
| 664 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 665 | + |
| 666 | + // Write data to multiple partitions |
| 667 | + write.write(rowData(1, 10, 100L)); |
| 668 | + commit.commit(0, write.prepareCommit(true, 0)); |
| 669 | + |
| 670 | + write.write(rowData(2, 20, 200L)); |
| 671 | + commit.commit(1, write.prepareCommit(true, 1)); |
| 672 | + |
| 673 | + write.write(rowData(3, 30, 300L)); |
| 674 | + commit.commit(2, write.prepareCommit(true, 2)); |
| 675 | + |
| 676 | + // Test listPartitionEntries |
| 677 | + List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries(); |
| 678 | + assertThat(partitionEntries.size()).isEqualTo(3); |
| 679 | + |
| 680 | + // Verify partition values |
| 681 | + List<Integer> partitionValues = |
| 682 | + partitionEntries.stream() |
| 683 | + .map(entry -> entry.partition().getInt(0)) |
| 684 | + .sorted() |
| 685 | + .collect(java.util.stream.Collectors.toList()); |
| 686 | + assertThat(partitionValues).containsExactly(1, 2, 3); |
| 687 | + |
| 688 | + // Test listPartitions (convenience method) |
| 689 | + List<org.apache.paimon.data.BinaryRow> partitions = table.newScan().listPartitions(); |
| 690 | + assertThat(partitions.size()).isEqualTo(3); |
| 691 | + |
| 692 | + write.close(); |
| 693 | + commit.close(); |
| 694 | + } |
| 695 | + |
| 696 | + @Test |
| 697 | + public void testPrimaryKeyTableScan() throws Exception { |
| 698 | + // Use existing primary key table (default table is primary key table) |
| 699 | + StreamTableWrite write = table.newWrite(commitUser); |
| 700 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 701 | + |
| 702 | + // Write data |
| 703 | + write.write(rowData(1, 10, 100L)); |
| 704 | + write.write(rowData(1, 20, 200L)); |
| 705 | + commit.commit(0, write.prepareCommit(true, 0)); |
| 706 | + |
| 707 | + // Update data (primary key is pt, a) |
| 708 | + write.write(rowData(1, 10, 101L)); // Update data for (1, 10) |
| 709 | + commit.commit(1, write.prepareCommit(true, 1)); |
| 710 | + |
| 711 | + // Verify scan result - should only have the latest values |
| 712 | + TableScan.Plan plan = table.newScan().plan(); |
| 713 | + List<String> result = getResult(table.newRead(), plan.splits()); |
| 714 | + assertThat(result.size()).isEqualTo(2); |
| 715 | + assertThat(result).containsExactlyInAnyOrder("+I 1|10|101", "+I 1|20|200"); |
| 716 | + |
| 717 | + // Delete data |
| 718 | + write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 200L)); |
| 719 | + commit.commit(2, write.prepareCommit(true, 2)); |
| 720 | + |
| 721 | + // Verify result after deletion |
| 722 | + TableScan.Plan planAfterDelete = table.newScan().plan(); |
| 723 | + List<String> resultAfterDelete = getResult(table.newRead(), planAfterDelete.splits()); |
| 724 | + assertThat(resultAfterDelete.size()).isEqualTo(1); |
| 725 | + assertThat(resultAfterDelete).containsExactly("+I 1|10|101"); |
| 726 | + |
| 727 | + write.close(); |
| 728 | + commit.close(); |
| 729 | + } |
| 730 | + |
| 731 | + @Test |
| 732 | + public void testEmptyTableScan() throws Exception { |
| 733 | + // Test empty table scan |
| 734 | + TableScan.Plan plan = table.newScan().plan(); |
| 735 | + assertThat(plan.splits()).isEmpty(); |
| 736 | + |
| 737 | + // Partition list for empty table |
| 738 | + List<PartitionEntry> partitionEntries = table.newScan().listPartitionEntries(); |
| 739 | + assertThat(partitionEntries).isEmpty(); |
| 740 | + } |
| 741 | + |
| 742 | + @Test |
| 743 | + public void testScanWithMultipleFilters() throws Exception { |
| 744 | + createAppendOnlyTable(); |
| 745 | + |
| 746 | + StreamTableWrite write = table.newWrite(commitUser); |
| 747 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 748 | + |
| 749 | + // Write test data |
| 750 | + for (int pt = 1; pt <= 3; pt++) { |
| 751 | + for (int a = 1; a <= 10; a++) { |
| 752 | + write.write(rowData(pt, a * 10, (long) pt * 1000 + a * 100)); |
| 753 | + commit.commit(pt * 100 + a, write.prepareCommit(true, pt * 100 + a)); |
| 754 | + } |
| 755 | + } |
| 756 | + |
| 757 | + // Combine partition filter and column filter |
| 758 | + Map<String, String> partitionSpec = new HashMap<>(); |
| 759 | + partitionSpec.put("pt", "2"); |
| 760 | + |
| 761 | + Predicate filter = |
| 762 | + new PredicateBuilder(table.schema().logicalRowType()) |
| 763 | + .greaterOrEqual(1, 50); // a >= 50 |
| 764 | + |
| 765 | + TableScan.Plan plan = |
| 766 | + table.newScan().withPartitionFilter(partitionSpec).withFilter(filter).plan(); |
| 767 | + |
| 768 | + List<String> result = getResult(table.newRead(), plan.splits()); |
| 769 | + |
| 770 | + // Verify result: only data with pt=2 and a >= 50 |
| 771 | + assertThat(result).allMatch(s -> s.contains("2|")); |
| 772 | + for (String r : result) { |
| 773 | + String[] parts = r.split("\\|"); |
| 774 | + int aValue = Integer.parseInt(parts[1].trim()); |
| 775 | + assertThat(aValue).isGreaterThanOrEqualTo(50); |
| 776 | + } |
| 777 | + |
| 778 | + write.close(); |
| 779 | + commit.close(); |
| 780 | + } |
| 781 | + |
| 782 | + @Test |
| 783 | + public void testLimitWithPartitionFilter() throws Exception { |
| 784 | + createAppendOnlyTable(); |
| 785 | + |
| 786 | + StreamTableWrite write = table.newWrite(commitUser); |
| 787 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 788 | + |
| 789 | + // Write data to different partitions |
| 790 | + for (int pt = 1; pt <= 3; pt++) { |
| 791 | + for (int i = 0; i < 10; i++) { |
| 792 | + write.write(rowData(pt, i, (long) pt * 1000 + i * 100)); |
| 793 | + commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i)); |
| 794 | + } |
| 795 | + } |
| 796 | + |
| 797 | + // Use partition filter + limit |
| 798 | + Map<String, String> partitionSpec = new HashMap<>(); |
| 799 | + partitionSpec.put("pt", "2"); |
| 800 | + |
| 801 | + TableScan.Plan plan = |
| 802 | + table.newScan().withPartitionFilter(partitionSpec).withLimit(5).plan(); |
| 803 | + |
| 804 | + // Should return at most 5 splits (1 row per split) |
| 805 | + assertThat(plan.splits().size()).isLessThanOrEqualTo(5); |
| 806 | + |
| 807 | + // All data should come from partition 2 |
| 808 | + List<String> result = getResult(table.newRead(), plan.splits()); |
| 809 | + assertThat(result).allMatch(s -> s.contains("2|")); |
| 810 | + |
| 811 | + write.close(); |
| 812 | + commit.close(); |
| 813 | + } |
| 814 | + |
| 815 | + @Test |
| 816 | + public void testScanAfterCompaction() throws Exception { |
| 817 | + // Test scan after compaction for primary key table |
| 818 | + StreamTableWrite write = table.newWrite(commitUser); |
| 819 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 820 | + |
| 821 | + // Write data with same primary key multiple times to trigger compaction |
| 822 | + for (int i = 0; i < 5; i++) { |
| 823 | + write.write(rowData(1, 10, 100L + i)); |
| 824 | + commit.commit(i, write.prepareCommit(true, i)); |
| 825 | + } |
| 826 | + |
| 827 | + // Scan result should only have the latest value |
| 828 | + TableScan.Plan plan = table.newScan().plan(); |
| 829 | + List<String> result = getResult(table.newRead(), plan.splits()); |
| 830 | + assertThat(result.size()).isEqualTo(1); |
| 831 | + assertThat(result).containsExactly("+I 1|10|104"); // latest value |
| 832 | + |
| 833 | + write.close(); |
| 834 | + commit.close(); |
| 835 | + } |
| 836 | + |
| 837 | + @Test |
| 838 | + public void testTopNWithPartitionFilter() throws Exception { |
| 839 | + createAppendOnlyTable(); |
| 840 | + |
| 841 | + StreamTableWrite write = table.newWrite(commitUser); |
| 842 | + StreamTableCommit commit = table.newCommit(commitUser); |
| 843 | + |
| 844 | + // Write data to different partitions |
| 845 | + for (int pt = 1; pt <= 2; pt++) { |
| 846 | + for (int i = 1; i <= 5; i++) { |
| 847 | + write.write(rowData(pt, i * 10, (long) pt * 1000 + i * 100)); |
| 848 | + commit.commit(pt * 100 + i, write.prepareCommit(true, pt * 100 + i)); |
| 849 | + } |
| 850 | + } |
| 851 | + |
| 852 | + // Combine partition filter and TopN |
| 853 | + Map<String, String> partitionSpec = new HashMap<>(); |
| 854 | + partitionSpec.put("pt", "1"); |
| 855 | + |
| 856 | + DataField field = table.schema().fields().get(1); |
| 857 | + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); |
| 858 | + |
| 859 | + TableScan.Plan plan = |
| 860 | + table.newScan() |
| 861 | + .withPartitionFilter(partitionSpec) |
| 862 | + .withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 2)) |
| 863 | + .plan(); |
| 864 | + |
| 865 | + // Verify result: only pt=1 data, and top 2 |
| 866 | + List<Split> splits = plan.splits(); |
| 867 | + assertThat(splits.size()).isLessThanOrEqualTo(2); |
| 868 | + |
| 869 | + write.close(); |
| 870 | + commit.close(); |
| 871 | + } |
542 | 872 | } |
0 commit comments