Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/utitcase-flink-1.x-common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ concurrency:
jobs:
build_test:
runs-on: ubuntu-latest
timeout-minutes: 60
timeout-minutes: 100

steps:
- name: Checkout code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<td>Duration</td>
<td>Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition.</td>
</tr>
<tr>
<td><h5>lookup.dynamic-partition.refresh.async</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to refresh dynamic partition lookup table asynchronously. When enabled, partition changes will be loaded in a background thread while the old partition data continues serving queries. When disabled (default), partition refresh is synchronous and blocks queries until the new partition data is fully loaded.</td>
</tr>
<tr>
<td><h5>lookup.refresh.async</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -357,4 +363,4 @@
<td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
</tbody>
</table>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ public class FlinkConnectorOptions {
.defaultValue(false)
.withDescription("Whether to refresh lookup table in an async thread.");

/**
* Switch between synchronous (default, {@code false}) and asynchronous refresh of a dynamic
* partition lookup table. The user-facing semantics are spelled out in the description below.
*/
public static final ConfigOption<Boolean> LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC =
ConfigOptions.key("lookup.dynamic-partition.refresh.async")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to refresh dynamic partition lookup table asynchronously. "
+ "When enabled, partition changes will be loaded in a background thread "
+ "while the old partition data continues serving queries. "
+ "When disabled (default), partition refresh is synchronous and blocks queries "
+ "until the new partition data is fully loaded.");

