Skip to content

Commit f194168

Browse files
committed
[FLINK-38543][checkpoint] Change overall UC restore process for checkpoint during recovery
1 parent c1e6191 commit f194168

File tree

5 files changed

+112
-11
lines changed

5 files changed

+112
-11
lines changed

flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ void testIsUnalignedCheckpointInterruptibleTimersEnabled() {
330330
}
331331

332332
@Test
333-
void testIsUnalignedDuringRecoveryEnabled() {
333+
void testIsCheckpointingDuringRecoveryEnabled() {
334334
// Test when both options are disabled (default) - should return false
335335
Configuration defaultConfig = new Configuration();
336336
assertThat(CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(defaultConfig))

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,9 @@ private CompletableFuture<Void> restoreStateAndGates(
883883

884884
boolean checkpointingDuringRecoveryEnabled =
885885
CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
886+
887+
// Must set the flag on input gates BEFORE starting the async read task, because
888+
// finishReadRecoveredState() checks this flag to complete bufferFilteringCompleteFuture.
886889
for (IndexedInputGate inputGate : inputGates) {
887890
inputGate.setCheckpointingDuringRecoveryEnabled(checkpointingDuringRecoveryEnabled);
888891
}
@@ -899,18 +902,20 @@ private CompletableFuture<Void> restoreStateAndGates(
899902

900903
// We wait for all input channel state to recover before we go into RUNNING state, and thus
901904
// start checkpointing. If we implement incremental checkpointing of input channel state
902-
// we must make sure it supports CheckpointType#FULL_CHECKPOINT
905+
// we must make sure it supports CheckpointType#FULL_CHECKPOINT.
903906
List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
904907
for (InputGate inputGate : inputGates) {
905-
recoveredFutures.add(inputGate.getStateConsumedFuture());
906-
907-
inputGate
908-
.getStateConsumedFuture()
909-
.thenRun(
910-
() ->
911-
mainMailboxExecutor.execute(
912-
inputGate::requestPartitions,
913-
"Input gate request partitions"));
908+
CompletableFuture<?> requestPartitionsTrigger =
909+
checkpointingDuringRecoveryEnabled
910+
? inputGate.getBufferFilteringCompleteFuture()
911+
: inputGate.getStateConsumedFuture();
912+
913+
recoveredFutures.add(requestPartitionsTrigger);
914+
915+
requestPartitionsTrigger.thenRun(
916+
() ->
917+
mainMailboxExecutor.execute(
918+
inputGate::requestPartitions, "Input gate request partitions"));
914919
}
915920

916921
// Return allOf result instead of thenRun result.

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class SingleInputGateBuilder {
8383

8484
private TieredStorageConsumerClient tieredStorageConsumerClient = null;
8585

86+
private boolean isCheckpointingDuringRecoveryEnabled = false;
87+
8688
public SingleInputGateBuilder setPartitionProducerStateProvider(
8789
PartitionProducerStateProvider partitionProducerStateProvider) {
8890

@@ -167,6 +169,11 @@ public SingleInputGateBuilder setTieredStorageConsumerClient(
167169
return this;
168170
}
169171

172+
public SingleInputGateBuilder setCheckpointingDuringRecoveryEnabled(boolean enabled) {
173+
this.isCheckpointingDuringRecoveryEnabled = enabled;
174+
return this;
175+
}
176+
170177
public SingleInputGate build() {
171178
SingleInputGate gate =
172179
new SingleInputGate(
@@ -195,6 +202,7 @@ public SingleInputGate build() {
195202
.toArray(InputChannel[]::new));
196203
}
197204
gate.setTieredStorageService(null, tieredStorageConsumerClient, null);
205+
gate.setCheckpointingDuringRecoveryEnabled(isCheckpointingDuringRecoveryEnabled);
198206
return gate;
199207
}
200208

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,36 @@ void testCheckpointsDeclinedUnlessStateConsumed() {
142142
.isInstanceOf(CheckpointException.class);
143143
}
144144

145+
@Test
146+
void testBufferFilteringCompleteFutureAggregation() throws Exception {
147+
final NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
148+
final SingleInputGate inputGate = createInputGate(environment);
149+
try (Closer closer = Closer.create()) {
150+
closer.register(environment::close);
151+
closer.register(inputGate::close);
152+
153+
// Enable unaligned during recovery for this test so that
154+
// bufferFilteringCompleteFuture is completed by finishReadRecoveredState()
155+
inputGate.setCheckpointingDuringRecoveryEnabled(true);
156+
inputGate.setup();
157+
158+
// Initially, the aggregated future should not be completed
159+
assertThat(inputGate.getBufferFilteringCompleteFuture()).isNotDone();
160+
161+
// After finishing read recovered state, bufferFilteringCompleteFuture should be
162+
// completed (only when config is enabled)
163+
inputGate.finishReadRecoveredState();
164+
assertThat(inputGate.getBufferFilteringCompleteFuture()).isDone();
165+
166+
// stateConsumedFuture should not be completed until data is consumed
167+
assertThat(inputGate.getStateConsumedFuture()).isNotDone();
168+
169+
// Consuming the EndOfInputChannelStateEvent should complete stateConsumedFuture
170+
inputGate.pollNext();
171+
assertThat(inputGate.getStateConsumedFuture()).isDone();
172+
}
173+
}
174+
145175
/**
146176
* Tests {@link InputGate#setup()} should create the respective {@link BufferPool} and assign
147177
* exclusive buffers for {@link RemoteInputChannel}s, but should not request partitions.

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818

1919
package org.apache.flink.runtime.io.network.partition.consumer;
2020

21+
import org.apache.flink.metrics.SimpleCounter;
2122
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2223
import org.apache.flink.runtime.io.PullingAsyncDataInput;
2324
import org.apache.flink.runtime.io.network.api.StopMode;
2425
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
2526
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
2627
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
28+
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
2729
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager;
2830

2931
import org.junit.jupiter.api.Test;
@@ -275,6 +277,62 @@ void testGetChannelWithShiftedGateIndexes() {
275277
assertThat(unionInputGate.getChannel(1)).isEqualTo(inputChannel2);
276278
}
277279

280+
@Test
281+
void testBufferFilteringCompleteFutureAggregation() throws IOException {
282+
// Create 2 SingleInputGates, each with 1 RecoveredInputChannel
283+
SingleInputGate ig1 =
284+
new SingleInputGateBuilder().setCheckpointingDuringRecoveryEnabled(true).build();
285+
RecoveredInputChannel channel1 = buildRecoveredChannel(ig1);
286+
ig1.setInputChannels(channel1);
287+
288+
SingleInputGate ig2 =
289+
new SingleInputGateBuilder()
290+
.setSingleInputGateIndex(1)
291+
.setCheckpointingDuringRecoveryEnabled(true)
292+
.build();
293+
RecoveredInputChannel channel2 = buildRecoveredChannel(ig2);
294+
ig2.setInputChannels(channel2);
295+
296+
UnionInputGate union = new UnionInputGate(ig1, ig2);
297+
298+
// Initially, bufferFilteringCompleteFuture should not be done
299+
assertThat(union.getBufferFilteringCompleteFuture()).isNotDone();
300+
assertThat(union.getStateConsumedFuture()).isNotDone();
301+
302+
// Complete buffer filtering on first gate only
303+
channel1.finishReadRecoveredState();
304+
assertThat(ig1.getBufferFilteringCompleteFuture()).isDone();
305+
assertThat(union.getBufferFilteringCompleteFuture()).isNotDone();
306+
307+
// Complete buffer filtering on second gate
308+
channel2.finishReadRecoveredState();
309+
assertThat(ig2.getBufferFilteringCompleteFuture()).isDone();
310+
assertThat(union.getBufferFilteringCompleteFuture()).isDone();
311+
312+
// State consumed futures should still NOT be done (state not consumed yet)
313+
assertThat(union.getStateConsumedFuture()).isNotDone();
314+
}
315+
316+
private static RecoveredInputChannel buildRecoveredChannel(SingleInputGate inputGate) {
317+
return new RecoveredInputChannel(
318+
inputGate,
319+
0,
320+
new ResultPartitionID(),
321+
new ResultSubpartitionIndexSet(0),
322+
0,
323+
0,
324+
new SimpleCounter(),
325+
new SimpleCounter(),
326+
10) {
327+
@Override
328+
protected InputChannel toInputChannelInternal(
329+
java.util.ArrayDeque<org.apache.flink.runtime.io.network.buffer.Buffer>
330+
remainingBuffers) {
331+
throw new UnsupportedOperationException();
332+
}
333+
};
334+
}
335+
278336
@Test
279337
void testEmptyPull() throws IOException, InterruptedException {
280338
final SingleInputGate inputGate1 = createInputGate(1);

0 commit comments

Comments
 (0)