diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml index 6825d8e7a369..f22b56014fe2 100644 --- a/.github/workflows/utitcase-flink-1.x-common.yml +++ b/.github/workflows/utitcase-flink-1.x-common.yml @@ -38,7 +38,7 @@ concurrency: jobs: build_test: runs-on: ubuntu-latest - timeout-minutes: 60 + timeout-minutes: 100 steps: - name: Checkout code diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 1003aff77cfb..9cde0eefe38a 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -80,6 +80,12 @@ Duration Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition. + +
lookup.dynamic-partition.refresh.async
+ false + Boolean + Whether to refresh dynamic partition lookup table asynchronously. When enabled, partition changes will be loaded in a background thread while the old partition data continues serving queries. When disabled (default), partition refresh is synchronous and blocks queries until the new partition data is fully loaded. +
lookup.refresh.async
false @@ -357,4 +363,4 @@ Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. - + \ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 8febafc5e3c5..bfb4b7b8d1e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -276,6 +276,17 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to refresh lookup table in an async thread."); + public static final ConfigOption LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC = + ConfigOptions.key("lookup.dynamic-partition.refresh.async") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to refresh dynamic partition lookup table asynchronously. " + + "When enabled, partition changes will be loaded in a background thread " + + "while the old partition data continues serving queries. " + + "When disabled (default), partition refresh is synchronous and blocks queries " + + "until the new partition data is fully loaded."); + public static final ConfigOption LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT = ConfigOptions.key("lookup.refresh.async.pending-snapshot-count") .intType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 5b71664ca6a9..ce5068345fc9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -271,13 +271,16 @@ public Collection lookup(RowData keyRow) { if (partitionLoader == null) { return lookupInternal(key); } - - if (partitionLoader.partitions().isEmpty()) { + List partitions = + lookupTable.scanPartitions() == null + ? partitionLoader.partitions() + : lookupTable.scanPartitions(); + if (partitions.isEmpty()) { return Collections.emptyList(); } List rows = new ArrayList<>(); - for (BinaryRow partition : partitionLoader.partitions()) { + for (BinaryRow partition : partitions) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } return rows; @@ -324,7 +327,15 @@ void tryRefresh() throws Exception { return; } - // 2. refresh dynamic partition + // 2. check if async partition refresh has completed, and switch if so + LookupTable switchedTable = lookupTable.checkPartitionRefreshCompletion(); + if (switchedTable != null) { + close(); + lookupTable = switchedTable; + path = ((FullCacheLookupTable) switchedTable).context.tempPath; + } + + // 3. refresh dynamic partition if (partitionLoader != null) { boolean partitionChanged = partitionLoader.checkRefresh(); List partitions = partitionLoader.partitions(); @@ -334,18 +345,14 @@ void tryRefresh() throws Exception { } if (partitionChanged) { - // reopen with latest partition - lookupTable.specifyPartitions( - partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); - lookupTable.close(); - lookupTable.open(); + lookupTable.startPartitionRefresh( + partitions, partitionLoader.createSpecificPartFilter()); nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); - // no need to refresh the lookup table because it is reopened return; } } - // 3. refresh lookup table + // 4. refresh lookup table if (shouldRefreshLookupTable()) { // Check if we should do full load (close and reopen table) instead of incremental // refresh diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 81af38ea8ff2..48bcb9418c9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -55,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -68,9 +69,11 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT; import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY; +import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS; /** Lookup table of full cache. */ public abstract class FullCacheLookupTable implements LookupTable { @@ -96,6 +99,12 @@ public abstract class FullCacheLookupTable implements LookupTable { @Nullable private Predicate partitionFilter; @Nullable private Filter cacheRowFilter; + // ---- Partition refresh fields ---- + private final boolean partitionRefreshAsync; + @Nullable private ExecutorService partitionRefreshExecutor; + private AtomicReference pendingLookupTable; + private AtomicReference partitionRefreshException; + public FullCacheLookupTable(Context context) { this.table = context.table; List sequenceFields = new ArrayList<>(); @@ -137,6 +146,7 @@ public FullCacheLookupTable(Context context) { this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC); this.cachedException = new AtomicReference<>(); this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT); + this.partitionRefreshAsync = options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC); } @Override @@ -166,6 +176,21 @@ protected void init() throws Exception { "%s-lookup-refresh", Thread.currentThread().getName()))) : null; + if (partitionRefreshAsync) { + initPartitionRefresh(); + } + } + + private void initPartitionRefresh() { + this.pendingLookupTable = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.scanPartitions = scanPartitions != null ? scanPartitions : Collections.emptyList(); + this.partitionRefreshExecutor = + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + String.format( + "%s-lookup-refresh-partition", + Thread.currentThread().getName()))); } private StateFactory createStateFactory() throws IOException { @@ -338,12 +363,119 @@ public Predicate projectedPredicate() { public abstract TableBulkLoader createBulkLoader(); + // ---- Partition refresh implementation ---- + + @Override + public void startPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) throws Exception { + if (partitionRefreshAsync) { + asyncPartitionRefresh(newPartitions, partitionFilter); + } else { + syncPartitionRefresh(newPartitions, partitionFilter); + } + } + + private void syncPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) throws Exception { + LOG.info( + "Synchronously refreshing partition for table {}, new partitions detected.", + table.name()); + close(); + specifyPartitions(newPartitions, partitionFilter); + open(); + LOG.info("Synchronous partition refresh completed for table {}.", table.name()); + } + + private void asyncPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) { + + LOG.info( + "Starting async partition refresh for table {}, new partitions detected.", + table.name()); + + partitionRefreshExecutor.submit( + () -> { + File newPath = null; + try { + newPath = + new File( + context.tempPath.getParent(), + "lookup-" + java.util.UUID.randomUUID()); + if (!newPath.mkdirs()) { + throw new RuntimeException("Failed to create dir: " + newPath); + } + LookupTable newTable = copyWithNewPath(newPath); + newTable.specifyPartitions(newPartitions, partitionFilter); + newTable.open(); + + pendingLookupTable.set(newTable); + LOG.info("Async partition refresh completed for table {}.", table.name()); + } catch (Exception e) { + LOG.error("Async partition refresh failed for table {}.", table.name(), e); + partitionRefreshException.set(e); + if (newPath != null) { + FileIOUtils.deleteDirectoryQuietly(newPath); + } + } + }); + } + + @Override + public LookupTable copyWithNewPath(File newPath) { + Context newContext = context.copy(newPath); + Options options = Options.fromMap(context.table.options()); + FullCacheLookupTable newTable = create(newContext, options.get(LOOKUP_CACHE_ROWS)); + if (cacheRowFilter != null) { + newTable.specifyCacheRowFilter(cacheRowFilter); + } + return newTable; + } + + @Nullable + @Override + public LookupTable checkPartitionRefreshCompletion() throws Exception { + if (!partitionRefreshAsync) { + return null; + } + + Exception asyncException = partitionRefreshException.getAndSet(null); + if (asyncException != null) { + LOG.error( + "Async partition refresh failed for table {}, " + "will stop running.", + table.name(), + asyncException); + throw asyncException; + } + + LookupTable newTable = pendingLookupTable.getAndSet(null); + if (newTable == null) { + return null; + } + + LOG.info("Switched to new lookup table for table {} with new partitions.", table.name()); + return newTable; + } + + @Override + public List scanPartitions() { + return scanPartitions; + } + @Override public void close() throws IOException { try { if (refreshExecutor != null) { ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor); } + if (partitionRefreshExecutor != null) { + ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, partitionRefreshExecutor); + } + if (pendingLookupTable != null) { + LookupTable pending = pendingLookupTable.getAndSet(null); + if (pending != null) { + pending.close(); + } + } } finally { stateFactory.close(); FileIOUtils.deleteDirectory(context.tempPath); @@ -414,5 +546,16 @@ public Context copy(int[] newProjection) { joinKey, requiredCachedBucketIds); } + + public Context copy(File newTempPath) { + return new Context( + table.wrapped(), + projection, + tablePredicate, + projectedPredicate, + newTempPath, + joinKey, + requiredCachedBucketIds); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index 3ca792d39d5d..3a4117561e09 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.io.Closeable; +import java.io.File; import java.io.IOException; import java.util.List; @@ -43,4 +44,54 @@ public interface LookupTable extends Closeable { void specifyCacheRowFilter(Filter filter); Long nextSnapshotId(); + + // ---- Partition refresh methods ---- + + /** + * Create a new LookupTable instance with the same configuration but a different temp path. The + * new table is not opened yet. + * + * @throws UnsupportedOperationException if the implementation does not support this operation + */ + default LookupTable copyWithNewPath(File newPath) { + throw new UnsupportedOperationException( + "copyWithNewPath is not supported by " + getClass().getSimpleName()); + } + + /** + * Start refresh partition. + * + * @param newPartitions the new partitions to refresh to + * @param partitionFilter the partition filter for the new partitions + */ + default void startPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) throws Exception { + close(); + specifyPartitions(newPartitions, partitionFilter); + open(); + } + + /** + * Check if an async partition refresh has completed. If a new table is ready, this method + * returns it and the caller should replace its current lookup table reference. Returns {@code + * null} if no switch is needed. + * + *

For synchronous partition refresh, this always returns {@code null}. + */ + @Nullable + default LookupTable checkPartitionRefreshCompletion() throws Exception { + return null; + } + + /** + * Return the partitions that the current lookup table was loaded with. During async refresh, + * this may differ from the latest partitions detected by the partition loader. + * + * @return the active partitions, or {@code null} if partition refresh is not managed by this + * table + */ + @Nullable + default List scanPartitions() { + return null; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 408d74ea0cd9..bda4bcc567d0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1250,4 +1250,408 @@ public void testFallbackCacheMode() throws Exception { iterator.close(); } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL"}) + public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception { + // This test verifies synchronous partition refresh (default mode): + // when max_pt() changes, the lookup table is refreshed synchronously, + // so queries immediately return new partition data after refresh. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // verify initial lookup returns partition '1' data + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert data into a new partition '2', which will trigger sync partition refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { + // This test verifies asynchronous partition refresh: + // when max_pt() changes, the lookup table is refreshed in a background thread, + // old partition data continues serving queries until the new partition is fully loaded. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // verify initial lookup returns partition '1' data + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert data into a new partition '2', which will trigger async partition refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + Thread.sleep(500); // wait for async refresh to complete + // trigger a lookup to check async completion and switch to new partition + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + // insert another new partition '3' and verify switch again + sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); + Thread.sleep(500); // wait for async refresh to complete + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode) + throws Exception { + // Verify that during async refresh, queries still return old partition data + // until the new partition is fully loaded and switched. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert new partition '2' to trigger async refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + + // immediately query before async refresh completes — should still return old partition data + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + // old partition data (100, 200) should still be served + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // now wait for async refresh to complete and trigger switch + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + // after switch, new partition data should be returned + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMode mode) + throws Exception { + // Verify that incremental data updates in the old partition are visible + // before async partition switch happens. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // update data in the current partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 150), ('1', 2, 250)"); + Thread.sleep(500); // wait for incremental refresh + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); + + // now insert new partition '2' to trigger async refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @Test + public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws Exception { + // Verify async partition refresh works correctly with multi-level partition keys. + sql( + "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt1`, `pt2`) WITH (" + + "'bucket' = '1', " + + "'scan.partitions' = 'pt1=max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + LookupCacheMode.FULL); + + sql( + "INSERT INTO PARTITIONED_DIM VALUES " + + "('2024', 1, 1, 100), ('2024', 1, 2, 200), " + + "('2024', 2, 1, 300), ('2024', 2, 2, 400)"); + + String query = + "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2024", 1, 1, 100), + Row.of("2024", 1, 2, 200), + Row.of("2024", 2, 1, 300), + Row.of("2024", 2, 2, 400)); + + // insert new max partition '2025' with sub-partitions + sql( + "INSERT INTO PARTITIONED_DIM VALUES " + + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), " + + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)"); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2025", 1, 1, 1000), + Row.of("2025", 1, 2, 2000), + Row.of("2025", 2, 1, 3000), + Row.of("2025", 2, 2, 4000)); + + iterator.close(); + } + + @Test + public void testAsyncPartitionRefreshWithOverwrite() throws Exception { + // Verify async partition refresh works correctly when a new max partition + // is created via INSERT OVERWRITE. + sql( + "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + LookupCacheMode.FULL); + + sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // overwrite current max partition with new data + sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 150), (2, 250)"); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); + + // overwrite to create a new max partition + sql( + "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES (1, 1000), (2, 2000), (3, 3000)"); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2), (3)"); + iterator.collect(3); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2), (3)"); + result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000), Row.of(3, 3000)); + + iterator.close(); + } + + @Test + public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception { + // Verify async partition refresh works correctly with max_two_pt() strategy. + sql( + "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_two_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + LookupCacheMode.FULL); + + // insert data into partitions '1' and '2' + sql( + "INSERT INTO TWO_PT_DIM VALUES " + + "('1', 1, 100), ('1', 2, 200), " + + "('2', 1, 300), ('2', 2, 400)"); + + String query = + "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("1", 1, 100), + Row.of("1", 2, 200), + Row.of("2", 1, 300), + Row.of("2", 2, 400)); + + // insert new partition '3', now max_two_pt should be '2' and '3' + sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + // should now see data from partitions '2' and '3' + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2", 1, 300), + Row.of("2", 2, 400), + Row.of("3", 1, 1000), + Row.of("3", 2, 2000)); + + // insert another partition '4', max_two_pt should be '3' and '4' + sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2, 20000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("3", 1, 1000), + Row.of("3", 2, 2000), + Row.of("4", 1, 10000), + Row.of("4", 2, 20000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws Exception { + // Verify async partition refresh works correctly with non-primary-key append tables. + sql( + "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)" + + "PARTITIONED BY (`pt`) WITH (" + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(3); + // non-pk table may return multiple matches + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101), Row.of(2, 200)); + + // insert new partition '2' to trigger async refresh + sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(3); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001), Row.of(2, 2000)); + + iterator.close(); + } }