Skip to content

Commit 907bf3a

Browse files
committed
[FLINK-38543][checkpoint] Change overall UC restore process for checkpoint during recovery
1 parent 5a80d1b commit 907bf3a

5 files changed

Lines changed: 201 additions & 18 deletions

File tree

flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ void testIsUnalignedCheckpointInterruptibleTimersEnabled() {
330330
}
331331

332332
@Test
333-
void testIsUnalignedDuringRecoveryEnabled() {
333+
void testIsCheckpointingDuringRecoveryEnabled() {
334334
// Test when both options are disabled (default) - should return false
335335
Configuration defaultConfig = new Configuration();
336336
assertThat(CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(defaultConfig))

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,9 @@ private CompletableFuture<Void> restoreStateAndGates(
883883

884884
boolean checkpointingDuringRecoveryEnabled =
885885
CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
886+
887+
// Must set the flag on input gates BEFORE starting the async read task, because
888+
// finishReadRecoveredState() checks this flag to complete bufferFilteringCompleteFuture.
886889
for (IndexedInputGate inputGate : inputGates) {
887890
inputGate.setCheckpointingDuringRecoveryEnabled(checkpointingDuringRecoveryEnabled);
888891
}
@@ -899,18 +902,20 @@ private CompletableFuture<Void> restoreStateAndGates(
899902

900903
// We wait for all input channel state to recover before we go into RUNNING state, and thus
901904
// start checkpointing. If we implement incremental checkpointing of input channel state
902-
// we must make sure it supports CheckpointType#FULL_CHECKPOINT
905+
// we must make sure it supports CheckpointType#FULL_CHECKPOINT.
903906
List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
904907
for (InputGate inputGate : inputGates) {
905-
recoveredFutures.add(inputGate.getStateConsumedFuture());
906-
907-
inputGate
908-
.getStateConsumedFuture()
909-
.thenRun(
910-
() ->
911-
mainMailboxExecutor.execute(
912-
inputGate::requestPartitions,
913-
"Input gate request partitions"));
908+
CompletableFuture<?> requestPartitionsTrigger =
909+
checkpointingDuringRecoveryEnabled
910+
? inputGate.getBufferFilteringCompleteFuture()
911+
: inputGate.getStateConsumedFuture();
912+
913+
recoveredFutures.add(requestPartitionsTrigger);
914+
915+
requestPartitionsTrigger.thenRun(
916+
() ->
917+
mainMailboxExecutor.execute(
918+
inputGate::requestPartitions, "Input gate request partitions"));
914919
}
915920

916921
// Return allOf result instead of thenRun result.

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java

Lines changed: 147 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,44 @@
2828

2929
import org.junit.jupiter.api.Test;
3030

31+
import java.io.IOException;
3132
import java.util.ArrayDeque;
3233

3334
import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
3435
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
36+
import static org.assertj.core.api.Assertions.assertThat;
3537
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3638

3739
/** Tests for {@link RecoveredInputChannel}. */
3840
class RecoveredInputChannelTest {
3941

4042
@Test
41-
void testConversionOnlyPossibleAfterConsumed() {
42-
assertThatThrownBy(() -> buildChannel().toInputChannel())
43-
.isInstanceOf(IllegalStateException.class);
43+
void testConversionOnlyPossibleAfterConsumedWhenConfigDisabled() {
44+
// When config is disabled, toInputChannel() checks stateConsumedFuture
45+
assertThatThrownBy(() -> buildChannel(false).toInputChannel())
46+
.isInstanceOf(IllegalStateException.class)
47+
.hasMessageContaining("recovered state is not fully consumed");
48+
}
49+
50+
@Test
51+
void testConversionOnlyPossibleAfterFilteringWhenConfigEnabled() {
52+
// When config is enabled, toInputChannel() checks bufferFilteringCompleteFuture
53+
assertThatThrownBy(() -> buildChannel(true).toInputChannel())
54+
.isInstanceOf(IllegalStateException.class)
55+
.hasMessageContaining("buffer filtering is not complete");
4456
}
4557

4658
@Test
4759
void testRequestPartitionsImpossible() {
48-
assertThatThrownBy(() -> buildChannel().requestSubpartitions())
60+
assertThatThrownBy(() -> buildChannel(false).requestSubpartitions())
4961
.isInstanceOf(UnsupportedOperationException.class);
5062
}
5163

5264
@Test
5365
void testCheckpointStartImpossible() {
5466
assertThatThrownBy(
5567
() ->
56-
buildChannel()
68+
buildChannel(false)
5769
.checkpointStarted(
5870
new CheckpointBarrier(
5971
0L,
@@ -64,10 +76,101 @@ void testCheckpointStartImpossible() {
6476
.isInstanceOf(CheckpointException.class);
6577
}
6678

67-
private RecoveredInputChannel buildChannel() {
79+
@Test
80+
void testToInputChannelAllowedWhenBufferFilteringCompleteAndConfigEnabled() throws IOException {
81+
// When config is enabled, conversion is allowed when bufferFilteringCompleteFuture is done
82+
TestableRecoveredInputChannel channel = buildTestableChannel(true);
83+
84+
// Initially, conversion should fail
85+
assertThatThrownBy(() -> channel.toInputChannel())
86+
.isInstanceOf(IllegalStateException.class)
87+
.hasMessageContaining("buffer filtering is not complete");
88+
89+
// After finishReadRecoveredState(), bufferFilteringCompleteFuture should be done
90+
channel.finishReadRecoveredState();
91+
assertThat(channel.getBufferFilteringCompleteFuture()).isDone();
92+
assertThat(channel.getStateConsumedFuture()).isNotDone();
93+
94+
// Conversion should now succeed (no exception)
95+
InputChannel converted = channel.toInputChannel();
96+
assertThat(converted).isNotNull();
97+
}
98+
99+
@Test
100+
void testToInputChannelAllowedWhenStateConsumedAndConfigDisabled() throws IOException {
101+
// When config is disabled, conversion is allowed when stateConsumedFuture is done
102+
TestableRecoveredInputChannel channel = buildTestableChannel(false);
103+
104+
// Initially, conversion should fail
105+
assertThatThrownBy(() -> channel.toInputChannel())
106+
.isInstanceOf(IllegalStateException.class)
107+
.hasMessageContaining("recovered state is not fully consumed");
108+
109+
// After finishReadRecoveredState(), bufferFilteringCompleteFuture should NOT be done
110+
// because config is disabled
111+
channel.finishReadRecoveredState();
112+
assertThat(channel.getBufferFilteringCompleteFuture()).isNotDone();
113+
assertThat(channel.getStateConsumedFuture()).isNotDone();
114+
115+
// Conversion should still fail because stateConsumedFuture is not done
116+
assertThatThrownBy(() -> channel.toInputChannel())
117+
.isInstanceOf(IllegalStateException.class)
118+
.hasMessageContaining("recovered state is not fully consumed");
119+
120+
// Consume the EndOfInputChannelStateEvent to complete stateConsumedFuture.
121+
// getNextBuffer() returns empty when it encounters the event internally.
122+
assertThat(channel.getNextBuffer()).isNotPresent();
123+
assertThat(channel.getStateConsumedFuture()).isDone();
124+
125+
// Now conversion should succeed
126+
InputChannel converted = channel.toInputChannel();
127+
assertThat(converted).isNotNull();
128+
}
129+
130+
@Test
131+
void testBufferFilteringCompleteFutureOnlyCompletesWhenConfigEnabled() throws IOException {
132+
// Config enabled: finishReadRecoveredState() completes bufferFilteringCompleteFuture
133+
RecoveredInputChannel channelEnabled = buildChannel(true);
134+
assertThat(channelEnabled.getBufferFilteringCompleteFuture()).isNotDone();
135+
channelEnabled.finishReadRecoveredState();
136+
assertThat(channelEnabled.getBufferFilteringCompleteFuture()).isDone();
137+
138+
// Config disabled: finishReadRecoveredState() does NOT complete
139+
// bufferFilteringCompleteFuture
140+
RecoveredInputChannel channelDisabled = buildChannel(false);
141+
assertThat(channelDisabled.getBufferFilteringCompleteFuture()).isNotDone();
142+
channelDisabled.finishReadRecoveredState();
143+
assertThat(channelDisabled.getBufferFilteringCompleteFuture()).isNotDone();
144+
}
145+
146+
@Test
147+
void testStateConsumedFutureCompletesAfterConsumingAllBuffers() throws IOException {
148+
// This test verifies that stateConsumedFuture completes after consuming
149+
// EndOfInputChannelStateEvent regardless of the config setting
150+
for (boolean configEnabled : new boolean[] {true, false}) {
151+
RecoveredInputChannel channel = buildChannel(configEnabled);
152+
153+
assertThat(channel.getStateConsumedFuture()).isNotDone();
154+
155+
channel.finishReadRecoveredState();
156+
assertThat(channel.getStateConsumedFuture()).isNotDone();
157+
158+
// Consuming the EndOfInputChannelStateEvent should complete the future.
159+
// getNextBuffer() returns empty when it encounters the event internally.
160+
assertThat(channel.getNextBuffer()).isNotPresent();
161+
assertThat(channel.getStateConsumedFuture()).isDone();
162+
}
163+
}
164+
165+
private RecoveredInputChannel buildChannel(boolean checkpointingDuringRecoveryEnabled) {
68166
try {
167+
SingleInputGate inputGate =
168+
new SingleInputGateBuilder()
169+
.setCheckpointingDuringRecoveryEnabled(
170+
checkpointingDuringRecoveryEnabled)
171+
.build();
69172
return new RecoveredInputChannel(
70-
new SingleInputGateBuilder().build(),
173+
inputGate,
71174
0,
72175
new ResultPartitionID(),
73176
new ResultSubpartitionIndexSet(0),
@@ -85,4 +188,41 @@ protected InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffer
85188
throw new AssertionError("channel creation failed", e);
86189
}
87190
}
191+
192+
private TestableRecoveredInputChannel buildTestableChannel(
193+
boolean checkpointingDuringRecoveryEnabled) {
194+
try {
195+
SingleInputGate inputGate =
196+
new SingleInputGateBuilder()
197+
.setCheckpointingDuringRecoveryEnabled(
198+
checkpointingDuringRecoveryEnabled)
199+
.build();
200+
return new TestableRecoveredInputChannel(inputGate);
201+
} catch (Exception e) {
202+
throw new AssertionError("channel creation failed", e);
203+
}
204+
}
205+
206+
/**
207+
* A RecoveredInputChannel that returns a TestInputChannel when converted, for testing purposes.
208+
*/
209+
private static class TestableRecoveredInputChannel extends RecoveredInputChannel {
210+
TestableRecoveredInputChannel(SingleInputGate inputGate) {
211+
super(
212+
inputGate,
213+
0,
214+
new ResultPartitionID(),
215+
new ResultSubpartitionIndexSet(0),
216+
0,
217+
0,
218+
new SimpleCounter(),
219+
new SimpleCounter(),
220+
10);
221+
}
222+
223+
@Override
224+
protected InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers) {
225+
return new TestInputChannel(inputGate, 0);
226+
}
227+
}
88228
}

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class SingleInputGateBuilder {
8383

8484
private TieredStorageConsumerClient tieredStorageConsumerClient = null;
8585

86+
private boolean isCheckpointingDuringRecoveryEnabled = false;
87+
8688
public SingleInputGateBuilder setPartitionProducerStateProvider(
8789
PartitionProducerStateProvider partitionProducerStateProvider) {
8890

@@ -167,6 +169,11 @@ public SingleInputGateBuilder setTieredStorageConsumerClient(
167169
return this;
168170
}
169171

172+
public SingleInputGateBuilder setCheckpointingDuringRecoveryEnabled(boolean enabled) {
173+
this.isCheckpointingDuringRecoveryEnabled = enabled;
174+
return this;
175+
}
176+
170177
public SingleInputGate build() {
171178
SingleInputGate gate =
172179
new SingleInputGate(
@@ -195,6 +202,7 @@ public SingleInputGate build() {
195202
.toArray(InputChannel[]::new));
196203
}
197204
gate.setTieredStorageService(null, tieredStorageConsumerClient, null);
205+
gate.setCheckpointingDuringRecoveryEnabled(isCheckpointingDuringRecoveryEnabled);
198206
return gate;
199207
}
200208

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,36 @@ void testCheckpointsDeclinedUnlessStateConsumed() {
142142
.isInstanceOf(CheckpointException.class);
143143
}
144144

145+
@Test
146+
void testBufferFilteringCompleteFutureAggregation() throws Exception {
147+
final NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
148+
final SingleInputGate inputGate = createInputGate(environment);
149+
try (Closer closer = Closer.create()) {
150+
closer.register(environment::close);
151+
closer.register(inputGate::close);
152+
153+
// Enable unaligned during recovery for this test so that
154+
// bufferFilteringCompleteFuture is completed by finishReadRecoveredState()
155+
inputGate.setCheckpointingDuringRecoveryEnabled(true);
156+
inputGate.setup();
157+
158+
// Initially, the aggregated future should not be completed
159+
assertThat(inputGate.getBufferFilteringCompleteFuture()).isNotDone();
160+
161+
// After finishing read recovered state, bufferFilteringCompleteFuture should be
162+
// completed (only when config is enabled)
163+
inputGate.finishReadRecoveredState();
164+
assertThat(inputGate.getBufferFilteringCompleteFuture()).isDone();
165+
166+
// stateConsumedFuture should not be completed until data is consumed
167+
assertThat(inputGate.getStateConsumedFuture()).isNotDone();
168+
169+
// Consuming the EndOfInputChannelStateEvent should complete stateConsumedFuture
170+
inputGate.pollNext();
171+
assertThat(inputGate.getStateConsumedFuture()).isDone();
172+
}
173+
}
174+
145175
/**
146176
* Tests {@link InputGate#setup()} should create the respective {@link BufferPool} and assign
147177
* exclusive buffers for {@link RemoteInputChannel}s, but should not request partitions.

0 commit comments

Comments
 (0)