Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public final class StreamingDataflowWorker {
private final ComputationConfig.Fetcher configFetcher;
private final ComputationStateCache computationStateCache;
private final BoundedQueueExecutor workUnitExecutor;
private final ScheduledExecutorService commitFinalizerCleanupExecutor;
private final AtomicReference<StreamingWorkerHarness> streamingWorkerHarness =
new AtomicReference<>();
private final AtomicBoolean running = new AtomicBoolean();
Expand All @@ -208,6 +209,7 @@ private StreamingDataflowWorker(
ComputationStateCache computationStateCache,
WindmillStateCache windmillStateCache,
BoundedQueueExecutor workUnitExecutor,
ScheduledExecutorService commitFinalizerCleanupExecutor,
DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
DataflowWorkerHarnessOptions options,
HotKeyLogger hotKeyLogger,
Expand All @@ -232,6 +234,7 @@ private StreamingDataflowWorker(
Executors.newCachedThreadPool());
this.options = options;
this.workUnitExecutor = workUnitExecutor;
this.commitFinalizerCleanupExecutor = commitFinalizerCleanupExecutor;
this.harnessSwitchExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("HarnessSwitchExecutor").build());
Expand All @@ -252,6 +255,7 @@ private StreamingDataflowWorker(
readerCache,
mapTaskExecutorFactory,
workUnitExecutor,
commitFinalizerCleanupExecutor,
this.stateCache::forComputation,
failureTracker,
workFailureProcessor,
Expand Down Expand Up @@ -618,6 +622,13 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
ScheduledExecutorService commitFinalizerCleanupExecutor =
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setNameFormat("FinalizationCallbackCleanup-%d")
.setDaemon(true)
.build());
WindmillStateCache windmillStateCache =
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
Expand Down Expand Up @@ -682,6 +693,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
computationStateCache,
windmillStateCache,
workExecutor,
commitFinalizerCleanupExecutor,
IntrinsicMapTaskExecutorFactory.defaultFactory(),
options,
new HotKeyLogger(),
Expand Down Expand Up @@ -844,6 +856,13 @@ static StreamingDataflowWorker forTesting(
WindmillStubFactoryFactory stubFactory) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
ScheduledExecutorService commitFinalizerCleanupExecutor =
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setNameFormat("FinalizationCallbackCleanup-%d")
.setDaemon(true)
.build());
WindmillStateCache stateCache =
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
Expand Down Expand Up @@ -932,6 +951,7 @@ static StreamingDataflowWorker forTesting(
computationStateCache,
stateCache,
workExecutor,
commitFinalizerCleanupExecutor,
mapTaskExecutorFactory,
options,
hotKeyLogger,
Expand Down Expand Up @@ -1123,6 +1143,7 @@ void stop() {
streamingWorkerHarness.get().shutdown();
memoryMonitor.shutdown();
workUnitExecutor.shutdown();
commitFinalizerCleanupExecutor.shutdown();
computationStateCache.closeAndInvalidateAll();
workerStatusReporter.stop();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -56,99 +53,82 @@ public abstract static class FinalizationInfo {

public abstract Runnable getCallback();

public static FinalizationInfo create(Long id, Instant expiryTime, Runnable callback) {
return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, expiryTime, callback);
public abstract ScheduledFuture<?> getCleanupFuture();

public static FinalizationInfo create(
Long id, Instant expiryTime, Runnable callback, ScheduledFuture<?> cleanupFuture) {
return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(
id, expiryTime, callback, cleanupFuture);
}
}

private final ReentrantLock lock = new ReentrantLock();
private final Condition queueMinChanged = lock.newCondition();

@GuardedBy("lock")
private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks = new HashMap<>();

@GuardedBy("lock")
private final PriorityQueue<FinalizationInfo> cleanUpQueue =
new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime));

@GuardedBy("lock")
private boolean cleanUpThreadStarted = false;

private final BoundedQueueExecutor finalizationExecutor;

private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) {
finalizationExecutor = finalizationCleanupExecutor;
}
// The cleanup threads run in their own Executor, so they don't block processing.
private final ScheduledExecutorService cleanupExecutor;

