newPartitions, @Nullable Predicate partitionFilter) throws Exception {
+ close();
+ specifyPartitions(newPartitions, partitionFilter);
+ open();
+ }
+
+ /**
+ * Checks whether an async partition refresh has completed. If a newly loaded table is ready,
+ * this method returns it and the caller should replace its current lookup table reference
+ * with it. Returns {@code null} if no switch is needed.
+ *
+ * <p>For synchronous partition refresh, this method always returns {@code null}.
+ */
+ @Nullable
+ default LookupTable checkPartitionRefreshCompletion() throws Exception {
+ return null;
+ }
+
+ /**
+ * Returns the partitions that the current lookup table was loaded with. During an async
+ * refresh, this may differ from the latest partitions detected by the partition loader.
+ *
+ * @return the active partitions, or {@code null} if partition refresh is not managed by this
+ * table
+ */
+ @Nullable
+ default List<BinaryRow> scanPartitions() {
+ return null;
+ }
}
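The two default methods above are built for a poll-and-swap pattern: the caller polls `checkPartitionRefreshCompletion()` on each lookup and, when it returns a non-null table, atomically replaces its reference. A minimal, self-contained sketch of that caller-side pattern (all names here are hypothetical stand-ins, not the real Paimon types):

```java
import java.util.List;

public class RefreshPollSketch {
    // Simplified stand-in for the LookupTable interface extended in this patch.
    interface LookupTable {
        List<String> get(String key);

        // Non-null return means a background refresh finished and a new table is ready.
        default LookupTable checkPartitionRefreshCompletion() {
            return null;
        }
    }

    // Hypothetical caller that swaps its table reference when a refresh completes.
    static class Caller {
        private LookupTable current;

        Caller(LookupTable initial) {
            current = initial;
        }

        List<String> lookup(String key) {
            LookupTable refreshed = current.checkPartitionRefreshCompletion();
            if (refreshed != null) {
                // Swap: from now on, serve lookups from the freshly loaded table.
                current = refreshed;
            }
            return current.get(key);
        }
    }

    public static void main(String[] args) {
        LookupTable newTable = key -> List.of("new");
        // The old table reports the new one as ready on the first poll.
        LookupTable oldTable = new LookupTable() {
            public List<String> get(String key) {
                return List.of("old");
            }

            public LookupTable checkPartitionRefreshCompletion() {
                return newTable;
            }
        };
        Caller caller = new Caller(oldTable);
        System.out.println(caller.lookup("k")); // prints "[new]"
    }
}
```

Until the poll returns non-null, the caller keeps serving from the old table, which is exactly the behavior the async-refresh tests below assert.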
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 408d74ea0cd9..bda4bcc567d0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1250,4 +1250,408 @@ public void testFallbackCacheMode() throws Exception {
iterator.close();
}
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL"})
+ public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception {
+ // This test verifies synchronous partition refresh (default mode):
+ // when max_pt() changes, the lookup table is refreshed synchronously,
+ // so queries immediately return new partition data after refresh.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ // insert data into partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ // verify initial lookup returns partition '1' data
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // insert data into a new partition '2', which will trigger sync partition refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception {
+ // This test verifies asynchronous partition refresh:
+ // when max_pt() changes, the lookup table is refreshed in a background thread,
+ // old partition data continues serving queries until the new partition is fully loaded.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ // insert data into partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ // verify initial lookup returns partition '1' data
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // insert data into a new partition '2', which will trigger async partition refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+ Thread.sleep(500); // wait for async refresh to complete
+ // trigger a lookup to check async completion and switch to new partition
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ // insert another new partition '3' and verify switch again
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)");
+ Thread.sleep(500); // wait for async refresh to complete
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode)
+ throws Exception {
+ // Verify that during async refresh, queries still return old partition data
+ // until the new partition is fully loaded and switched.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // insert new partition '2' to trigger async refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+
+ // immediately query before async refresh completes; should still return old partition data
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ // old partition data (100, 200) should still be served
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // now wait for async refresh to complete and trigger switch
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ // after switch, new partition data should be returned
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMode mode)
+ throws Exception {
+ // Verify that incremental data updates in the old partition are visible
+ // before async partition switch happens.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // update data in the current partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 150), ('1', 2, 250)");
+ Thread.sleep(500); // wait for incremental refresh
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250));
+
+ // now insert new partition '2' to trigger async refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws Exception {
+ // Verify async partition refresh works correctly with multi-level partition keys.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt1`, `pt2`) WITH ("
+ + "'bucket' = '1', "
+ + "'scan.partitions' = 'pt1=max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES "
+ + "('2024', 1, 1, 100), ('2024', 1, 2, 200), "
+ + "('2024', 2, 1, 300), ('2024', 2, 2, 400)");
+
+ String query =
+ "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2024", 1, 1, 100),
+ Row.of("2024", 1, 2, 200),
+ Row.of("2024", 2, 1, 300),
+ Row.of("2024", 2, 2, 400));
+
+ // insert new max partition '2025' with sub-partitions
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES "
+ + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), "
+ + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2025", 1, 1, 1000),
+ Row.of("2025", 1, 2, 2000),
+ Row.of("2025", 2, 1, 3000),
+ Row.of("2025", 2, 2, 4000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithOverwrite() throws Exception {
+ // Verify async partition refresh works correctly when a new max partition
+ // is created via INSERT OVERWRITE.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // overwrite current max partition with new data
+ sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 150), (2, 250)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250));
+
+ // overwrite to create a new max partition
+ sql(
+ "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES (1, 1000), (2, 2000), (3, 3000)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ iterator.collect(3);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000), Row.of(3, 3000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception {
+ // Verify async partition refresh works correctly with max_two_pt() strategy.
+ sql(
+ "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_two_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ // insert data into partitions '1' and '2'
+ sql(
+ "INSERT INTO TWO_PT_DIM VALUES "
+ + "('1', 1, 100), ('1', 2, 200), "
+ + "('2', 1, 300), ('2', 2, 400)");
+
+ String query =
+ "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("1", 1, 100),
+ Row.of("1", 2, 200),
+ Row.of("2", 1, 300),
+ Row.of("2", 2, 400));
+
+ // insert new partition '3', now max_two_pt should be '2' and '3'
+ sql("INSERT INTO TWO_PT_DIM VALUES ('3', 1, 1000), ('3', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ // should now see data from partitions '2' and '3'
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2", 1, 300),
+ Row.of("2", 2, 400),
+ Row.of("3", 1, 1000),
+ Row.of("3", 2, 2000));
+
+ // insert another partition '4', max_two_pt should be '3' and '4'
+ sql("INSERT INTO TWO_PT_DIM VALUES ('4', 1, 10000), ('4', 2, 20000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("3", 1, 1000),
+ Row.of("3", 2, 2000),
+ Row.of("4", 1, 10000),
+ Row.of("4", 2, 20000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws Exception {
+ // Verify async partition refresh works correctly with non-primary-key append tables.
+ sql(
+ "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(3);
+ // non-pk table may return multiple matches
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101), Row.of(2, 200));
+
+ // insert new partition '2' to trigger async refresh
+ sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), ('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(3);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001), Row.of(2, 2000));
+
+ iterator.close();
+ }
}
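The async tests above all wait for the background refresh with fixed `Thread.sleep(500)` calls followed by a trigger lookup. Outside of a patch like this one, that fixed-sleep pattern is often replaced by a bounded poll loop, which waits only as long as needed and fails fast on timeout. A hedged, standalone sketch of such a helper (hypothetical names, not part of this patch or of Paimon's test utilities):

```java
import java.util.function.Supplier;

public class RetryUntil {
    /**
     * Polls the condition until it holds or timeoutMs elapses, sleeping pollMs
     * between attempts. Returns whether the condition eventually held.
     */
    static boolean waitUntil(Supplier<Boolean> condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.get()) {
                return true;
            }
            Thread.sleep(pollMs);
        }
        // One last check at the deadline boundary.
        return condition.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Condition becomes true roughly 100 ms after start.
        boolean ok = waitUntil(() -> System.currentTimeMillis() - start > 100, 1000, 10);
        System.out.println(ok); // prints "true"
    }
}
```

In a test like `testAsyncPartitionRefresh`, the condition would be "a trigger lookup returns the new partition's values", bounding the wait instead of hard-coding 500 ms.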