56 changes: 56 additions & 0 deletions HANDOFF-FLINK-38544-dataloss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Handoff: UnalignedCheckpointRescaleITCase data loss — PR #28517 (FLINK-38544 / FLIP-547)

For whoever owns the PR. **The PR introduces a rare, racy in-flight data loss on unaligned-checkpoint
rescale, reproducible with the feature flag OFF. It needs a root cause and a regression test before merge.**

## TL;DR
- Failing test: `UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint[downscale keyed_broadcast from 7 to 2]`.
- Failure: `[NUM_OUTPUTS = NUM_INPUTS] expected: X but was: Y` with `Y < X` (records lost). The flag
`execution.checkpointing.during-recovery.enabled` is **off**, so this is the legacy/always-on path.
- Confirmed PR-caused: pre-PR base **0 losses / 3141 runs** vs PR tip **≥4 losses / ~2000 runs**; p < 0.2%.
- **Not** a localized logic bug — every flag-off logic path matches base. Evidence points to a
**timing/orchestration race** added by the recovery rewiring.
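
The `p < 0.2%` figure can be sanity-checked with a short calculation. The sketch below is one plausible reading, not necessarily how the number was derived: it assumes independent runs and takes the PR tip's loss rate as 4 in 2000 (the HEAD row of the Evidence table), then asks how likely the base would be to show zero losses in 3141 runs at that rate. The class and method names are illustrative only.

```java
public final class ZeroLossOdds {

    /**
     * Probability of observing zero losses in {@code runs} independent runs
     * when each run loses data with probability {@code rate}.
     */
    static double probabilityOfZeroLosses(double rate, int runs) {
        // (1 - rate)^runs, computed through log1p for stability at small rates.
        return Math.exp(runs * Math.log1p(-rate));
    }

    public static void main(String[] args) {
        // Assumption: HEAD's per-run loss rate is about 4 / 2000.
        double headRate = 4.0 / 2000.0;
        double p = probabilityOfZeroLosses(headRate, 3141);
        System.out.printf("P(0 losses in 3141 base runs at HEAD's rate) = %.3f%%%n", p * 100);
    }
}
```

Under these assumptions the result lands just below 0.2%, which is consistent with the stated bound. Since the loss count is a lower bound (≥4), the true probability would be smaller still.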

## Reproduce (NOT visible single-threaded)
The test is green single-threaded (121/121); the race only shows under CPU contention. A ready harness sits beside this file:
`repro/eval_commit.sh` + `repro/narrow.py` (narrows to the single param; N parallel workers; per-worker
surefire `tempDir`; 180s per-run timeout; classifies LOSS `Y<X` vs DUP `Y>X`). Rate ≈ 0.1–0.3%/run, so
budget ~1000+ runs per data point. Ignore `LocalTransportException` crashes and `target/surefire`
temp-file errors — contention artifacts, not the bug.
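
The LOSS/DUP rule the harness applies can be written down in a few lines. The sketch below is not the code in `repro/narrow.py` (which is not shown here); it is a Java illustration of the stated rule only, and it assumes the failure text contains `expected: X but was: Y` with plain or `L`-suffixed integers. `OutcomeClassifier` and its `Outcome` enum are hypothetical names.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class OutcomeClassifier {

    enum Outcome { LOSS, DUP, OTHER }

    // Assumption: counts appear as "expected: X but was: Y", optionally with an 'L' suffix
    // and possibly split across lines (\s also matches newlines).
    private static final Pattern COUNTS =
            Pattern.compile("expected:\\s*(\\d+)L?\\s+but was:\\s*(\\d+)L?");

    static Outcome classify(String failureText) {
        // Only the NUM_OUTPUTS = NUM_INPUTS assertion is relevant to this bug.
        if (!failureText.contains("NUM_OUTPUTS = NUM_INPUTS")) {
            return Outcome.OTHER;
        }
        Matcher m = COUNTS.matcher(failureText);
        if (!m.find()) {
            return Outcome.OTHER;
        }
        long expected = Long.parseLong(m.group(1));
        long actual = Long.parseLong(m.group(2));
        if (actual < expected) {
            return Outcome.LOSS; // Y < X: records lost
        }
        if (actual > expected) {
            return Outcome.DUP; // Y > X: records duplicated
        }
        return Outcome.OTHER;
    }
}
```

Anything that does not match, such as the `LocalTransportException` crashes, falls into `OTHER` and should not be counted toward either rate.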

## Evidence
| Tree | Runs | LOSS | DUP | Hangs / notes |
|------|-----:|-----:|----:|------|
| BASE `d1914c63c95` | 3141 | 0 | 0 | clean |
| c1 `46d4b743b90` (queue-split) | 1494 | 1 | 1 | 3 hangs — **separate path-A bug** |
| phase4 `abe52fed9fa` | 256 | 0 | 0 | **~80% crash, deterministic** (broken intermediate) |
| HEAD `d9fc48e9946` | ~2000 | ≥4 | 0 | few hangs |

### Ruled out (don't re-chase these)
- `LocalInputChannel.checkpointStarted` `emptyList()` — restoring the scan changed nothing (0 effect, 0 dups).
- Delivery drop race (`onRecoveredStateBuffer` on `isReleased`; `releaseAllResources` discarding pending) —
instrumented, **0 drops** in 659 passes + the loss run.
- `NoSpillingHandler.recover` / `getBuffer` — **byte-identical to base**.
- `RemoteInputChannel.checkpointStarted` / `checkReadability` / `onBuffer` flag-off — **= base**.

### Why it's confounded for commit bisection
Non-monotonic: c1/phase1 use an always-on `LocalInputChannel.recoveredBuffers` migration (path A);
phase 2 replaces it with the `needsRecovery`-gated push path, so HEAD flag-off uses `RecoveredInputChannel`
(path B). c1's bug ≠ HEAD's bug. And phases 2–3 don't compile standalone; phase 4 is ~80%-crash broken —
so HEAD's path-B loss localizes only to the **phase 2–5** refactor.

## Top suspects for the HEAD bug (recovery orchestration timing)
1. **`SingleInputGate.convertRecoveredInputChannels`** — the 2-phase lock reorder ("convert +
`releaseAllResources` outside the lock, then swap under the lock"). A swap-ordering race here matches
all symptoms (rare, contention-sensitive, variable size, no per-path drop).
2. **`StreamTask` recovery wiring** — channel-IO executor, `recoverySetupCompleteFuture`, lazy
`getRecoveryCheckpointTrigger`.
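
To make the first suspect concrete, here is a toy model of how a "convert outside the lock, swap under the lock" sequence can lose data. It is not Flink's `SingleInputGate`: `ToyGate`, `ToyChannel`, and every method on them are hypothetical, and whether the real code has a comparable window is unconfirmed and is exactly what the audit should establish. The interleaving is replayed deterministically on one thread so the effect is reproducible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public final class SwapRaceSketch {

    /** Hypothetical stand-in for a channel holding recovered buffers. */
    static final class ToyChannel {
        final Queue<String> pending = new ArrayDeque<>();
    }

    /** Hypothetical gate that replaces its channel in two phases. */
    static final class ToyGate {
        private final Object lock = new Object();
        private ToyChannel current = new ToyChannel();

        void deliver(String buffer) {
            synchronized (lock) {
                current.pending.add(buffer);
            }
        }

        /** Phase 1: snapshot the old channel, then build the replacement without the lock. */
        ToyChannel convertOutsideLock() {
            List<String> snapshot;
            synchronized (lock) {
                snapshot = new ArrayList<>(current.pending);
            }
            ToyChannel replacement = new ToyChannel();
            replacement.pending.addAll(snapshot);
            return replacement;
        }

        /** Phase 2: install the replacement; the old channel and its queue are dropped. */
        void swapUnderLock(ToyChannel replacement) {
            synchronized (lock) {
                current = replacement;
            }
        }

        List<String> drain() {
            synchronized (lock) {
                List<String> out = new ArrayList<>(current.pending);
                current.pending.clear();
                return out;
            }
        }
    }

    /** Replays one interleaving and returns how many of three buffers went missing. */
    static int lostBuffers(boolean deliverBetweenPhases) {
        ToyGate gate = new ToyGate();
        gate.deliver("b1");
        gate.deliver("b2");
        ToyChannel replacement = gate.convertOutsideLock();
        if (deliverBetweenPhases) {
            gate.deliver("b3"); // lands on the old channel after the snapshot was taken
        }
        gate.swapUnderLock(replacement);
        if (!deliverBetweenPhases) {
            gate.deliver("b3"); // lands on the new channel
        }
        return 3 - gate.drain().size();
    }
}
```

In this model a buffer is lost only when it arrives in the gap between the two phases, which would fit a rare, contention-sensitive loss with no drop visible on any single path. A conservation check of this shape (buffers in equals buffers out across the swap) is also the kind of assertion the regression test could make.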

## Asks
1. Audit/instrument the recovery **orchestration** timing above (you'll find it faster than black-box testing did).
2. Add a **regression test** for recovered-buffer conservation under rescale + concurrent checkpoint
(the queue-split commit shipped with none; a deterministic unit test would also catch c1's path-A bug).
3. Make the **intermediate commits compile/function** (phases 2–3 don't build; phase 4 crashes ~80%) — and
investigate the recovery **hangs** (the `upstreamReady` drain-liveness / B1 concern).

Full investigation: `AI_CONCLUSION_FLINK-38544-dataloss.md` in the main checkout.
@@ -35,7 +35,7 @@ public final class OffsetAwareOutputStream implements Closeable {

private long position;

OffsetAwareOutputStream(OutputStream currentOut, long position) {
public OffsetAwareOutputStream(OutputStream currentOut, long position) {
this.currentOut = checkNotNull(currentOut);
this.position = position;
}
@@ -19,6 +19,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.FetchedChannelStateReader.SpillSegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -41,6 +42,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
@@ -161,6 +163,48 @@ void writeInput(
}
}

void writeInputFromSpill(
JobVertexID jobVertexID, int subtaskIndex, FetchedChannelStateReader reader) {
if (isDone()) {
try {
reader.close();
} catch (Exception ignored) {
}
return;
}
ChannelStatePendingResult pendingResult =
getChannelStatePendingResult(jobVertexID, subtaskIndex);
runWithChecks(
() -> {
checkState(!pendingResult.isAllInputsReceived());
try {
String action = "ChannelStateCheckpointWriter#writeInputFromSpill";
Optional<SpillSegment> next;
while ((next = reader.nextSegment()).isPresent()) {
SpillSegment seg = next.get();
long offset = checkpointStream.getPos();
try (AutoCloseable ignored =
NetworkActionsLogger.measureIO(action, seg.channelInfo())) {
serializer.writeData(dataStream, seg.bodyStream(), seg.length());
}
long size = checkpointStream.getPos() - offset;
pendingResult
.getInputChannelOffsets()
.computeIfAbsent(
seg.channelInfo(), unused -> new StateContentMetaInfo())
.withDataAdded(offset, size);
NetworkActionsLogger.tracePersist(
action,
seg.length() + " bytes",
seg.channelInfo(),
checkpointId);
}
} finally {
reader.close();
}
});
}

void writeOutput(
JobVertexID jobVertexID, int subtaskIndex, ResultSubpartitionInfo info, Buffer buffer) {
try {
@@ -101,24 +101,18 @@ public static ChannelStateFilteringHandler createFromContext(
}

/**
* Filters a recovered buffer from the specified virtual channel, returning new buffers
* containing only the records that belong to the current subtask.
*
* <p>One source buffer may produce 0 to N result buffers: 0 if all records are filtered out,
* and potentially more than 1 when a spanning record completes in this buffer. The deserializer
* caches partial record data from previous buffers, so the output may contain data that was not
* in the current source buffer, causing the total output size to exceed one buffer capacity.
* This can happen with any spanning record regardless of its size.
*
* @return filtered buffers, possibly empty if all records were filtered out.
* Filters {@code sourceBuffer} through the virtual channel identified by {@code gateIndex} /
* {@code oldChannelIndex}, appending each surviving record (length-prefixed) into {@code
* outputSerializer}. One call may emit 0..N records depending on the filter result and whether
* records spanning previous buffers complete here. The caller owns the segment boundary.
*/
public List<Buffer> filterAndRewrite(
public void filterAndRewrite(
int gateIndex,
int oldSubtaskIndex,
int oldChannelIndex,
Buffer sourceBuffer,
BufferSupplier bufferSupplier)
throws IOException, InterruptedException {
DataOutputSerializer outputSerializer)
throws IOException {

if (gateIndex < 0 || gateIndex >= gateHandlers.length) {
throw new IllegalStateException(
@@ -135,8 +129,8 @@ public List<Buffer> filterAndRewrite(
+ gateIndex
+ ". This gate is not a network input and should not have recovered buffers.");
}
return gateHandler.filterAndRewrite(
oldSubtaskIndex, oldChannelIndex, sourceBuffer, bufferSupplier);
gateHandler.filterAndRewrite(
oldSubtaskIndex, oldChannelIndex, sourceBuffer, outputSerializer);
}

/** Returns {@code true} if any virtual channel has a partial (spanning) record pending. */
@@ -215,7 +209,8 @@ private static <T> GateFilterHandler<T> createGateHandler(
: VirtualChannelRecordFilterFactory.createPassThroughFilter();

RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer =
createDeserializer(filterContext.getTmpDirectories());
new SpillingAdaptiveSpanningRecordDeserializer<>(
filterContext.getTmpDirectories());

VirtualChannel<T> vc = new VirtualChannel<>(deserializer, recordFilter);
gateVirtualChannels.put(key, vc);
@@ -246,26 +241,10 @@ private static int[] getOldChannelIndexes(RescaleMappings channelMapping, int nu
return oldIndexes.stream().mapToInt(Integer::intValue).toArray();
}

private static RecordDeserializer<DeserializationDelegate<StreamElement>> createDeserializer(
String[] tmpDirectories) {
if (tmpDirectories != null && tmpDirectories.length > 0) {
return new SpillingAdaptiveSpanningRecordDeserializer<>(tmpDirectories);
} else {
String[] defaultDirs = new String[] {System.getProperty("java.io.tmpdir")};
return new SpillingAdaptiveSpanningRecordDeserializer<>(defaultDirs);
}
}

// -------------------------------------------------------------------------------------------
// Inner classes
// -------------------------------------------------------------------------------------------

/** Provides buffers for re-serializing filtered records. Implementations may block. */
@FunctionalInterface
public interface BufferSupplier {
Buffer requestBufferBlocking() throws IOException, InterruptedException;
}

/**
* Handles record filtering for a single input gate. Each gate has its own serializer and set of
* virtual channels, allowing different gates to handle different record types independently.
@@ -275,32 +254,28 @@ static class GateFilterHandler<T> {
private final Map<SubtaskConnectionDescriptor, VirtualChannel<T>> virtualChannels;
private final StreamElementSerializer<T> serializer;
private final DeserializationDelegate<StreamElement> deserializationDelegate;
private final DataOutputSerializer outputSerializer;
private final byte[] lengthBuffer = new byte[4];

GateFilterHandler(
Map<SubtaskConnectionDescriptor, VirtualChannel<T>> virtualChannels,
StreamElementSerializer<T> serializer) {
this.virtualChannels = checkNotNull(virtualChannels);
this.serializer = checkNotNull(serializer);
this.deserializationDelegate = new NonReusingDeserializationDelegate<>(serializer);
this.outputSerializer = new DataOutputSerializer(128);
}

/**
* Deserializes records from {@code sourceBuffer}, applies the virtual channel's record
* filter, and immediately re-serializes each surviving record into output buffers.
* filter, and re-serializes each surviving record into {@code outputSerializer}. No
* intermediate network buffer is used; the caller owns the segment boundary.
*/
List<Buffer> filterAndRewrite(
void filterAndRewrite(
int oldSubtaskIndex,
int oldChannelIndex,
Buffer sourceBuffer,
BufferSupplier bufferSupplier)
throws IOException, InterruptedException {
DataOutputSerializer outputSerializer)
throws IOException {

boolean sourceBufferOwnershipTransferred = false;
List<Buffer> resultBuffers = new ArrayList<>();
Buffer currentBuffer = null;
try {
SubtaskConnectionDescriptor key =
new SubtaskConnectionDescriptor(oldSubtaskIndex, oldChannelIndex);
@@ -319,132 +294,33 @@ List<Buffer> filterAndRewrite(
while (true) {
DeserializationResult result = vc.getNextRecord(deserializationDelegate);
if (result.isFullRecord()) {
if (currentBuffer == null) {
currentBuffer = bufferSupplier.requestBufferBlocking();
}
currentBuffer =
serializeElement(
deserializationDelegate.getInstance(),
currentBuffer,
resultBuffers,
bufferSupplier);
serializeElement(deserializationDelegate.getInstance(), outputSerializer);
}
if (result.isBufferConsumed()) {
break;
}
}

if (currentBuffer != null) {
if (currentBuffer.readableBytes() > 0) {
resultBuffers.add(currentBuffer);
} else {
currentBuffer.recycleBuffer();
}
currentBuffer = null;
}

return resultBuffers;
} catch (Throwable t) {
if (!sourceBufferOwnershipTransferred) {
sourceBuffer.recycleBuffer();
}
// Avoid double-recycle: currentBuffer may already be the last element in
// resultBuffers if serializeElement added it before the exception.
if (currentBuffer != null
&& (resultBuffers.isEmpty()
|| resultBuffers.get(resultBuffers.size() - 1) != currentBuffer)) {
currentBuffer.recycleBuffer();
}
for (Buffer buf : resultBuffers) {
buf.recycleBuffer();
}
resultBuffers.clear();
throw t;
}
}

/**
* Serializes a single stream element into the current buffer using the length-prefixed
* format (4-byte big-endian length + record bytes) expected by Flink's record
* deserializers. Spills into new buffers from {@code bufferSupplier} when needed.
*
* @return the buffer to continue writing into (may differ from the input buffer).
* Appends one stream element as a length-prefixed record. Reserves the 4B prefix,
* serializes the element, then backfills the length, because {@code outputSerializer}
* already holds the segment header and earlier records, so the prefix cannot be written
* from a fixed offset.
*/
private Buffer serializeElement(
StreamElement element,
Buffer currentBuffer,
List<Buffer> resultBuffers,
BufferSupplier bufferSupplier)
throws IOException, InterruptedException {
outputSerializer.clear();
private void serializeElement(StreamElement element, DataOutputSerializer outputSerializer)
throws IOException {
int startPos = outputSerializer.length();
outputSerializer.writeInt(0); // length placeholder
serializer.serialize(element, outputSerializer);
int recordLength = outputSerializer.length();

writeLengthToBuffer(recordLength);
currentBuffer =
writeDataToBuffer(
lengthBuffer, 0, 4, currentBuffer, resultBuffers, bufferSupplier);

byte[] serializedData = outputSerializer.getSharedBuffer();
currentBuffer =
writeDataToBuffer(
serializedData,
0,
recordLength,
currentBuffer,
resultBuffers,
bufferSupplier);
return currentBuffer;
}

private void writeLengthToBuffer(int length) {
lengthBuffer[0] = (byte) (length >> 24);
lengthBuffer[1] = (byte) (length >> 16);
lengthBuffer[2] = (byte) (length >> 8);
lengthBuffer[3] = (byte) length;
}

/**
* Writes data to the current buffer, spilling into new buffers from {@code bufferSupplier}
* when the current one is full.
*
* @return the buffer to continue writing into (may differ from the input buffer).
*/
private Buffer writeDataToBuffer(
byte[] data,
int dataOffset,
int dataLength,
Buffer currentBuffer,
List<Buffer> resultBuffers,
BufferSupplier bufferSupplier)
throws IOException, InterruptedException {
int offset = dataOffset;
int remaining = dataLength;

while (remaining > 0) {
int writableBytes = currentBuffer.getMaxCapacity() - currentBuffer.getSize();

if (writableBytes == 0) {
// Buffer is full, transfer ownership to resultBuffers
resultBuffers.add(currentBuffer);
currentBuffer = bufferSupplier.requestBufferBlocking();
writableBytes = currentBuffer.getMaxCapacity();
}

int bytesToWrite = Math.min(remaining, writableBytes);
currentBuffer
.getMemorySegment()
.put(
currentBuffer.getMemorySegmentOffset() + currentBuffer.getSize(),
data,
offset,
bytesToWrite);
currentBuffer.setSize(currentBuffer.getSize() + bytesToWrite);

offset += bytesToWrite;
remaining -= bytesToWrite;
}
return currentBuffer;
int recordLength = outputSerializer.length() - startPos - Integer.BYTES;
outputSerializer.writeIntUnsafe(recordLength, startPos);
}

boolean hasPartialData() {
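
The reserve-then-backfill approach in the new `serializeElement` can be illustrated in isolation. The sketch below uses `java.nio.ByteBuffer` rather than Flink's `DataOutputSerializer`, so it shows the technique and not the PR's code; `LengthPrefixBackfill` and `appendRecord` are hypothetical names. The point is that when the output already holds a header and earlier records, the 4-byte length has to be written at the record's own start position once the body size is known.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

public final class LengthPrefixBackfill {

    /**
     * Appends one length-prefixed record to {@code out}. The body is produced by
     * {@code bodyWriter}, so its size is not known until after it has been written.
     */
    static void appendRecord(ByteBuffer out, Consumer<ByteBuffer> bodyWriter) {
        int startPos = out.position();
        out.putInt(0); // reserve 4 bytes for the length prefix
        bodyWriter.accept(out); // write the record body after the placeholder
        int recordLength = out.position() - startPos - Integer.BYTES;
        // Absolute put: backfills the prefix without moving the write position.
        out.putInt(startPos, recordLength);
    }
}
```

For example, after writing an 8-byte header and then appending a 3-byte and a 5-byte body, the prefixes sit at offsets 8 and 15 and hold 3 and 5. `ByteBuffer` defaults to big-endian, which matches the 4-byte big-endian prefix described in the Javadoc that this change removes.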