private void cleanupThreadBody() {
lock.lock();
try {
while (true) {
final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
if (minValue == null) {
// Wait for an element to be added and loop to re-examine the min.
queueMinChanged.await();
continue;
}

Instant now = Instant.now();
Duration timeDifference = new Duration(now, minValue.getExpiryTime());
if (timeDifference.getMillis() < 0
|| (queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS)
&& cleanUpQueue.peek() == minValue)) {
// The minimum element has an expiry time before now, either because it had elapsed when
// we pulled it or because we awaited it, and it is still the minimum.
checkState(minValue == cleanUpQueue.poll());
checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue);
}
}
} catch (InterruptedException e) {
// We're being shutdown.
} finally {
lock.unlock();
}
private StreamingCommitFinalizer(
BoundedQueueExecutor finalizationExecutor, ScheduledExecutorService cleanupExecutor) {
this.finalizationExecutor = finalizationExecutor;
this.cleanupExecutor = cleanupExecutor;
}

static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
return new StreamingCommitFinalizer(workExecutor);
static StreamingCommitFinalizer create(
BoundedQueueExecutor workExecutor, ScheduledExecutorService cleanupExecutor) {
return new StreamingCommitFinalizer(workExecutor, cleanupExecutor);
}

/**
* Stores a map of user worker generated finalization ids and callbacks to execute once a commit
* has been successfully committed to the backing state store.
*/
public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>> callbacks) {
for (Map.Entry<Long, Pair<Instant, Runnable>> entry : callbacks.entrySet()) {
Long finalizeId = entry.getKey();
final FinalizationInfo info =
FinalizationInfo.create(
finalizeId, entry.getValue().getLeft(), entry.getValue().getRight());

lock.lock();
try {
FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info);
if (callbacks.isEmpty()) {
return;
}
Instant now = Instant.now();
lock.lock();
try {
for (Map.Entry<Long, Pair<Instant, Runnable>> entry : callbacks.entrySet()) {
Instant cleanupTime = entry.getValue().getLeft();
// Ignore finalizers that have already expired.
if (cleanupTime.isBefore(now)) {
continue;
}
ScheduledFuture<?> cleanupFuture =
cleanupExecutor.schedule(
() -> {
lock.lock();
try {
commitFinalizationCallbacks.remove(entry.getKey());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race here and this can execute before commitFinalizationCallbacks.put below.

We can move cleanupExecutor.schedule under lock.lock to fix the race. since cleanupExecutor is unbounded, it is safe to call schedule under lock.

} finally {
lock.unlock();
}
},
new Duration(now, cleanupTime).getMillis(),
TimeUnit.MILLISECONDS);
FinalizationInfo info =
FinalizationInfo.create(
entry.getKey(),
entry.getValue().getLeft(),
entry.getValue().getRight(),
cleanupFuture);
FinalizationInfo existingInfo = commitFinalizationCallbacks.put(info.getId(), info);
if (existingInfo != null) {
throw new IllegalStateException(
"Expected to not have any past callbacks for bundle "
+ finalizeId
+ info.getId()
+ " but had "
+ existingInfo);
}
if (!cleanUpThreadStarted) {
// Start the cleanup thread lazily for pipelines that don't use finalization callbacks
// and some tests.
cleanUpThreadStarted = true;
finalizationExecutor.execute(this::cleanupThreadBody, 0);
}
cleanUpQueue.add(info);
@SuppressWarnings("ReferenceEquality")
boolean newMin = cleanUpQueue.peek() == info;
if (newMin) {
queueMinChanged.signal();
}
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}

Expand All @@ -167,8 +147,8 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
for (long finalizeId : finalizeIds) {
@Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId);
if (info != null) {
checkState(cleanUpQueue.remove(info));
callbacksToExecute.add(info.getCallback());
info.getCleanupFuture().cancel(true);
}
}
} finally {
Expand All @@ -186,10 +166,10 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
}

@VisibleForTesting
int cleanupQueueSize() {
int pendingCallbacksSize() {
lock.lock();
try {
return cleanUpQueue.size();
return commitFinalizationCallbacks.size();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.auto.value.AutoValue;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -121,6 +122,7 @@ public static StreamingWorkScheduler create(
ReaderCache readerCache,
DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
BoundedQueueExecutor workExecutor,
ScheduledExecutorService commitFinalizerCleanupExecutor,
Function<String, WindmillStateCache.ForComputation> stateCacheFactory,
FailureTracker failureTracker,
WorkFailureProcessor workFailureProcessor,
Expand Down Expand Up @@ -148,7 +150,7 @@ public static StreamingWorkScheduler create(
SideInputStateFetcherFactory.fromOptions(options),
failureTracker,
workFailureProcessor,
StreamingCommitFinalizer.create(workExecutor),
StreamingCommitFinalizer.create(workExecutor, commitFinalizerCleanupExecutor),
streamingCounters,
hotKeyLogger,
stageInfoMap,
Expand Down
Loading
Loading