From 275ccc28f00c98245b6b1f51db8c7fcb70069b50 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 18:15:25 +0800 Subject: [PATCH 01/18] test --- paimon-python/pypaimon/filesystem/pyarrow_file_io.py | 4 ++-- paimon-python/pypaimon/write/commit/commit_rollback.py | 8 ++++++++ paimon-python/pypaimon/write/file_store_commit.py | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 55f055ae9abe..e54d609a2397 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -125,10 +125,10 @@ def _initialize_oss_fs(self, path) -> FileSystem: if self._pyarrow_gte_7: client_kwargs['force_virtual_addressing'] = True - client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) + client_kwargs['endpoint_override'] = "oss-cn-hangzhou.aliyuncs.com" else: client_kwargs['endpoint_override'] = (self._oss_bucket + "." + - self.properties.get(OssOptions.OSS_ENDPOINT)) + "oss-cn-hangzhou.aliyuncs.com") retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) diff --git a/paimon-python/pypaimon/write/commit/commit_rollback.py b/paimon-python/pypaimon/write/commit/commit_rollback.py index 66106efae7bd..b8c706f35e09 100644 --- a/paimon-python/pypaimon/write/commit/commit_rollback.py +++ b/paimon-python/pypaimon/write/commit/commit_rollback.py @@ -20,8 +20,12 @@ Commit rollback to rollback 'COMPACT' commits for resolving conflicts. """ +import logging + from pypaimon.table.instant import Instant +logger = logging.getLogger(__name__) + class CommitRollback: """Rollback COMPACT commits to resolve conflicts. @@ -51,11 +55,15 @@ def try_to_rollback(self, latest_snapshot): Returns: True if rollback succeeded, False otherwise. """ + logger.warning("Try rollback...") if latest_snapshot.commit_kind == "COMPACT": latest_id = latest_snapshot.id try: self._table_rollback.rollback_to( Instant.snapshot(latest_id - 1), latest_id) + logger.warning( + "Successfully rolled back COMPACT snapshot %d to %d", + latest_id, latest_id - 1) return True except Exception: pass diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index ee7d7a9694aa..164f2ee51f72 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -314,7 +314,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str start_millis = int(time.time() * 1000) if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind): return SuccessResult() - + logger.warning("hello commit...") unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" From c076cf7e3788383701a7f69702e8f952bfa258cd Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 22:11:20 +0800 Subject: [PATCH 02/18] proto --- .../flink/lookup/FileStoreLookupFunction.java | 240 +++++++++++++++--- .../apache/paimon/flink/LookupJoinITCase.java | 55 ++++ 2 files changed, 266 insertions(+), 29 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 5b71664ca6a9..800083a1b7e6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -62,7 +62,11 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -102,6 +106,18 @@ public class FileStoreLookupFunction implements Serializable, Closeable { // threshold for triggering full table reload when snapshots are pending private transient Integer refreshFullThreshold; + // async partition refresh fields + private transient ExecutorService partitionRefreshExecutor; + private transient volatile Future partitionRefreshFuture; + private transient AtomicReference pendingLookupTable; + private transient AtomicReference pendingPath; + private transient AtomicReference> pendingPartitions; + private transient AtomicReference partitionRefreshException; + // the partitions that the current lookupTable was loaded with; + // during async refresh, partitionLoader.partitions() may already point to new partitions + // while lookupTable still serves old data, so we must use this field for lookup queries + private transient volatile List activePartitions; + protected FunctionContext functionContext; @Nullable private Filter cacheRowFilter; @@ -190,19 +206,69 @@ private void open() throws Exception { projectFields, joinKeys); + this.lookupTable = createLookupTable(path, projection); + + if (partitionLoader != null) { + partitionLoader.open(); + partitionLoader.checkRefresh(); + List partitions = partitionLoader.partitions(); + if (!partitions.isEmpty()) { + lookupTable.specifyPartitions( + partitions, partitionLoader.createSpecificPartFilter()); + } + } + + if (cacheRowFilter != null) { + lookupTable.specifyCacheRowFilter(cacheRowFilter); + } + lookupTable.open(); + + // initialize async partition refresh fields + this.pendingLookupTable = new AtomicReference<>(null); + this.pendingPath = new AtomicReference<>(null); + this.pendingPartitions = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.partitionRefreshFuture = null; + this.activePartitions = + partitionLoader != null ? partitionLoader.partitions() : Collections.emptyList(); + if (partitionLoader != null) { + this.partitionRefreshExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread thread = new Thread(r); + thread.setName( + Thread.currentThread().getName() + "-partition-refresh"); + thread.setDaemon(true); + return thread; + }); + } + } + + /** + * Create a new {@link LookupTable} instance. This method is used both during initial open and + * during async partition refresh to create a new LookupTable with a separate temp directory. + */ + private LookupTable createLookupTable(File tablePath, int[] projection) { + Options options = Options.fromMap(table.options()); + LookupTable newLookupTable = null; + LOG.info("Creating lookup table for {}.", table.name()); if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { if (isRemoteServiceAvailable(table)) { - this.lookupTable = + newLookupTable = PrimaryKeyPartialLookupTable.createRemoteTable(table, projection, joinKeys); LOG.info( "Remote service is available. Created PrimaryKeyPartialLookupTable with remote service."); } else { try { - this.lookupTable = + newLookupTable = PrimaryKeyPartialLookupTable.createLocalTable( - table, projection, path, joinKeys, getRequireCachedBucketIds()); + table, + projection, + tablePath, + joinKeys, + getRequireCachedBucketIds()); LOG.info( "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor."); } catch (UnsupportedOperationException e) { @@ -214,34 +280,21 @@ private void open() throws Exception { } } - if (lookupTable == null) { + if (newLookupTable == null) { FullCacheLookupTable.Context context = new FullCacheLookupTable.Context( table, projection, predicate, createProjectedPredicate(projection), - path, + tablePath, joinKeys, getRequireCachedBucketIds()); - this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS)); - LOG.info("Created {}.", lookupTable.getClass().getSimpleName()); - } - - if (partitionLoader != null) { - partitionLoader.open(); - partitionLoader.checkRefresh(); - List partitions = partitionLoader.partitions(); - if (!partitions.isEmpty()) { - lookupTable.specifyPartitions( - partitions, partitionLoader.createSpecificPartFilter()); - } + newLookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS)); + LOG.info("Created {}.", newLookupTable.getClass().getSimpleName()); } - if (cacheRowFilter != null) { - lookupTable.specifyCacheRowFilter(cacheRowFilter); - } - lookupTable.open(); + return newLookupTable; } @Nullable @@ -272,12 +325,15 @@ public Collection lookup(RowData keyRow) { return lookupInternal(key); } - if (partitionLoader.partitions().isEmpty()) { + // use the partitions that match the current lookupTable, not + // partitionLoader.partitions() which may already point to new partitions + // during async refresh + if (activePartitions.isEmpty()) { return Collections.emptyList(); } List rows = new ArrayList<>(); - for (BinaryRow partition : partitionLoader.partitions()) { + for (BinaryRow partition : activePartitions) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } return rows; @@ -319,6 +375,9 @@ private void reopen() { @VisibleForTesting void tryRefresh() throws Exception { + // 0. check if async partition refresh has completed, and switch if so + checkAsyncPartitionRefreshCompletion(); + // 1. check if this time is in black list if (refreshBlacklist != null && !refreshBlacklist.canRefresh()) { return; @@ -334,13 +393,8 @@ void tryRefresh() throws Exception { } if (partitionChanged) { - // reopen with latest partition - lookupTable.specifyPartitions( - partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); - lookupTable.close(); - lookupTable.open(); + startAsyncPartitionRefresh(partitions); nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); - // no need to refresh the lookup table because it is reopened return; } } @@ -365,6 +419,114 @@ void tryRefresh() throws Exception { } } + /** + * Start an async task to create and load a new {@link LookupTable} for the new partitions. The + * current lookup table continues serving queries until the new one is fully loaded. + */ + private void startAsyncPartitionRefresh(List newPartitions) { + // if there is already an async refresh in progress, skip this one + if (partitionRefreshFuture != null && !partitionRefreshFuture.isDone()) { + LOG.info( + "Async partition refresh is already in progress for table {}, " + + "skipping new partition change.", + table.name()); + return; + } + + List fieldNames = table.rowType().getFieldNames(); + int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); + + LOG.info( + "Starting async partition refresh for table {}, new partitions detected.", + table.name()); + + partitionRefreshFuture = + partitionRefreshExecutor.submit( + () -> { + File newPath = null; + try { + newPath = + new File( + path.getParent(), + "lookup-partition-refresh-" + UUID.randomUUID()); + if (!newPath.mkdirs()) { + throw new RuntimeException("Failed to create dir: " + newPath); + } + LookupTable newTable = createLookupTable(newPath, projection); + newTable.specifyPartitions( + newPartitions, partitionLoader.createSpecificPartFilter()); + if (cacheRowFilter != null) { + newTable.specifyCacheRowFilter(cacheRowFilter); + } + newTable.open(); + + pendingLookupTable.set(newTable); + pendingPath.set(newPath); + pendingPartitions.set(newPartitions); + LOG.info( + "Async partition refresh completed for table {}.", + table.name()); + } catch (Exception e) { + LOG.error( + "Async partition refresh failed for table {}.", + table.name(), + e); + partitionRefreshException.set(e); + // clean up the partially created lookup table + if (newPath != null) { + FileIOUtils.deleteDirectoryQuietly(newPath); + } + } + }); + } + + /** + * Check if the async partition refresh has completed. If so, atomically switch to the new + * lookup table and close the old one. + */ + private void checkAsyncPartitionRefreshCompletion() throws Exception { + // propagate any exception from the async refresh thread + Exception asyncException = partitionRefreshException.getAndSet(null); + if (asyncException != null) { + LOG.warn( + "Async partition refresh had failed for table {}, " + + "will retry on next partition change. Continuing with old partitions.", + table.name()); + } + + LookupTable newTable = pendingLookupTable.getAndSet(null); + if (newTable == null) { + return; + } + + File newPath = pendingPath.getAndSet(null); + List newPartitions = pendingPartitions.getAndSet(null); + + // switch: close old lookup table and replace with the new one + LookupTable oldTable = this.lookupTable; + File oldPath = this.path; + + this.lookupTable = newTable; + if (newPath != null) { + this.path = newPath; + } + if (newPartitions != null) { + this.activePartitions = newPartitions; + } + + LOG.info("Switched to new lookup table for table {} with new partitions.", table.name()); + + // close old table and clean up old temp directory + try { + oldTable.close(); + } catch (IOException e) { + LOG.warn("Failed to close old lookup table for table {}.", table.name(), e); + } + if (!oldPath.equals(this.path)) { + FileIOUtils.deleteDirectoryQuietly(oldPath); + } + } + /** * Check if we should do full load instead of incremental refresh. This can improve performance * when there are many pending snapshots. @@ -415,6 +577,26 @@ long nextBlacklistCheckTime() { @Override public void close() throws IOException { + // shut down async partition refresh executor + if (partitionRefreshExecutor != null) { + partitionRefreshExecutor.shutdownNow(); + partitionRefreshExecutor = null; + } + + // clean up any pending lookup table that was not yet switched + if (pendingLookupTable != null) { + LookupTable pending = pendingLookupTable.getAndSet(null); + if (pending != null) { + pending.close(); + } + } + if (pendingPath != null) { + File pending = pendingPath.getAndSet(null); + if (pending != null) { + FileIOUtils.deleteDirectoryQuietly(pending); + } + } + if (lookupTable != null) { lookupTable.close(); lookupTable = null; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 408d74ea0cd9..d7e4539b4b1b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1250,4 +1250,59 @@ public void testFallbackCacheMode() throws Exception { iterator.close(); } + + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testAsyncPartitionRefreshServesOldDataDuringSwitch(LookupCacheMode mode) + throws Exception { + // This test verifies that during async partition refresh (max_pt() change), + // queries continue to return old partition data until the new partition is fully loaded. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // verify initial lookup returns partition '1' data + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert data into a new partition '2', which will trigger async partition refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + Thread.sleep(1000); // wait for async refresh + // after async refresh completes, should get new partition '2' data + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(1000); + + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + // insert another new partition '3' and verify switch again + sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); + Thread.sleep(1000); // wait for async refresh + + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000)); + + iterator.close(); + } } From de94f4640866e463166af8fa68b8ff6b3b6ec67f Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 22:18:50 +0800 Subject: [PATCH 03/18] proto2 --- .../paimon/flink/FlinkConnectorOptions.java | 11 +++ .../flink/lookup/FileStoreLookupFunction.java | 71 ++++++++++++------- .../apache/paimon/flink/LookupJoinITCase.java | 63 +++++++++++++--- 3 files changed, 112 insertions(+), 33 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 8febafc5e3c5..bfb4b7b8d1e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -276,6 +276,17 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to refresh lookup table in an async thread."); + public static final ConfigOption LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC = + ConfigOptions.key("lookup.dynamic-partition.refresh.async") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to refresh dynamic partition lookup table asynchronously. " + + "When enabled, partition changes will be loaded in a background thread " + + "while the old partition data continues serving queries. " + + "When disabled (default), partition refresh is synchronous and blocks queries " + + "until the new partition data is fully loaded."); + public static final ConfigOption LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT = ConfigOptions.key("lookup.refresh.async.pending-snapshot-count") .intType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 800083a1b7e6..dd0a79f88f22 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -18,6 +18,10 @@ package org.apache.paimon.flink.lookup; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -30,24 +34,17 @@ import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.OutOfRangeException; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileIOUtils; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Preconditions; - -import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints; - -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.functions.FunctionContext; -import org.apache.flink.table.functions.TableFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -72,6 +69,7 @@ import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; @@ -106,7 +104,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable { // threshold for triggering full table reload when snapshots are pending private transient Integer refreshFullThreshold; - // async partition refresh fields + // whether to refresh dynamic partition asynchronously + private transient boolean partitionRefreshAsync; + + // async partition refresh fields (only used when partitionRefreshAsync is true) private transient ExecutorService partitionRefreshExecutor; private transient volatile Future partitionRefreshFuture; private transient AtomicReference pendingLookupTable; @@ -198,6 +199,7 @@ private void open() throws Exception { options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL) .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL)); this.refreshFullThreshold = options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD); + this.partitionRefreshAsync = options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC); List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); @@ -224,14 +226,13 @@ private void open() throws Exception { lookupTable.open(); // initialize async partition refresh fields - this.pendingLookupTable = new AtomicReference<>(null); - this.pendingPath = new AtomicReference<>(null); - this.pendingPartitions = new AtomicReference<>(null); - this.partitionRefreshException = new AtomicReference<>(null); - this.partitionRefreshFuture = null; - this.activePartitions = - partitionLoader != null ? partitionLoader.partitions() : Collections.emptyList(); - if (partitionLoader != null) { + if (partitionRefreshAsync && partitionLoader != null) { + this.pendingLookupTable = new AtomicReference<>(null); + this.pendingPath = new AtomicReference<>(null); + this.pendingPartitions = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.partitionRefreshFuture = null; + this.activePartitions = partitionLoader.partitions(); this.partitionRefreshExecutor = Executors.newSingleThreadExecutor( r -> { @@ -325,15 +326,17 @@ public Collection lookup(RowData keyRow) { return lookupInternal(key); } - // use the partitions that match the current lookupTable, not - // partitionLoader.partitions() which may already point to new partitions - // during async refresh - if (activePartitions.isEmpty()) { + // when async refresh is enabled, use activePartitions which tracks the + // partitions that match the current lookupTable; otherwise use + // partitionLoader.partitions() directly since sync refresh keeps them consistent + List currentPartitions = + partitionRefreshAsync ? activePartitions : partitionLoader.partitions(); + if (currentPartitions.isEmpty()) { return Collections.emptyList(); } List rows = new ArrayList<>(); - for (BinaryRow partition : activePartitions) { + for (BinaryRow partition : currentPartitions) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } return rows; @@ -376,7 +379,9 @@ private void reopen() { @VisibleForTesting void tryRefresh() throws Exception { // 0. check if async partition refresh has completed, and switch if so - checkAsyncPartitionRefreshCompletion(); + if (partitionRefreshAsync) { + checkAsyncPartitionRefreshCompletion(); + } // 1. check if this time is in black list if (refreshBlacklist != null && !refreshBlacklist.canRefresh()) { @@ -393,7 +398,11 @@ void tryRefresh() throws Exception { } if (partitionChanged) { - startAsyncPartitionRefresh(partitions); + if (partitionRefreshAsync) { + startAsyncPartitionRefresh(partitions); + } else { + syncPartitionRefresh(partitions); + } nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); return; } @@ -419,6 +428,20 @@ void tryRefresh() throws Exception { } } + /** + * Synchronously refresh the lookup table for new partitions. This blocks queries until the new + * partition data is fully loaded. + */ + private void syncPartitionRefresh(List newPartitions) throws Exception { + LOG.info( + "Synchronously refreshing partition for table {}, new partitions detected.", + table.name()); + lookupTable.close(); + lookupTable.specifyPartitions(newPartitions, partitionLoader.createSpecificPartFilter()); + lookupTable.open(); + LOG.info("Synchronous partition refresh completed for table {}.", table.name()); + } + /** * Start an async task to create and load a new {@link LookupTable} for the new partitions. The * current lookup table continues serving queries until the new one is fully loaded. diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index d7e4539b4b1b..1a7bac65506f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1253,10 +1253,10 @@ public void testFallbackCacheMode() throws Exception { @ParameterizedTest @EnumSource(LookupCacheMode.class) - public void testAsyncPartitionRefreshServesOldDataDuringSwitch(LookupCacheMode mode) - throws Exception { - // This test verifies that during async partition refresh (max_pt() change), - // queries continue to return old partition data until the new partition is fully loaded. + public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception { + // This test verifies synchronous partition refresh (default mode): + // when max_pt() changes, the lookup table is refreshed synchronously, + // so queries immediately return new partition data after refresh. sql( "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + "PARTITIONED BY (`pt`) WITH (" @@ -1280,22 +1280,67 @@ public void testAsyncPartitionRefreshServesOldDataDuringSwitch(LookupCacheMode m List result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + // insert data into a new partition '2', which will trigger sync partition refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + Thread.sleep(2000); // wait for partition refresh interval to trigger + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + // insert another new partition '3' and verify switch again + sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); + Thread.sleep(2000); // wait for partition refresh interval to trigger + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { + // This test verifies asynchronous partition refresh: + // when max_pt() changes, the lookup table is refreshed in a background thread, + // old partition data continues serving queries until the new partition is fully loaded. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // verify initial lookup returns partition '1' data + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + // insert data into a new partition '2', which will trigger async partition refresh sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); - Thread.sleep(1000); // wait for async refresh - // after async refresh completes, should get new partition '2' data + Thread.sleep(1000); // wait for async refresh to complete + // trigger a lookup to check async completion and switch to new partition sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(2); Thread.sleep(1000); - sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); // insert another new partition '3' and verify switch again sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); - Thread.sleep(1000); // wait for async refresh - + Thread.sleep(1000); // wait for async refresh to complete sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(2); Thread.sleep(1000); From 74fde8a57b81764bd39af2cfd502693e22b01ed1 Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 10 Mar 2026 21:00:35 +0800 Subject: [PATCH 04/18] refactor --- .../flink/lookup/FileStoreLookupFunction.java | 214 ++---------------- .../flink/lookup/FullCacheLookupTable.java | 196 +++++++++++++++- .../paimon/flink/lookup/LookupTable.java | 59 +++++ 3 files changed, 274 insertions(+), 195 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index dd0a79f88f22..52fbc8dd3b7d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -59,17 +59,12 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; -import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; @@ -104,21 +99,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable { // threshold for triggering full table reload when snapshots are pending private transient Integer refreshFullThreshold; - // whether to refresh dynamic partition asynchronously - private transient boolean partitionRefreshAsync; - - // async partition refresh fields (only used when partitionRefreshAsync is true) - private transient ExecutorService partitionRefreshExecutor; - private transient volatile Future partitionRefreshFuture; - private transient AtomicReference pendingLookupTable; - private transient AtomicReference pendingPath; - private transient AtomicReference> pendingPartitions; - private transient AtomicReference partitionRefreshException; - // the partitions that the current lookupTable was loaded with; - // during async refresh, partitionLoader.partitions() may already point to new partitions - // while lookupTable still serves old data, so we must use this field for lookup queries - private transient volatile List activePartitions; - protected FunctionContext functionContext; @Nullable private Filter cacheRowFilter; @@ -199,7 +179,6 @@ private void open() throws Exception { options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL) .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL)); this.refreshFullThreshold = options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD); - this.partitionRefreshAsync = options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC); List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); @@ -224,25 +203,6 @@ private void open() throws Exception { lookupTable.specifyCacheRowFilter(cacheRowFilter); } lookupTable.open(); - - // initialize async partition refresh fields - if (partitionRefreshAsync && partitionLoader != null) { - this.pendingLookupTable = new AtomicReference<>(null); - this.pendingPath = new AtomicReference<>(null); - this.pendingPartitions = new AtomicReference<>(null); - this.partitionRefreshException = new AtomicReference<>(null); - this.partitionRefreshFuture = null; - this.activePartitions = partitionLoader.partitions(); - this.partitionRefreshExecutor = - Executors.newSingleThreadExecutor( - r -> { - Thread thread = new Thread(r); - thread.setName( - Thread.currentThread().getName() + "-partition-refresh"); - thread.setDaemon(true); - return thread; - }); - } } /** @@ -326,11 +286,12 @@ public Collection lookup(RowData keyRow) { return lookupInternal(key); } - // when async refresh is enabled, use activePartitions which tracks the - // partitions that match the current lookupTable; otherwise use - // partitionLoader.partitions() directly since sync refresh keeps them consistent + // use activePartitions from lookupTable if available (async mode tracks + // which partitions the current table was loaded with); otherwise fall back + // to partitionLoader.partitions() + List activePartitions = lookupTable.activePartitions(); List currentPartitions = - partitionRefreshAsync ? activePartitions : partitionLoader.partitions(); + activePartitions != null ? activePartitions : partitionLoader.partitions(); if (currentPartitions.isEmpty()) { return Collections.emptyList(); } @@ -379,8 +340,19 @@ private void reopen() { @VisibleForTesting void tryRefresh() throws Exception { // 0. check if async partition refresh has completed, and switch if so - if (partitionRefreshAsync) { - checkAsyncPartitionRefreshCompletion(); + LookupTable switchedTable = lookupTable.checkPartitionRefreshCompletion(); + if (switchedTable != null) { + LookupTable oldTable = this.lookupTable; + this.lookupTable = switchedTable; + if (switchedTable instanceof FullCacheLookupTable) { + this.path = ((FullCacheLookupTable) switchedTable).context.tempPath; + } + // close old table and clean up old temp directory + try { + oldTable.close(); + } catch (IOException e) { + LOG.warn("Failed to close old lookup table for table {}.", table.name(), e); + } } // 1. check if this time is in black list @@ -398,11 +370,8 @@ void tryRefresh() throws Exception { } if (partitionChanged) { - if (partitionRefreshAsync) { - startAsyncPartitionRefresh(partitions); - } else { - syncPartitionRefresh(partitions); - } + lookupTable.startPartitionRefresh( + partitions, partitionLoader.createSpecificPartFilter()); nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); return; } @@ -428,128 +397,6 @@ void tryRefresh() throws Exception { } } - /** - * Synchronously refresh the lookup table for new partitions. This blocks queries until the new - * partition data is fully loaded. - */ - private void syncPartitionRefresh(List newPartitions) throws Exception { - LOG.info( - "Synchronously refreshing partition for table {}, new partitions detected.", - table.name()); - lookupTable.close(); - lookupTable.specifyPartitions(newPartitions, partitionLoader.createSpecificPartFilter()); - lookupTable.open(); - LOG.info("Synchronous partition refresh completed for table {}.", table.name()); - } - - /** - * Start an async task to create and load a new {@link LookupTable} for the new partitions. The - * current lookup table continues serving queries until the new one is fully loaded. - */ - private void startAsyncPartitionRefresh(List newPartitions) { - // if there is already an async refresh in progress, skip this one - if (partitionRefreshFuture != null && !partitionRefreshFuture.isDone()) { - LOG.info( - "Async partition refresh is already in progress for table {}, " - + "skipping new partition change.", - table.name()); - return; - } - - List fieldNames = table.rowType().getFieldNames(); - int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); - - LOG.info( - "Starting async partition refresh for table {}, new partitions detected.", - table.name()); - - partitionRefreshFuture = - partitionRefreshExecutor.submit( - () -> { - File newPath = null; - try { - newPath = - new File( - path.getParent(), - "lookup-partition-refresh-" + UUID.randomUUID()); - if (!newPath.mkdirs()) { - throw new RuntimeException("Failed to create dir: " + newPath); - } - LookupTable newTable = createLookupTable(newPath, projection); - newTable.specifyPartitions( - newPartitions, partitionLoader.createSpecificPartFilter()); - if (cacheRowFilter != null) { - newTable.specifyCacheRowFilter(cacheRowFilter); - } - newTable.open(); - - pendingLookupTable.set(newTable); - pendingPath.set(newPath); - pendingPartitions.set(newPartitions); - LOG.info( - "Async partition refresh completed for table {}.", - table.name()); - } catch (Exception e) { - LOG.error( - "Async partition refresh failed for table {}.", - table.name(), - e); - partitionRefreshException.set(e); - // clean up the partially created lookup table - if (newPath != null) { - FileIOUtils.deleteDirectoryQuietly(newPath); - } - } - }); - } - - /** - * Check if the async partition refresh has completed. If so, atomically switch to the new - * lookup table and close the old one. - */ - private void checkAsyncPartitionRefreshCompletion() throws Exception { - // propagate any exception from the async refresh thread - Exception asyncException = partitionRefreshException.getAndSet(null); - if (asyncException != null) { - LOG.warn( - "Async partition refresh had failed for table {}, " - + "will retry on next partition change. Continuing with old partitions.", - table.name()); - } - - LookupTable newTable = pendingLookupTable.getAndSet(null); - if (newTable == null) { - return; - } - - File newPath = pendingPath.getAndSet(null); - List newPartitions = pendingPartitions.getAndSet(null); - - // switch: close old lookup table and replace with the new one - LookupTable oldTable = this.lookupTable; - File oldPath = this.path; - - this.lookupTable = newTable; - if (newPath != null) { - this.path = newPath; - } - if (newPartitions != null) { - this.activePartitions = newPartitions; - } - - LOG.info("Switched to new lookup table for table {} with new partitions.", table.name()); - - // close old table and clean up old temp directory - try { - oldTable.close(); - } catch (IOException e) { - LOG.warn("Failed to close old lookup table for table {}.", table.name(), e); - } - if (!oldPath.equals(this.path)) { - FileIOUtils.deleteDirectoryQuietly(oldPath); - } - } - /** * Check if we should do full load instead of incremental refresh. This can improve performance * when there are many pending snapshots. @@ -600,27 +447,8 @@ long nextBlacklistCheckTime() { @Override public void close() throws IOException { - // shut down async partition refresh executor - if (partitionRefreshExecutor != null) { - partitionRefreshExecutor.shutdownNow(); - partitionRefreshExecutor = null; - } - - // clean up any pending lookup table that was not yet switched - if (pendingLookupTable != null) { - LookupTable pending = pendingLookupTable.getAndSet(null); - if (pending != null) { - pending.close(); - } - } - if (pendingPath != null) { - File pending = pendingPath.getAndSet(null); - if (pending != null) { - FileIOUtils.deleteDirectoryQuietly(pending); - } - } - if (lookupTable != null) { + lookupTable.closePartitionRefresh(); lookupTable.close(); lookupTable = null; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 81af38ea8ff2..5394e7e7a185 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -46,15 +46,14 @@ import org.apache.paimon.utils.PartialRow; import org.apache.paimon.utils.TypeUtils; import org.apache.paimon.utils.UserDefinedSeqComparator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -68,9 +67,11 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT; import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY; +import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS; /** Lookup table of full cache. */ public abstract class FullCacheLookupTable implements LookupTable { @@ -96,6 +97,16 @@ public abstract class FullCacheLookupTable implements LookupTable { @Nullable private Predicate partitionFilter; @Nullable private Filter cacheRowFilter; + // ---- Partition refresh fields ---- + private final boolean partitionRefreshAsync; + @Nullable private ExecutorService partitionRefreshExecutor; + private volatile Future partitionRefreshFuture; + private AtomicReference pendingLookupTable; + private AtomicReference pendingPath; + private AtomicReference> pendingPartitions; + private AtomicReference partitionRefreshException; + private volatile List currentActivePartitions; + public FullCacheLookupTable(Context context) { this.table = context.table; List sequenceFields = new ArrayList<>(); @@ -137,6 +148,18 @@ public FullCacheLookupTable(Context context) { this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC); this.cachedException = new AtomicReference<>(); this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT); + this.partitionRefreshAsync = options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC); + } + + @Override + public LookupTable copyWithNewPath(File newPath) { + Context newContext = context.copyWithNewPath(newPath); + Options options = Options.fromMap(context.table.options()); + FullCacheLookupTable newTable = create(newContext, options.get(LOOKUP_CACHE_ROWS)); + if (cacheRowFilter != null) { + newTable.specifyCacheRowFilter(cacheRowFilter); + } + return newTable; } @Override @@ -166,6 +189,7 @@ protected void init() throws Exception { "%s-lookup-refresh", Thread.currentThread().getName()))) : null; + initPartitionRefresh(); } private StateFactory createStateFactory() throws IOException { @@ -338,6 +362,163 @@ public Predicate projectedPredicate() { public abstract TableBulkLoader createBulkLoader(); + // ---- Partition refresh implementation ---- + + @Override + public boolean isPartitionRefreshAsync() { + return partitionRefreshAsync; + } + + private void initPartitionRefresh() { + if (partitionRefreshAsync) { + this.pendingLookupTable = new AtomicReference<>(null); + this.pendingPath = new AtomicReference<>(null); + this.pendingPartitions = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.partitionRefreshFuture = null; + this.currentActivePartitions = + scanPartitions != null ? scanPartitions : Collections.emptyList(); + this.partitionRefreshExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread thread = new Thread(r); + thread.setName( + Thread.currentThread().getName() + "-partition-refresh"); + thread.setDaemon(true); + return thread; + }); + } + } + + @Override + public void startPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) throws Exception { + if (partitionRefreshAsync) { + startAsyncPartitionRefresh(newPartitions, partitionFilter); + } else { + syncPartitionRefresh(newPartitions, partitionFilter); + } + } + + private void syncPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) throws Exception { + LOG.info( + "Synchronously refreshing partition for table {}, new partitions detected.", + table.name()); + close(); + specifyPartitions(newPartitions, partitionFilter); + open(); + LOG.info("Synchronous partition refresh completed for table {}.", table.name()); + } + + private void startAsyncPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) { + if (partitionRefreshFuture != null && !partitionRefreshFuture.isDone()) { + LOG.info( + "Async partition refresh is already in progress for table {}, " + + "skipping new partition change.", + table.name()); + return; + } + + LOG.info( + "Starting async partition refresh for table {}, new partitions detected.", + table.name()); + + partitionRefreshFuture = + partitionRefreshExecutor.submit( + () -> { + File newPath = null; + try { + newPath = + new File( + context.tempPath.getParent(), + "lookup-" + java.util.UUID.randomUUID()); + if (!newPath.mkdirs()) { + throw new RuntimeException("Failed to create dir: " + newPath); + } + LookupTable newTable = copyWithNewPath(newPath); + newTable.specifyPartitions(newPartitions, partitionFilter); + newTable.open(); + + pendingLookupTable.set(newTable); + pendingPath.set(newPath); + pendingPartitions.set(newPartitions); + LOG.info( + "Async partition refresh completed for table {}.", + table.name()); + } catch (Exception e) { + LOG.error( + "Async partition refresh failed for table {}.", + table.name(), + e); + partitionRefreshException.set(e); + if (newPath != null) { + FileIOUtils.deleteDirectoryQuietly(newPath); + } + } + }); + } + + @Nullable + @Override + public LookupTable checkPartitionRefreshCompletion() throws Exception { + if (!partitionRefreshAsync) { + return null; + } + + Exception asyncException = partitionRefreshException.getAndSet(null); + if (asyncException != null) { + LOG.warn( + "Async partition refresh had failed for table {}, " + + "will retry on next partition change. Continuing with old partitions.", + table.name()); + } + + LookupTable newTable = pendingLookupTable.getAndSet(null); + if (newTable == null) { + return null; + } + + pendingPath.getAndSet(null); + List newPartitions = pendingPartitions.getAndSet(null); + + // set activePartitions on the new table since it will replace the current one + if (newPartitions != null && newTable instanceof FullCacheLookupTable) { + ((FullCacheLookupTable) newTable).currentActivePartitions = newPartitions; + } + + LOG.info("Switched to new lookup table for table {} with new partitions.", table.name()); + return newTable; + } + + @Nullable + @Override + public List activePartitions() { + return currentActivePartitions; + } + + @Override + public void closePartitionRefresh() throws IOException { + if (partitionRefreshExecutor != null) { + partitionRefreshExecutor.shutdownNow(); + partitionRefreshExecutor = null; + } + + if (pendingLookupTable != null) { + LookupTable pending = pendingLookupTable.getAndSet(null); + if (pending != null) { + pending.close(); + } + } + if (pendingPath != null) { + File pending = pendingPath.getAndSet(null); + if (pending != null) { + FileIOUtils.deleteDirectoryQuietly(pending); + } + } + } + @Override public void close() throws IOException { try { @@ -414,5 +595,16 @@ public Context copy(int[] newProjection) { joinKey, requiredCachedBucketIds); } + + public Context copyWithNewPath(File newTempPath) { + return new Context( + table.wrapped(), + projection, + tablePredicate, + projectedPredicate, + newTempPath, + joinKey, + requiredCachedBucketIds); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index 3ca792d39d5d..a21956d13631 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.io.Closeable; +import java.io.File; import java.io.IOException; import java.util.List; @@ -34,6 +35,17 @@ public interface LookupTable extends Closeable { void specifyPartitions(List scanPartitions, @Nullable Predicate partitionFilter); + /** + * Create a new LookupTable instance with the same configuration but a different temp path. The + * new table is not opened yet. + * + * @throws UnsupportedOperationException if the implementation does not support this operation + */ + default LookupTable copyWithNewPath(File newPath) { + throw new UnsupportedOperationException( + "copyWithNewPath is not supported by " + getClass().getSimpleName()); + } + void open() throws Exception; List get(InternalRow key) throws IOException; @@ -43,4 +55,51 @@ public interface LookupTable extends Closeable { void specifyCacheRowFilter(Filter filter); Long nextSnapshotId(); + + // ---- Partition refresh methods ---- + + /** Whether partition refresh is configured to run asynchronously. */ + default boolean isPartitionRefreshAsync() { + return false; + } + + /** + * Start a partition refresh (synchronous or asynchronous depending on configuration). + * + * @param newPartitions the new partitions to refresh to + * @param partitionFilter the partition filter for the new partitions + */ + default void startPartitionRefresh( + List newPartitions, @Nullable Predicate partitionFilter) throws Exception { + close(); + specifyPartitions(newPartitions, partitionFilter); + open(); + } + + /** + * Check if an async partition refresh has completed. If a new table is ready, this method + * returns it and the caller should replace its current lookup table reference. Returns {@code + * null} if no switch is needed. + * + *

For synchronous partition refresh, this always returns {@code null}. + */ + @Nullable + default LookupTable checkPartitionRefreshCompletion() throws Exception { + return null; + } + + /** + * Return the partitions that the current lookup table was loaded with. During async refresh, + * this may differ from the latest partitions detected by the partition loader. + * + * @return the active partitions, or {@code null} if partition refresh is not managed by this + * table + */ + @Nullable + default List activePartitions() { + return null; + } + + /** Close partition refresh resources (executor, pending tables, etc.). */ + default void closePartitionRefresh() throws IOException {} } From 1dcff458b2bc26dc2ec0add105e54f39fc462420 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 10:39:39 +0800 Subject: [PATCH 05/18] fix --- .../flink/lookup/FileStoreLookupFunction.java | 59 +++++++------------ .../apache/paimon/flink/LookupJoinITCase.java | 2 +- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 52fbc8dd3b7d..aa6d9b26b738 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -187,49 +187,19 @@ private void open() throws Exception { projectFields, joinKeys); - this.lookupTable = createLookupTable(path, projection); - - if (partitionLoader != null) { - partitionLoader.open(); - partitionLoader.checkRefresh(); - List partitions = partitionLoader.partitions(); - if (!partitions.isEmpty()) { - lookupTable.specifyPartitions( - partitions, partitionLoader.createSpecificPartFilter()); - } - } - - if (cacheRowFilter != null) { - lookupTable.specifyCacheRowFilter(cacheRowFilter); - } - lookupTable.open(); - } - - /** - * Create a new {@link LookupTable} instance. This method is used both during initial open and - * during async partition refresh to create a new LookupTable with a separate temp directory. - */ - private LookupTable createLookupTable(File tablePath, int[] projection) { - Options options = Options.fromMap(table.options()); - LookupTable newLookupTable = null; - LOG.info("Creating lookup table for {}.", table.name()); if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { if (isRemoteServiceAvailable(table)) { - newLookupTable = + this.lookupTable = PrimaryKeyPartialLookupTable.createRemoteTable(table, projection, joinKeys); LOG.info( "Remote service is available. Created PrimaryKeyPartialLookupTable with remote service."); } else { try { - newLookupTable = + this.lookupTable = PrimaryKeyPartialLookupTable.createLocalTable( - table, - projection, - tablePath, - joinKeys, - getRequireCachedBucketIds()); + table, projection, path, joinKeys, getRequireCachedBucketIds()); LOG.info( "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor."); } catch (UnsupportedOperationException e) { @@ -241,21 +211,34 @@ private LookupTable createLookupTable(File tablePath, int[] projection) { } } - if (newLookupTable == null) { + if (lookupTable == null) { FullCacheLookupTable.Context context = new FullCacheLookupTable.Context( table, projection, predicate, createProjectedPredicate(projection), - tablePath, + path, joinKeys, getRequireCachedBucketIds()); - newLookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS)); - LOG.info("Created {}.", newLookupTable.getClass().getSimpleName()); + this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS)); + LOG.info("Created {}.", lookupTable.getClass().getSimpleName()); } - return newLookupTable; + if (partitionLoader != null) { + partitionLoader.open(); + partitionLoader.checkRefresh(); + List partitions = partitionLoader.partitions(); + if (!partitions.isEmpty()) { + lookupTable.specifyPartitions( + partitions, partitionLoader.createSpecificPartFilter()); + } + } + + if (cacheRowFilter != null) { + lookupTable.specifyCacheRowFilter(cacheRowFilter); + } + lookupTable.open(); } @Nullable diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 1a7bac65506f..d4eb7af85770 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1332,7 +1332,7 @@ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { Thread.sleep(1000); // wait for async refresh to complete // trigger a lookup to check async completion and switch to new partition sql("INSERT INTO T VALUES (1), (2)"); - iterator.collect(2); + result = iterator.collect(2); Thread.sleep(1000); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); From b948d2231c9cb4a1ed3b8ee16f816ddd50fd1d4f Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 10:45:24 +0800 Subject: [PATCH 06/18] fix --- .../flink/lookup/FileStoreLookupFunction.java | 13 ++++--- .../flink/lookup/FullCacheLookupTable.java | 37 +++++++++---------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index aa6d9b26b738..e07839941b88 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -18,10 +18,6 @@ package org.apache.paimon.flink.lookup; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.functions.FunctionContext; -import org.apache.flink.table.functions.TableFunction; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -34,17 +30,24 @@ import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.OutOfRangeException; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileIOUtils; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Preconditions; + +import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints; + +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.Closeable; import java.io.File; import java.io.IOException; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 5394e7e7a185..3647f4f690fd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -189,7 +189,9 @@ protected void init() throws Exception { "%s-lookup-refresh", Thread.currentThread().getName()))) : null; - initPartitionRefresh(); + if (partitionRefreshAsync) { + initPartitionRefresh(); + } } private StateFactory createStateFactory() throws IOException { @@ -370,24 +372,21 @@ public boolean isPartitionRefreshAsync() { } private void initPartitionRefresh() { - if (partitionRefreshAsync) { - this.pendingLookupTable = new AtomicReference<>(null); - this.pendingPath = new AtomicReference<>(null); - this.pendingPartitions = new AtomicReference<>(null); - this.partitionRefreshException = new AtomicReference<>(null); - this.partitionRefreshFuture = null; - this.currentActivePartitions = - scanPartitions != null ? scanPartitions : Collections.emptyList(); - this.partitionRefreshExecutor = - Executors.newSingleThreadExecutor( - r -> { - Thread thread = new Thread(r); - thread.setName( - Thread.currentThread().getName() + "-partition-refresh"); - thread.setDaemon(true); - return thread; - }); - } + this.pendingLookupTable = new AtomicReference<>(null); + this.pendingPath = new AtomicReference<>(null); + this.pendingPartitions = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.partitionRefreshFuture = null; + this.currentActivePartitions = + scanPartitions != null ? scanPartitions : Collections.emptyList(); + this.partitionRefreshExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread thread = new Thread(r); + thread.setName(Thread.currentThread().getName() + "-partition-refresh"); + thread.setDaemon(true); + return thread; + }); } @Override From cc509d123417d4fde6e6788a562a75b50688186d Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 12:05:34 +0800 Subject: [PATCH 07/18] fix --- .../flink/lookup/FileStoreLookupFunction.java | 22 ++--- .../flink/lookup/FullCacheLookupTable.java | 88 ++++++------------- .../paimon/flink/lookup/LookupTable.java | 29 +++--- 3 files changed, 45 insertions(+), 94 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index e07839941b88..b883a430a2be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -271,19 +271,16 @@ public Collection lookup(RowData keyRow) { if (partitionLoader == null) { return lookupInternal(key); } - - // use activePartitions from lookupTable if available (async mode tracks - // which partitions the current table was loaded with); otherwise fall back - // to partitionLoader.partitions() - List activePartitions = lookupTable.activePartitions(); - List currentPartitions = - activePartitions != null ? activePartitions : partitionLoader.partitions(); - if (currentPartitions.isEmpty()) { + List partitions = + lookupTable.scanPartitions() == null + ? partitionLoader.partitions() + : lookupTable.scanPartitions(); + if (partitions.isEmpty()) { return Collections.emptyList(); } List rows = new ArrayList<>(); - for (BinaryRow partition : currentPartitions) { + for (BinaryRow partition : partitions) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } return rows; @@ -330,9 +327,9 @@ void tryRefresh() throws Exception { if (switchedTable != null) { LookupTable oldTable = this.lookupTable; this.lookupTable = switchedTable; - if (switchedTable instanceof FullCacheLookupTable) { - this.path = ((FullCacheLookupTable) switchedTable).context.tempPath; - } + this.lookupTable.specifyPartitions( + partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); + this.path = ((FullCacheLookupTable) switchedTable).context.tempPath; // close old table and clean up old temp directory try { oldTable.close(); @@ -434,7 +431,6 @@ long nextBlacklistCheckTime() { @Override public void close() throws IOException { if (lookupTable != null) { - lookupTable.closePartitionRefresh(); lookupTable.close(); lookupTable = null; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 3647f4f690fd..d513df5851a8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -102,10 +102,7 @@ public abstract class FullCacheLookupTable implements LookupTable { @Nullable private ExecutorService partitionRefreshExecutor; private volatile Future partitionRefreshFuture; private AtomicReference pendingLookupTable; - private AtomicReference pendingPath; - private AtomicReference> pendingPartitions; private AtomicReference partitionRefreshException; - private volatile List currentActivePartitions; public FullCacheLookupTable(Context context) { this.table = context.table; @@ -194,6 +191,19 @@ protected void init() throws Exception { } } + private void initPartitionRefresh() { + this.pendingLookupTable = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.partitionRefreshFuture = null; + this.scanPartitions = scanPartitions != null ? scanPartitions : Collections.emptyList(); + this.partitionRefreshExecutor = + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + String.format( + "%s-lookup-refresh-partition", + Thread.currentThread().getName()))); + } + private StateFactory createStateFactory() throws IOException { String diskDir = context.tempPath.toString(); Options options = context.table.coreOptions().toConfiguration(); @@ -366,34 +376,11 @@ public Predicate projectedPredicate() { // ---- Partition refresh implementation ---- - @Override - public boolean isPartitionRefreshAsync() { - return partitionRefreshAsync; - } - - private void initPartitionRefresh() { - this.pendingLookupTable = new AtomicReference<>(null); - this.pendingPath = new AtomicReference<>(null); - this.pendingPartitions = new AtomicReference<>(null); - this.partitionRefreshException = new AtomicReference<>(null); - this.partitionRefreshFuture = null; - this.currentActivePartitions = - scanPartitions != null ? scanPartitions : Collections.emptyList(); - this.partitionRefreshExecutor = - Executors.newSingleThreadExecutor( - r -> { - Thread thread = new Thread(r); - thread.setName(Thread.currentThread().getName() + "-partition-refresh"); - thread.setDaemon(true); - return thread; - }); - } - @Override public void startPartitionRefresh( List newPartitions, @Nullable Predicate partitionFilter) throws Exception { if (partitionRefreshAsync) { - startAsyncPartitionRefresh(newPartitions, partitionFilter); + asyncPartitionRefresh(newPartitions, partitionFilter); } else { syncPartitionRefresh(newPartitions, partitionFilter); } @@ -410,7 +397,7 @@ private void syncPartitionRefresh( LOG.info("Synchronous partition refresh completed for table {}.", table.name()); } - private void startAsyncPartitionRefresh( + private void asyncPartitionRefresh( List newPartitions, @Nullable Predicate partitionFilter) { if (partitionRefreshFuture != null && !partitionRefreshFuture.isDone()) { LOG.info( @@ -441,8 +428,6 @@ private void startAsyncPartitionRefresh( newTable.open(); pendingLookupTable.set(newTable); - pendingPath.set(newPath); - pendingPartitions.set(newPartitions); LOG.info( "Async partition refresh completed for table {}.", table.name()); @@ -479,43 +464,13 @@ public LookupTable checkPartitionRefreshCompletion() throws Exception { return null; } - pendingPath.getAndSet(null); - List newPartitions = pendingPartitions.getAndSet(null); - - // set activePartitions on the new table since it will replace the current one - if (newPartitions != null && newTable instanceof FullCacheLookupTable) { - ((FullCacheLookupTable) newTable).currentActivePartitions = newPartitions; - } - LOG.info("Switched to new lookup table for table {} with new partitions.", table.name()); return newTable; } - @Nullable - @Override - public List activePartitions() { - return currentActivePartitions; - } - @Override - public void closePartitionRefresh() throws IOException { - if (partitionRefreshExecutor != null) { - partitionRefreshExecutor.shutdownNow(); - partitionRefreshExecutor = null; - } - - if (pendingLookupTable != null) { - LookupTable pending = pendingLookupTable.getAndSet(null); - if (pending != null) { - pending.close(); - } - } - if (pendingPath != null) { - File pending = pendingPath.getAndSet(null); - if (pending != null) { - FileIOUtils.deleteDirectoryQuietly(pending); - } - } + public List scanPartitions() { + return scanPartitions; } @Override @@ -524,6 +479,15 @@ public void close() throws IOException { if (refreshExecutor != null) { ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor); } + if (partitionRefreshExecutor != null) { + ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, partitionRefreshExecutor); + } + if (pendingLookupTable != null) { + LookupTable pending = pendingLookupTable.getAndSet(null); + if (pending != null) { + pending.close(); + } + } } finally { stateFactory.close(); FileIOUtils.deleteDirectory(context.tempPath); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index a21956d13631..3395b24cb788 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -24,7 +24,6 @@ import org.apache.paimon.utils.Filter; import javax.annotation.Nullable; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -35,17 +34,6 @@ public interface LookupTable extends Closeable { void specifyPartitions(List scanPartitions, @Nullable Predicate partitionFilter); - /** - * Create a new LookupTable instance with the same configuration but a different temp path. The - * new table is not opened yet. - * - * @throws UnsupportedOperationException if the implementation does not support this operation - */ - default LookupTable copyWithNewPath(File newPath) { - throw new UnsupportedOperationException( - "copyWithNewPath is not supported by " + getClass().getSimpleName()); - } - void open() throws Exception; List get(InternalRow key) throws IOException; @@ -58,9 +46,15 @@ default LookupTable copyWithNewPath(File newPath) { // ---- Partition refresh methods ---- - /** Whether partition refresh is configured to run asynchronously. */ - default boolean isPartitionRefreshAsync() { - return false; + /** + * Create a new LookupTable instance with the same configuration but a different temp path. The + * new table is not opened yet. + * + * @throws UnsupportedOperationException if the implementation does not support this operation + */ + default LookupTable copyWithNewPath(File newPath) { + throw new UnsupportedOperationException( + "copyWithNewPath is not supported by " + getClass().getSimpleName()); } /** @@ -96,10 +90,7 @@ default LookupTable checkPartitionRefreshCompletion() throws Exception { * table */ @Nullable - default List activePartitions() { + default List scanPartitions() { return null; } - - /** Close partition refresh resources (executor, pending tables, etc.). */ - default void closePartitionRefresh() throws IOException {} } From 462e0bb53435a8ee8b3fc2724bb72ddd4fcf79b8 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 13:19:01 +0800 Subject: [PATCH 08/18] fix --- .../flink/lookup/FileStoreLookupFunction.java | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index b883a430a2be..7d66656497ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -322,28 +322,24 @@ private void reopen() { @VisibleForTesting void tryRefresh() throws Exception { - // 0. check if async partition refresh has completed, and switch if so + // 1. check if this time is in black list + if (refreshBlacklist != null && !refreshBlacklist.canRefresh()) { + return; + } + + // 2. check if async partition refresh has completed, and switch if so LookupTable switchedTable = lookupTable.checkPartitionRefreshCompletion(); if (switchedTable != null) { LookupTable oldTable = this.lookupTable; - this.lookupTable = switchedTable; - this.lookupTable.specifyPartitions( + lookupTable = switchedTable; + lookupTable.specifyPartitions( partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); - this.path = ((FullCacheLookupTable) switchedTable).context.tempPath; - // close old table and clean up old temp directory - try { - oldTable.close(); - } catch (IOException e) { - LOG.warn("Failed to close old lookup table for table {}.", table.name(), e); - } - } - - // 1. check if this time is in black list - if (refreshBlacklist != null && !refreshBlacklist.canRefresh()) { - return; + path = ((FullCacheLookupTable) switchedTable).context.tempPath; + // close old table + oldTable.close(); } - // 2. refresh dynamic partition + // 3. refresh dynamic partition if (partitionLoader != null) { boolean partitionChanged = partitionLoader.checkRefresh(); List partitions = partitionLoader.partitions(); @@ -360,7 +356,7 @@ void tryRefresh() throws Exception { } } - // 3. refresh lookup table + // 4. refresh lookup table if (shouldRefreshLookupTable()) { // Check if we should do full load (close and reopen table) instead of incremental // refresh From 14b05e2f2a23f1859a98f79d55df8bc8923dd274 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 14:16:42 +0800 Subject: [PATCH 09/18] addTest --- .../flink/lookup/FileStoreLookupFunction.java | 2 - .../flink/lookup/FullCacheLookupTable.java | 24 +- .../paimon/flink/lookup/LookupTable.java | 1 + .../apache/paimon/flink/LookupJoinITCase.java | 328 +++++++++++++++++- 4 files changed, 340 insertions(+), 15 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 7d66656497ca..e15610e73e8d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -332,8 +332,6 @@ void tryRefresh() throws Exception { if (switchedTable != null) { LookupTable oldTable = this.lookupTable; lookupTable = switchedTable; - lookupTable.specifyPartitions( - partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); path = ((FullCacheLookupTable) switchedTable).context.tempPath; // close old table oldTable.close(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index d513df5851a8..becb9ba2670f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -46,10 +46,12 @@ import org.apache.paimon.utils.PartialRow; import org.apache.paimon.utils.TypeUtils; import org.apache.paimon.utils.UserDefinedSeqComparator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -148,17 +150,6 @@ public FullCacheLookupTable(Context context) { this.partitionRefreshAsync = options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC); } - @Override - public LookupTable copyWithNewPath(File newPath) { - Context newContext = context.copyWithNewPath(newPath); - Options options = Options.fromMap(context.table.options()); - FullCacheLookupTable newTable = create(newContext, options.get(LOOKUP_CACHE_ROWS)); - if (cacheRowFilter != null) { - newTable.specifyCacheRowFilter(cacheRowFilter); - } - return newTable; - } - @Override public void specifyPartitions( List scanPartitions, @Nullable Predicate partitionFilter) { @@ -444,6 +435,17 @@ private void asyncPartitionRefresh( }); } + @Override + public LookupTable copyWithNewPath(File newPath) { + Context newContext = context.copyWithNewPath(newPath); + Options options = Options.fromMap(context.table.options()); + FullCacheLookupTable newTable = create(newContext, options.get(LOOKUP_CACHE_ROWS)); + if (cacheRowFilter != null) { + newTable.specifyCacheRowFilter(cacheRowFilter); + } + return newTable; + } + @Nullable @Override public LookupTable checkPartitionRefreshCompletion() throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index 3395b24cb788..0002f93d80f0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -24,6 +24,7 @@ import org.apache.paimon.utils.Filter; import javax.annotation.Nullable; + import java.io.Closeable; import java.io.File; import java.io.IOException; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index d4eb7af85770..1355b5b416c1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1298,7 +1298,9 @@ public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception { } @ParameterizedTest - @EnumSource(LookupCacheMode.class) + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { // This test verifies asynchronous partition refresh: // when max_pt() changes, the lookup table is refreshed in a background thread, @@ -1332,7 +1334,7 @@ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { Thread.sleep(1000); // wait for async refresh to complete // trigger a lookup to check async completion and switch to new partition sql("INSERT INTO T VALUES (1), (2)"); - result = iterator.collect(2); + iterator.collect(2); Thread.sleep(1000); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); @@ -1350,4 +1352,326 @@ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { iterator.close(); } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode) + throws Exception { + // Verify that during async refresh, queries still return old partition data + // until the new partition is fully loaded and switched. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert new partition '2' to trigger async refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + + // immediately query before async refresh completes — should still return old partition data + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + // old partition data (100, 200) should still be served + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // now wait for async refresh to complete and trigger switch + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + // after switch, new partition data should be returned + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMode mode) + throws Exception { + // Verify that incremental data updates in the old partition are visible + // before async partition switch happens. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // update data in the current partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 150), ('1', 2, 250)"); + Thread.sleep(2000); // wait for incremental refresh + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); + + // now insert new partition '2' to trigger async refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithMultiPartitionKeys(LookupCacheMode mode) + throws Exception { + // Verify async partition refresh works correctly with multi-level partition keys. + sql( + "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt1`, `pt2`) WITH (" + + "'bucket' = '1', " + + "'scan.partitions' = 'pt1=max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql( + "INSERT INTO PARTITIONED_DIM VALUES " + + "('2024', 1, 1, 100), ('2024', 1, 2, 200), " + + "('2024', 2, 1, 300), ('2024', 2, 2, 400)"); + + String query = + "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2024", 1, 1, 100), + Row.of("2024", 1, 2, 200), + Row.of("2024", 2, 1, 300), + Row.of("2024", 2, 2, 400)); + + // insert new max partition '2025' with sub-partitions + sql( + "INSERT INTO PARTITIONED_DIM VALUES " + + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), " + + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)"); + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2025", 1, 1, 1000), + Row.of("2025", 1, 2, 2000), + Row.of("2025", 2, 1, 3000), + Row.of("2025", 2, 2, 4000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithOverwrite(LookupCacheMode mode) throws Exception { + // Verify async partition refresh works correctly when a new max partition + // is created via INSERT OVERWRITE. + sql( + "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // overwrite current max partition with new data + sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 150), (2, 250)"); + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); + + // overwrite to create a new max partition + sql( + "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES (1, 1000), (2, 2000), (3, 3000)"); + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2), (3)"); + iterator.collect(3); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2), (3)"); + result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000), Row.of(3, 3000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithMaxTwoPt(LookupCacheMode mode) throws Exception { + // Verify async partition refresh works correctly with max_two_pt() strategy. + sql( + "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_two_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partitions '1' and '2' + sql( + "INSERT INTO TWO_PT_DIM VALUES " + + "('1', 1, 100), ('1', 2, 200), " + + "('2', 1, 300), ('2', 2, 400)"); + + String query = + "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("1", 1, 100), + Row.of("1", 2, 200), + Row.of("2", 1, 300), + Row.of("2", 2, 400)); + + // insert new partition '3', now max_two_pt should be '2' and '3' + sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2, 2000)"); + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + // should now see data from partitions '2' and '3' + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2", 1, 300), + Row.of("2", 2, 400), + Row.of("3", 1, 1000), + Row.of("3", 2, 2000)); + + // insert another partition '4', max_two_pt should be '3' and '4' + sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2, 20000)"); + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("3", 1, 1000), + Row.of("3", 2, 2000), + Row.of("4", 1, 10000), + Row.of("4", 2, 20000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws Exception { + // Verify async partition refresh works correctly with non-primary-key append tables. + sql( + "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(3); + // non-pk table may return multiple matches + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101), Row.of(2, 200)); + + // insert new partition '2' to trigger async refresh + sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), ('2', 2, 2000)"); + Thread.sleep(2000); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(3); + Thread.sleep(1000); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001), Row.of(2, 2000)); + + iterator.close(); + } } From 4af58745351217f5092e2cf92590c65a73b36a24 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 15:23:32 +0800 Subject: [PATCH 10/18] fix --- .../paimon/flink/lookup/FileStoreLookupFunction.java | 4 +--- .../apache/paimon/flink/lookup/FullCacheLookupTable.java | 9 +++++---- .../java/org/apache/paimon/flink/lookup/LookupTable.java | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index e15610e73e8d..ce5068345fc9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -330,11 +330,9 @@ void tryRefresh() throws Exception { // 2. check if async partition refresh has completed, and switch if so LookupTable switchedTable = lookupTable.checkPartitionRefreshCompletion(); if (switchedTable != null) { - LookupTable oldTable = this.lookupTable; + close(); lookupTable = switchedTable; path = ((FullCacheLookupTable) switchedTable).context.tempPath; - // close old table - oldTable.close(); } // 3. refresh dynamic partition diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index becb9ba2670f..7b1ffbe026cb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -455,10 +455,11 @@ public LookupTable checkPartitionRefreshCompletion() throws Exception { Exception asyncException = partitionRefreshException.getAndSet(null); if (asyncException != null) { - LOG.warn( - "Async partition refresh had failed for table {}, " - + "will retry on next partition change. Continuing with old partitions.", - table.name()); + LOG.error( + "Async partition refresh failed for table {}, " + "will stop running.", + table.name(), + asyncException); + throw asyncException; } LookupTable newTable = pendingLookupTable.getAndSet(null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index 0002f93d80f0..fe51620df58e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -59,7 +59,7 @@ default LookupTable copyWithNewPath(File newPath) { } /** - * Start a partition refresh (synchronous or asynchronous depending on configuration). + * Start refresh partition refresh. * * @param newPartitions the new partitions to refresh to * @param partitionFilter the partition filter for the new partitions From 4d14a96c446fc7ddadee6a6f955d91e59b4a577b Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 15:27:58 +0800 Subject: [PATCH 11/18] fix --- .../apache/paimon/flink/lookup/FullCacheLookupTable.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 7b1ffbe026cb..e9efd4ed7f2c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -390,13 +390,6 @@ private void syncPartitionRefresh( private void asyncPartitionRefresh( List newPartitions, @Nullable Predicate partitionFilter) { - if (partitionRefreshFuture != null && !partitionRefreshFuture.isDone()) { - LOG.info( - "Async partition refresh is already in progress for table {}, " - + "skipping new partition change.", - table.name()); - return; - } LOG.info( "Starting async partition refresh for table {}, new partitions detected.", From 18c998861f8fad10423dd01c781a8c923ab3dc22 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 15:33:14 +0800 Subject: [PATCH 12/18] fix --- .../org/apache/paimon/flink/lookup/FullCacheLookupTable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index e9efd4ed7f2c..09ddecd96c28 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -430,7 +430,7 @@ private void asyncPartitionRefresh( @Override public LookupTable copyWithNewPath(File newPath) { - Context newContext = context.copyWithNewPath(newPath); + Context newContext = context.copy(newPath); Options options = Options.fromMap(context.table.options()); FullCacheLookupTable newTable = create(newContext, options.get(LOOKUP_CACHE_ROWS)); if (cacheRowFilter != null) { @@ -555,7 +555,7 @@ public Context copy(int[] newProjection) { requiredCachedBucketIds); } - public Context copyWithNewPath(File newTempPath) { + public Context copy(File newTempPath) { return new Context( table.wrapped(), projection, From 27414301131df5a335f16571bf23e3f35b625457 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 15:39:37 +0800 Subject: [PATCH 13/18] remove --- paimon-python/pypaimon/filesystem/pyarrow_file_io.py | 4 ++-- paimon-python/pypaimon/write/commit/commit_rollback.py | 8 -------- paimon-python/pypaimon/write/file_store_commit.py | 2 +- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 0ad952f7bb0b..c4b64445f701 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -126,10 +126,10 @@ def _initialize_oss_fs(self, path) -> FileSystem: if self._pyarrow_gte_7: client_kwargs['force_virtual_addressing'] = True - client_kwargs['endpoint_override'] = "oss-cn-hangzhou.aliyuncs.com" + client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) else: client_kwargs['endpoint_override'] = (self._oss_bucket + "." + - "oss-cn-hangzhou.aliyuncs.com") + self.properties.get(OssOptions.OSS_ENDPOINT)) retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) diff --git a/paimon-python/pypaimon/write/commit/commit_rollback.py b/paimon-python/pypaimon/write/commit/commit_rollback.py index b8c706f35e09..66106efae7bd 100644 --- a/paimon-python/pypaimon/write/commit/commit_rollback.py +++ b/paimon-python/pypaimon/write/commit/commit_rollback.py @@ -20,12 +20,8 @@ Commit rollback to rollback 'COMPACT' commits for resolving conflicts. """ -import logging - from pypaimon.table.instant import Instant -logger = logging.getLogger(__name__) - class CommitRollback: """Rollback COMPACT commits to resolve conflicts. @@ -55,15 +51,11 @@ def try_to_rollback(self, latest_snapshot): Returns: True if rollback succeeded, False otherwise. """ - logger.warning("Try rollback...") if latest_snapshot.commit_kind == "COMPACT": latest_id = latest_snapshot.id try: self._table_rollback.rollback_to( Instant.snapshot(latest_id - 1), latest_id) - logger.warning( - "Successfully rolled back COMPACT snapshot %d to %d", - latest_id, latest_id - 1) return True except Exception: pass diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 164f2ee51f72..ee7d7a9694aa 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -314,7 +314,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str start_millis = int(time.time() * 1000) if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind): return SuccessResult() - logger.warning("hello commit...") + unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" From 8d6376e3cce780a8350bc56b4d2e99e644485a0c Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 15:44:57 +0800 Subject: [PATCH 14/18] fix --- .../main/java/org/apache/paimon/flink/lookup/LookupTable.java | 2 +- .../src/test/java/org/apache/paimon/flink/LookupJoinITCase.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index fe51620df58e..3a4117561e09 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -59,7 +59,7 @@ default LookupTable copyWithNewPath(File newPath) { } /** - * Start refresh partition refresh. + * Start refresh partition. * * @param newPartitions the new partitions to refresh to * @param partitionFilter the partition filter for the new partitions diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 1355b5b416c1..e17b22cdbcea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1640,7 +1640,6 @@ public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws sql( "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)" + "PARTITIONED BY (`pt`) WITH (" - + "'bucket' = '1', " + "'lookup.dynamic-partition' = 'max_pt()', " + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + "'lookup.dynamic-partition.refresh.async' = 'true', " From 6e95af40c503df12316fcb34e6cf0ecb5cc4ef77 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 16:01:05 +0800 Subject: [PATCH 15/18] rm --- .../flink/lookup/FullCacheLookupTable.java | 58 ++++++++----------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 09ddecd96c28..48bcb9418c9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -102,7 +102,6 @@ public abstract class FullCacheLookupTable implements LookupTable { // ---- Partition refresh fields ---- private final boolean partitionRefreshAsync; @Nullable private ExecutorService partitionRefreshExecutor; - private volatile Future partitionRefreshFuture; private AtomicReference pendingLookupTable; private AtomicReference partitionRefreshException; @@ -185,7 +184,6 @@ protected void init() throws Exception { private void initPartitionRefresh() { this.pendingLookupTable = new AtomicReference<>(null); this.partitionRefreshException = new AtomicReference<>(null); - this.partitionRefreshFuture = null; this.scanPartitions = scanPartitions != null ? scanPartitions : Collections.emptyList(); this.partitionRefreshExecutor = Executors.newSingleThreadExecutor( @@ -395,37 +393,31 @@ private void asyncPartitionRefresh( "Starting async partition refresh for table {}, new partitions detected.", table.name()); - partitionRefreshFuture = - partitionRefreshExecutor.submit( - () -> { - File newPath = null; - try { - newPath = - new File( - context.tempPath.getParent(), - "lookup-" + java.util.UUID.randomUUID()); - if (!newPath.mkdirs()) { - throw new RuntimeException("Failed to create dir: " + newPath); - } - LookupTable newTable = copyWithNewPath(newPath); - newTable.specifyPartitions(newPartitions, partitionFilter); - newTable.open(); - - pendingLookupTable.set(newTable); - LOG.info( - "Async partition refresh completed for table {}.", - table.name()); - } catch (Exception e) { - LOG.error( - "Async partition refresh failed for table {}.", - table.name(), - e); - partitionRefreshException.set(e); - if (newPath != null) { - FileIOUtils.deleteDirectoryQuietly(newPath); - } - } - }); + partitionRefreshExecutor.submit( + () -> { + File newPath = null; + try { + newPath = + new File( + context.tempPath.getParent(), + "lookup-" + java.util.UUID.randomUUID()); + if (!newPath.mkdirs()) { + throw new RuntimeException("Failed to create dir: " + newPath); + } + LookupTable newTable = copyWithNewPath(newPath); + newTable.specifyPartitions(newPartitions, partitionFilter); + newTable.open(); + + pendingLookupTable.set(newTable); + LOG.info("Async partition refresh completed for table {}.", table.name()); + } catch (Exception e) { + LOG.error("Async partition refresh failed for table {}.", table.name(), e); + partitionRefreshException.set(e); + if (newPath != null) { + FileIOUtils.deleteDirectoryQuietly(newPath); + } + } + }); } @Override From 5a14e6d0a0e8f52b846bd8c5b73c9c0ba046980a Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 11 Mar 2026 18:20:16 +0800 Subject: [PATCH 16/18] addDoc --- .../generated/flink_connector_configuration.html | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 1003aff77cfb..9cde0eefe38a 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -80,6 +80,12 @@ Duration Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition. + +

lookup.dynamic-partition.refresh.async
+ false + Boolean + Whether to refresh dynamic partition lookup table asynchronously. When enabled, partition changes will be loaded in a background thread while the old partition data continues serving queries. When disabled (default), partition refresh is synchronous and blocks queries until the new partition data is fully loaded. +
lookup.refresh.async
false @@ -357,4 +363,4 @@ Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. - + \ No newline at end of file From 4c10cc523e31ef2859e64186d350ef10988320b1 Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 12 Mar 2026 12:40:53 +0800 Subject: [PATCH 17/18] change timeout-minutes to 120 --- .../workflows/utitcase-flink-1.x-common.yml | 2 +- .../apache/paimon/flink/LookupJoinITCase.java | 74 +++++++------------ 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml index 6825d8e7a369..f22b56014fe2 100644 --- a/.github/workflows/utitcase-flink-1.x-common.yml +++ b/.github/workflows/utitcase-flink-1.x-common.yml @@ -38,7 +38,7 @@ concurrency: jobs: build_test: runs-on: ubuntu-latest - timeout-minutes: 60 + timeout-minutes: 100 steps: - name: Checkout code diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index e17b22cdbcea..0e1b87276ab7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1252,7 +1252,9 @@ public void testFallbackCacheMode() throws Exception { } @ParameterizedTest - @EnumSource(LookupCacheMode.class) + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL"}) public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception { // This test verifies synchronous partition refresh (default mode): // when max_pt() changes, the lookup table is refreshed synchronously, @@ -1282,18 +1284,10 @@ public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception { // insert data into a new partition '2', which will trigger sync partition refresh sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); - Thread.sleep(2000); // wait for partition refresh interval to trigger sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); - // insert another new partition '3' and verify switch again - sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); - Thread.sleep(2000); // wait for partition refresh interval to trigger - sql("INSERT INTO T VALUES (1), (2)"); - result = iterator.collect(2); - assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000)); - iterator.close(); } @@ -1331,21 +1325,21 @@ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { // insert data into a new partition '2', which will trigger async partition refresh sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); - Thread.sleep(1000); // wait for async refresh to complete + Thread.sleep(500); // wait for async refresh to complete // trigger a lookup to check async completion and switch to new partition sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(2); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); // insert another new partition '3' and verify switch again sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); - Thread.sleep(1000); // wait for async refresh to complete + Thread.sleep(500); // wait for async refresh to complete sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(2); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000)); @@ -1393,7 +1387,7 @@ public void testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); // now wait for async refresh to complete and trigger switch - Thread.sleep(2000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); // after switch, new partition data should be returned @@ -1434,7 +1428,7 @@ public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMod // update data in the current partition '1' sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 150), ('1', 2, 250)"); - Thread.sleep(2000); // wait for incremental refresh + Thread.sleep(500); // wait for incremental refresh sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); @@ -1443,7 +1437,7 @@ public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMod sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(2); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); @@ -1451,11 +1445,8 @@ public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMod iterator.close(); } - @ParameterizedTest - @EnumSource( - value = LookupCacheMode.class, - names = {"FULL", "MEMORY"}) - public void testAsyncPartitionRefreshWithMultiPartitionKeys(LookupCacheMode mode) + @Test + public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws Exception { // Verify async partition refresh works correctly with multi-level partition keys. sql( @@ -1467,7 +1458,7 @@ public void testAsyncPartitionRefreshWithMultiPartitionKeys(LookupCacheMode mode + "'lookup.dynamic-partition.refresh.async' = 'true', " + "'lookup.cache' = '%s', " + "'continuous.discovery-interval'='1 ms')", - mode); + LookupCacheMode.FULL); sql( "INSERT INTO PARTITIONED_DIM VALUES " @@ -1493,10 +1484,10 @@ public void testAsyncPartitionRefreshWithMultiPartitionKeys(LookupCacheMode mode "INSERT INTO PARTITIONED_DIM VALUES " + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), " + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)"); - Thread.sleep(2000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(4); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(4); assertThat(result) @@ -1509,11 +1500,8 @@ public void testAsyncPartitionRefreshWithMultiPartitionKeys(LookupCacheMode mode iterator.close(); } - @ParameterizedTest - @EnumSource( - value = LookupCacheMode.class, - names = {"FULL", "MEMORY"}) - public void testAsyncPartitionRefreshWithOverwrite(LookupCacheMode mode) throws Exception { + @Test + public void testAsyncPartitionRefreshWithOverwrite() throws Exception { // Verify async partition refresh works correctly when a new max partition // is created via INSERT OVERWRITE. sql( @@ -1525,7 +1513,7 @@ public void testAsyncPartitionRefreshWithOverwrite(LookupCacheMode mode) throws + "'lookup.dynamic-partition.refresh.async' = 'true', " + "'lookup.cache' = '%s', " + "'continuous.discovery-interval'='1 ms')", - mode); + LookupCacheMode.FULL); sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)"); @@ -1540,7 +1528,7 @@ public void testAsyncPartitionRefreshWithOverwrite(LookupCacheMode mode) throws // overwrite current max partition with new data sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 150), (2, 250)"); - Thread.sleep(2000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(2); assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); @@ -1548,10 +1536,10 @@ public void testAsyncPartitionRefreshWithOverwrite(LookupCacheMode mode) throws // overwrite to create a new max partition sql( "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES (1, 1000), (2, 2000), (3, 3000)"); - Thread.sleep(2000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2), (3)"); iterator.collect(3); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2), (3)"); result = iterator.collect(3); assertThat(result) @@ -1560,11 +1548,8 @@ public void testAsyncPartitionRefreshWithOverwrite(LookupCacheMode mode) throws iterator.close(); } - @ParameterizedTest - @EnumSource( - value = LookupCacheMode.class, - names = {"FULL", "MEMORY"}) - public void testAsyncPartitionRefreshWithMaxTwoPt(LookupCacheMode mode) throws Exception { + @Test + public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception { // Verify async partition refresh works correctly with max_two_pt() strategy. sql( "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" @@ -1575,7 +1560,7 @@ public void testAsyncPartitionRefreshWithMaxTwoPt(LookupCacheMode mode) throws E + "'lookup.dynamic-partition.refresh.async' = 'true', " + "'lookup.cache' = '%s', " + "'continuous.discovery-interval'='1 ms')", - mode); + LookupCacheMode.FULL); // insert data into partitions '1' and '2' sql( @@ -1599,10 +1584,9 @@ public void testAsyncPartitionRefreshWithMaxTwoPt(LookupCacheMode mode) throws E // insert new partition '3', now max_two_pt should be '2' and '3' sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2, 2000)"); - Thread.sleep(2000); sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(4); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(4); // should now see data from partitions '2' and '3' @@ -1615,10 +1599,9 @@ public void testAsyncPartitionRefreshWithMaxTwoPt(LookupCacheMode mode) throws E // insert another partition '4', max_two_pt should be '3' and '4' sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2, 20000)"); - Thread.sleep(2000); sql("INSERT INTO T VALUES (1), (2)"); - result = iterator.collect(4); - Thread.sleep(1000); + iterator.collect(4); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(4); assertThat(result) @@ -1662,10 +1645,9 @@ public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws // insert new partition '2' to trigger async refresh sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), ('2', 2, 2000)"); - Thread.sleep(2000); sql("INSERT INTO T VALUES (1), (2)"); iterator.collect(3); - Thread.sleep(1000); + Thread.sleep(500); sql("INSERT INTO T VALUES (1), (2)"); result = iterator.collect(3); assertThat(result) From 6eee09b361114f52bb91ce28538e092ad05fd9a8 Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 12 Mar 2026 13:17:28 +0800 Subject: [PATCH 18/18] fix --- .../test/java/org/apache/paimon/flink/LookupJoinITCase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 0e1b87276ab7..bda4bcc567d0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1446,8 +1446,7 @@ public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMod } @Test - public void testAsyncPartitionRefreshWithMultiPartitionKeys() - throws Exception { + public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws Exception { // Verify async partition refresh works correctly with multi-level partition keys. sql( "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)"