public static final ConfigOption<Integer> LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,16 @@ public Collection<RowData> lookup(RowData keyRow) {
if (partitionLoader == null) {
return lookupInternal(key);
}

if (partitionLoader.partitions().isEmpty()) {
List<BinaryRow> partitions =
lookupTable.scanPartitions() == null
? partitionLoader.partitions()
: lookupTable.scanPartitions();
if (partitions.isEmpty()) {
return Collections.emptyList();
}

List<RowData> rows = new ArrayList<>();
for (BinaryRow partition : partitionLoader.partitions()) {
for (BinaryRow partition : partitions) {
rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
}
return rows;
Expand Down Expand Up @@ -324,7 +327,15 @@ void tryRefresh() throws Exception {
return;
}

// 2. refresh dynamic partition
// 2. check if async partition refresh has completed, and switch if so
LookupTable switchedTable = lookupTable.checkPartitionRefreshCompletion();
if (switchedTable != null) {
close();
lookupTable = switchedTable;
path = ((FullCacheLookupTable) switchedTable).context.tempPath;
}

// 3. refresh dynamic partition
if (partitionLoader != null) {
boolean partitionChanged = partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
Expand All @@ -334,18 +345,14 @@ void tryRefresh() throws Exception {
}

if (partitionChanged) {
// reopen with latest partition
lookupTable.specifyPartitions(
partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
lookupTable.startPartitionRefresh(
partitions, partitionLoader.createSpecificPartFilter());
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
// no need to refresh the lookup table because it is reopened
return;
}
}

// 3. refresh lookup table
// 4. refresh lookup table
if (shouldRefreshLookupTable()) {
// Check if we should do full load (close and reopen table) instead of incremental
// refresh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -68,9 +69,11 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;

/** Lookup table of full cache. */
public abstract class FullCacheLookupTable implements LookupTable {
Expand All @@ -96,6 +99,12 @@ public abstract class FullCacheLookupTable implements LookupTable {
@Nullable private Predicate partitionFilter;
@Nullable private Filter<InternalRow> cacheRowFilter;

// ---- Partition refresh fields ----
private final boolean partitionRefreshAsync;
@Nullable private ExecutorService partitionRefreshExecutor;
private AtomicReference<LookupTable> pendingLookupTable;
private AtomicReference<Exception> partitionRefreshException;

public FullCacheLookupTable(Context context) {
this.table = context.table;
List<String> sequenceFields = new ArrayList<>();
Expand Down Expand Up @@ -137,6 +146,7 @@ public FullCacheLookupTable(Context context) {
this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
this.cachedException = new AtomicReference<>();
this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
this.partitionRefreshAsync = options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC);
}

@Override
Expand Down Expand Up @@ -166,6 +176,21 @@ protected void init() throws Exception {
"%s-lookup-refresh",
Thread.currentThread().getName())))
: null;
if (partitionRefreshAsync) {
initPartitionRefresh();
}
}

/**
 * Prepares the state used by asynchronous partition refresh: the hand-over slots for a
 * finished table and for a failure, a non-null partition list, and a dedicated
 * single-thread executor named after the calling thread.
 */
private void initPartitionRefresh() {
    pendingLookupTable = new AtomicReference<>(null);
    partitionRefreshException = new AtomicReference<>(null);

    // Never expose a null partition list once async partition refresh is enabled.
    if (scanPartitions == null) {
        scanPartitions = Collections.emptyList();
    }

    String refreshThreadName =
            String.format("%s-lookup-refresh-partition", Thread.currentThread().getName());
    partitionRefreshExecutor =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory(refreshThreadName));
}

private StateFactory createStateFactory() throws IOException {
Expand Down Expand Up @@ -338,12 +363,119 @@ public Predicate projectedPredicate() {

public abstract TableBulkLoader createBulkLoader();

// ---- Partition refresh implementation ----

/**
 * Refreshes this table to the given partitions, either in the background or inline depending
 * on whether async partition refresh is enabled.
 *
 * @param newPartitions the partitions to load
 * @param partitionFilter the partition filter matching {@code newPartitions}, may be null
 * @throws Exception if the synchronous refresh fails
 */
@Override
public void startPartitionRefresh(
        List<BinaryRow> newPartitions, @Nullable Predicate partitionFilter) throws Exception {
    if (!partitionRefreshAsync) {
        syncPartitionRefresh(newPartitions, partitionFilter);
        return;
    }
    asyncPartitionRefresh(newPartitions, partitionFilter);
}

/**
* Refreshes the partitions on the calling thread by closing this table, pointing it at the new
* partitions and opening it again. The method only returns once {@code open()} has returned.
*
* @param newPartitions the partitions to load
* @param partitionFilter the partition filter matching {@code newPartitions}, may be null
* @throws Exception if closing, re-specifying or reopening the table fails
*/
private void syncPartitionRefresh(
List<BinaryRow> newPartitions, @Nullable Predicate partitionFilter) throws Exception {
LOG.info(
"Synchronously refreshing partition for table {}, new partitions detected.",
table.name());
// Order matters: the old state is released before the table is reopened on the new partitions.
close();
specifyPartitions(newPartitions, partitionFilter);
open();
LOG.info("Synchronous partition refresh completed for table {}.", table.name());
}

/**
 * Loads the given partitions into a brand-new lookup table on the partition refresh executor
 * and publishes it through {@code pendingLookupTable} once it is fully opened. The current
 * table is left untouched, so it can keep serving lookups in the meantime.
 *
 * <p>Resource handling:
 *
 * <ul>
 *   <li>If the new table cannot be created or opened, the failure is stored in {@code
 *       partitionRefreshException}, the half-opened table is closed (best effort) and its
 *       directory is deleted.
 *   <li>If an earlier refresh result is still waiting in {@code pendingLookupTable} when this
 *       one completes, the earlier table is closed instead of being silently overwritten.
 * </ul>
 *
 * @param newPartitions the partitions to load into the new table
 * @param partitionFilter the partition filter matching {@code newPartitions}, may be null
 */
private void asyncPartitionRefresh(
        List<BinaryRow> newPartitions, @Nullable Predicate partitionFilter) {

    LOG.info(
            "Starting async partition refresh for table {}, new partitions detected.",
            table.name());

    partitionRefreshExecutor.submit(
            () -> {
                File newPath = null;
                LookupTable newTable = null;
                try {
                    newPath =
                            new File(
                                    context.tempPath.getParent(),
                                    "lookup-" + java.util.UUID.randomUUID());
                    if (!newPath.mkdirs()) {
                        throw new RuntimeException("Failed to create dir: " + newPath);
                    }
                    newTable = copyWithNewPath(newPath);
                    newTable.specifyPartitions(newPartitions, partitionFilter);
                    newTable.open();
                } catch (Exception e) {
                    LOG.error("Async partition refresh failed for table {}.", table.name(), e);
                    partitionRefreshException.set(e);
                    // The table may be partially opened; release whatever it acquired.
                    closeAbandonedTableQuietly(newTable);
                    if (newPath != null) {
                        FileIOUtils.deleteDirectoryQuietly(newPath);
                    }
                    return;
                }

                // Publish outside the try block so that a problem while discarding the
                // superseded table can never delete the directory of the published one.
                LookupTable superseded = pendingLookupTable.getAndSet(newTable);
                LOG.info("Async partition refresh completed for table {}.", table.name());
                closeAbandonedTableQuietly(superseded);
            });
}

/** Closes a lookup table that will never be switched in, logging instead of propagating. */
private void closeAbandonedTableQuietly(@Nullable LookupTable abandoned) {
    if (abandoned == null) {
        return;
    }
    try {
        abandoned.close();
    } catch (Exception e) {
        LOG.warn("Failed to close abandoned lookup table for table {}.", table.name(), e);
    }
}

/**
 * Creates another full-cache lookup table over the same table, backed by {@code newPath}
 * instead of the current temp path. The cache row filter, when one is set, is carried over.
 *
 * @param newPath the temp directory for the new table
 * @return the new table, as returned by {@code create}
 */
@Override
public LookupTable copyWithNewPath(File newPath) {
    FullCacheLookupTable copied =
            create(
                    context.copy(newPath),
                    Options.fromMap(context.table.options()).get(LOOKUP_CACHE_ROWS));
    if (cacheRowFilter == null) {
        return copied;
    }
    copied.specifyCacheRowFilter(cacheRowFilter);
    return copied;
}

/**
 * Polls the result of an asynchronous partition refresh.
 *
 * <p>A stored failure takes precedence: it is cleared, logged and rethrown. Otherwise the
 * pending table, if any, is taken out of its slot and handed to the caller.
 *
 * @return the freshly loaded table, or {@code null} when async partition refresh is disabled
 *     or no new table is ready
 * @throws Exception the failure recorded by the background refresh
 */
@Nullable
@Override
public LookupTable checkPartitionRefreshCompletion() throws Exception {
    if (partitionRefreshAsync) {
        Exception failure = partitionRefreshException.getAndSet(null);
        if (failure != null) {
            LOG.error(
                    "Async partition refresh failed for table {}, will stop running.",
                    table.name(),
                    failure);
            throw failure;
        }

        LookupTable ready = pendingLookupTable.getAndSet(null);
        if (ready != null) {
            LOG.info(
                    "Switched to new lookup table for table {} with new partitions.",
                    table.name());
        }
        return ready;
    }
    return null;
}

/**
* Returns the value of the {@code scanPartitions} field. When async partition refresh is
* enabled, {@code initPartitionRefresh()} replaces a null value with an empty list.
*/
@Override
public List<BinaryRow> scanPartitions() {
return scanPartitions;
}

@Override
public void close() throws IOException {
try {
if (refreshExecutor != null) {
ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
}
if (partitionRefreshExecutor != null) {
ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, partitionRefreshExecutor);
}
if (pendingLookupTable != null) {
LookupTable pending = pendingLookupTable.getAndSet(null);
if (pending != null) {
pending.close();
}
}
} finally {
stateFactory.close();
FileIOUtils.deleteDirectory(context.tempPath);
Expand Down Expand Up @@ -414,5 +546,16 @@ public Context copy(int[] newProjection) {
joinKey,
requiredCachedBucketIds);
}

/**
* Creates a context that reuses this context's projection, predicates, join key and required
* bucket ids, but uses {@code newTempPath} as temp path. The table is passed on as {@code
* table.wrapped()}.
*
* @param newTempPath the temp path of the new context
* @return the new context
*/
public Context copy(File newTempPath) {
return new Context(
table.wrapped(),
projection,
tablePredicate,
projectedPredicate,
newTempPath,
joinKey,
requiredCachedBucketIds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;

Expand All @@ -43,4 +44,54 @@ public interface LookupTable extends Closeable {
void specifyCacheRowFilter(Filter<InternalRow> filter);

Long nextSnapshotId();

// ---- Partition refresh methods ----

/**
* Create a new LookupTable instance with the same configuration but a different temp path. The
* new table is not opened yet.
*
* <p>The default implementation always throws.
*
* @param newPath the temp path for the new instance
* @return the new, not yet opened lookup table
* @throws UnsupportedOperationException if the implementation does not support this operation
*/
default LookupTable copyWithNewPath(File newPath) {
throw new UnsupportedOperationException(
"copyWithNewPath is not supported by " + getClass().getSimpleName());
}

/**
* Start refreshing this table to a new set of partitions.
*
* <p>The default implementation refreshes synchronously: it closes this table, specifies the
* new partitions and opens the table again before returning.
*
* @param newPartitions the new partitions to refresh to
* @param partitionFilter the partition filter for the new partitions
* @throws Exception if closing, re-specifying or reopening the table fails
*/
default void startPartitionRefresh(
List<BinaryRow> newPartitions, @Nullable Predicate partitionFilter) throws Exception {
close();
specifyPartitions(newPartitions, partitionFilter);
open();
}

/**
* Check if an async partition refresh has completed. If a new table is ready, this method
* returns it and the caller should replace its current lookup table reference. Returns {@code
* null} if no switch is needed.
*
* <p>For synchronous partition refresh, this always returns {@code null}. The default
* implementation does exactly that.
*
* @return the new lookup table to switch to, or {@code null} if there is none
* @throws Exception declared for implementations to report a failed refresh; the default
*     implementation never throws
*/
@Nullable
default LookupTable checkPartitionRefreshCompletion() throws Exception {
return null;
}

/**
* Return the partitions that the current lookup table was loaded with. During async refresh,
* this may differ from the latest partitions detected by the partition loader.
*
* <p>The default implementation returns {@code null}.
*
* @return the active partitions, or {@code null} if partition refresh is not managed by this
* table
*/
@Nullable
default List<BinaryRow> scanPartitions() {
return null;
}
}
Loading
Loading