Skip to content

[DEBUG - DO NOT MERGE][FLINK-38544] hunt 1#28544

Draft
rkhachatryan wants to merge 7 commits into
apache:masterfrom
rkhachatryan:f38544-hunt-1
Draft

[DEBUG - DO NOT MERGE][FLINK-38544] hunt 1#28544
rkhachatryan wants to merge 7 commits into
apache:masterfrom
rkhachatryan:f38544-hunt-1

Conversation

@rkhachatryan

@rkhachatryan rkhachatryan commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

#28517 (cc @1996fanrui)

Handoff: UnalignedCheckpointRescaleITCase data loss — PR #28517 (FLINK-38544 / FLIP-547)

For whoever owns the PR. The PR introduces a rare, race-y in-flight data loss on unaligned-checkpoint
rescale, reproducible with the feature flag OFF. Needs root-cause + a regression test before merge.

TL;DR

  • Failing test: UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint[downscale keyed_broadcast from 7 to 2].
  • [NUM_OUTPUTS = NUM_INPUTS] expected: X but was: Y, Y < X (records lost). Flag
    execution.checkpointing.during-recovery.enabled is off → the legacy/always-on path.
  • Confirmed PR-caused: pre-PR base 0 losses / 3141 runs vs PR tip ≥4 losses; p < 0.2%.
  • Not a localized logic bug — every flag-off logic path matches base. Evidence points to a
    timing/orchestration race added by the recovery rewiring.

Reproduce (NOT visible single-threaded)

Green single-threaded (121/121); needs CPU contention to widen the race. Ready harness beside this file:
repro/eval_commit.sh + repro/narrow.py (narrows to the single param; N parallel workers; per-worker
surefire tempDir; 180s per-run timeout; classifies LOSS Y<X vs DUP Y>X). Rate ≈ 0.1–0.3%/run, so
budget ~1000+ runs per data point. Ignore LocalTransportException crashes and target/surefire
temp-file errors — contention artifacts, not the bug.

Evidence

Tree Runs LOSS DUP Hangs / notes
BASE d1914c63c95 3141 0 0 clean
c1 46d4b743b90 (queue-split) 1494 1 1 3 hangs — separate path-A bug
phase4 abe52fed9fa 256 0 0 ~80% crash, deterministic (broken intermediate)
HEAD d9fc48e9946 ~2000 ≥4 0 few hangs

Ruled out (don't re-chase these)

  • LocalInputChannel.checkpointStarted emptyList() — restoring the scan changed nothing (0 effect, 0 dups).
  • Delivery drop race (onRecoveredStateBuffer on isReleased; releaseAllResources discarding pending) —
    instrumented, 0 drops in 659 passes + the loss run.
  • NoSpillingHandler.recover / getBufferbyte-identical to base.
  • RemoteInputChannel.checkpointStarted / checkReadability / onBuffer flag-off — = base.

Why it's confounded for commit bisection

Non-monotonic: c1/phase1 use an always-on LocalInputChannel.recoveredBuffers migration (path A);
phase 2 replaces it with the needsRecovery-gated push path, so HEAD flag-off uses RecoveredInputChannel
(path B). c1's bug ≠ HEAD's bug. And phases 2–3 don't compile standalone; phase 4 is ~80%-crash broken —
so HEAD's path-B loss localizes only to the phase 2–5 refactor.

Top suspects for the HEAD bug (recovery orchestration timing)

  1. SingleInputGate.convertRecoveredInputChannels — the 2-phase lock reorder ("convert +
    releaseAllResources outside the lock, then swap under the lock"). A swap-ordering race here matches
    all symptoms (rare, contention-sensitive, variable size, no per-path drop).
  2. StreamTask recovery wiring — channel-IO executor, recoverySetupCompleteFuture, lazy
    getRecoveryCheckpointTrigger.

Asks

  1. Audit/instrument the recovery orchestration timing above (you'll find it faster than black-box did).
  2. Add a regression test for recovered-buffer conservation under rescale + concurrent checkpoint
    (the queue-split commit shipped with none; a deterministic unit test would also catch c1's path-A bug).
  3. Make the intermediate commits compile/function (phases 2–3 don't build; phase 4 crashes ~80%) — and
    investigate the recovery hangs (the upstreamReady drain-liveness / B1 concern).

Full investigation: AI_CONCLUSION_FLINK-38544-dataloss.md in the main checkout.

1996fanrui and others added 7 commits June 23, 2026 15:33
…m toBeConsumedBuffers

Split the single toBeConsumedBuffers queue into two queues with disjoint
responsibilities:

  - recoveredBuffers (new): holds buffers migrated from RecoveredInputChannel
    during construction; consumed by getNextRecoveredBuffer() which retains
    the priority-event interleaving and last-buffer dynamic next-data-type
    detection introduced by FLINK-39018.
  - toBeConsumedBuffers (existing): reverted to its pre-FLINK-39018 role of
    holding FullyFilledBuffer partial-buffer splits only. The recovery-aware
    early branch in getNextBuffer() and the checkpointStarted inflight scan
    no longer touch this queue.

Restores the checkState(toBeConsumedBuffers.isEmpty()) guard in
requestSubpartitions() (removed by cebc174). hasPendingPriorityEvent,
notifyPriorityEvent, and the constructor signature are unchanged.

Pure refactor: no public API change, no new tests; verified by the 9 existing
LocalInputChannelTest regression cases.

(cherry picked from commit 292cc4b)
(cherry picked from commit 7fbfc78)
…lling v2

- Adds BufferRequester, RecoverableInputChannel, RecoveryCheckpointTrigger
  interfaces with their final signatures (including getChannelInfo on
  RecoverableInputChannel and NO_OP singleton on RecoveryCheckpointTrigger).
- Adds RecoveryCheckpointBarrier sentinel + DiskSnapshot data class with
  final 3-arg constructor signature and Chunk / StartPos / empty() helpers.
- ChannelStateWriter gains addInputDataFromSpill and peekWriteResult default
  methods so all callers can compile against the interface without the
  dispatcher implementation landing in this phase.
- RecoveredInputChannel#releaseAllResources visibility: package-private -> public

References to SpillFile in DiskSnapshot's constructor are forward references;
SpillFile itself lands in Phase 3. Each phase commit only needs to compile
as a whole tree at the final commit, not in isolation.

Design: requirements/38544/phase1_interfaces/design.md
(cherry picked from commit 98c7b42)
- Local/Remote InputChannel implement RecoverableInputChannel from Phase 1
- recoveredBuffers reshaped to Deque<Buffer>; allRecoveredBuffersDelivered flag
- getNextBuffer() unified under a single inRecovery predicate
- checkpointStarted split into mutually-exclusive in-recovery / not-in-recovery
- stateConsumedFuture triggered by (allRecoveredBuffersDelivered && queue empty)
- RecoveredInputChannel.toInputChannel migrates via the new push interface;
  the initialRecoveredBuffers constructor parameter is gone.
- LocalInputChannel.getNextRecoveredBuffer helper deleted

Design: requirements/38544/phase2_input_channel/design.md
(cherry picked from commit 8290409)
- New SpillFile: append-only segmented disk store with 64 MiB segments,
  reference counter + cleanedUp guard, and Snapshot view over segments and
  entries. All public signatures (append, snapshot, readBytesAt, acquire,
  release, isClosed) land in this commit; later phases only fill in bodies.
- New FilteredBufferWriter: prefilter + postfilter buffer accumulator,
  flushing the post-filter buffer to disk on rotation.
- New SpillFileWriter: thin facade exposing SpillFile lifecycle to filter
  callers.
- RecoveredChannelStateHandler.recover filter branch routes output to a
  SpillFile instead of channel.onRecoveredStateBuffer; the accumulator's
  prefilter and postfilter buffers are sourced from the source channel's
  exclusive pool (no heap fallback).
- InputChannelRecoveredStateHandler exposes getProducedSpillFile so Phase 4
  drain wiring can pick up the frozen file after filter completes;
  spill-tmp-directories argument is required (no backward-compat shim).

Design: requirements/38544/phase3_spill_writer/design.md
(cherry picked from commit 2cbbbd6)
… removal

- New SpillFileReader implements RecoveryCheckpointTrigger + Closeable.
  drain(): buffer alloc + disk read outside lock; deliver + offset advance
  inside lock. snapshotAndInsertBarriers(cpId): atomic startPos snapshot +
  per-channel barrier insert. Constructor derives the InputChannelInfo map
  internally; bodies pair acquire/release against SpillFile's ref counter.
- New RecoveredChannelBufferRequester delegates to RecoveredInputChannel pool.
- RecoveredInputChannel.requestBufferBlocking heap fallback removed
  (no more MemorySegmentFactory.allocateUnpooledSegment; OOM path eliminated).
- channelIOExecutor wired: filter-on submits drain after conversion completes;
  exceptions bubble via StreamTask.asyncExceptionHandler.

Design: requirements/38544/phase4_spill_reader/design.md
(cherry picked from commit 1315d38)
- ChannelState dispatcher onCheckpointStartedForAllInputs implements
  Step 1 (snapshotAndInsertBarriers) -> Step 2 (per-input checkpointStarted)
  -> Step 3 (addInputDataFromSpill) -> cpId-completion release callback.
- Hook AlternatingWaitingForFirstBarrierUnaligned.barrierReceived and
  AlternatingCollectingBarriers.alignedCheckpointTimeout into the dispatcher.
- ChannelStateWriterImpl.addInputDataFromSpill: async demux by Chunk.channelInfo,
  empty snapshot inline early return, failures propagate via ChannelStateWriteResult.
- Stream task pipelines (One/Two/Multiple) wire ChannelState through the
  InputProcessorUtil + SingleCheckpointBarrierHandler so the dispatcher hook
  reaches the right barrier-handler instance.
- ITCases (relocated under flink-runtime to share the package with SpillFile):
  rescale + filter + large record OOM regression, UC during recovery.

FLINK-38544 spilling v2 feature complete.

Design: requirements/38544/phase5_coordination/design.md
(cherry picked from commit 7badbd2)
@flinkbot

flinkbot commented Jun 25, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui

1996fanrui commented Jun 25, 2026

Copy link
Copy Markdown
Member

Independent repro on a separate machine, same harness (repro/repro.sh), narrowed to downscale keyed_broadcast 7→2, sourceSleepMs=0.

In the logs PseudoRandomValueSelector selected:

  • execution.checkpointing.unaligned.enabled = true
  • execution.checkpointing.unaligned.recover-output-on-downstream.enabled = true

(both true in the c1 failure log; I also checked the third test group and both flags are true there too.)

Tree Runs LOSS DUP INFRA Loss detail
base master 92e6fe8c9b19 run 1 2000 0 0 0
base master 92e6fe8c9b19 run 2 2000 0 0 1
base master 92e6fe8c9b19 run 3 3000 0 0 0
base master 92e6fe8c9b19 run 4 2000 0 0 0
c1 46d4b743b90 (queue-split) run 1 1496 1 0 0 expected 196578 / was 194884 (−1694)
c1 46d4b743b90 (queue-split) run 2 1310 0 1 0 expected: 46259L/ but was: 46398L (+139)
HEAD d9fc48e9946 — run 1 1501 0 0 clean
HEAD d9fc48e9946 — run 2 1904 1 0 3 expected 112286 / was 98186 (−14100)

@1996fanrui

1996fanrui commented Jun 26, 2026

Copy link
Copy Markdown
Member

Independent repro for downscale keyed_broadcast 7→2, sourceSleepMs=0.

Explicitly configurations:

  • execution.checkpointing.file-merging.enabled = false
  • execution.checkpointing.unaligned.enabled = true
  • execution.checkpointing.unaligned.recover-output-on-downstream.enabled = true
  • execution.checkpointing.during-recovery.enabled = true
Tree Runs LOSS DUP Corrupt INFRA Note
base master 92e6fe8c9b19 run 1 3011 0 0 0 3 Added sleep(100ms) in LocalInputChannel.checkpointStarted
c1 46d4b74 (queue-split) run 1 3014 0 0 0 0 Added sleep(100ms) in LocalInputChannel.checkpointStarted
c1 46d4b74 (queue-split) run 2 3014 0 0 0 0 Added sleep(100ms) in LocalInputChannel.checkpointStarted
c1 46d4b74 (queue-split) run 3 3009 0 0 0 0 remove sleep
c1 46d4b74 (queue-split) run 4 3009 0 0 0 0 remove sleep
head d9fc48e 1285 1 0 1 0 expected: 452082L but was: 442498L and Corrupt stream, found tag: -22,

@rkhachatryan

Copy link
Copy Markdown
Contributor Author

@1996fanrui I re-worked CDR in #28554 and don't see the data loss there:

#28554 (comment)

Validation — downscale data-loss reproducer from #28544 (UnalignedCheckpointRescaleITCase, keyed_broadcast 7→2, ~0.1–0.3%/run):

Branch Reproducer result
this branch 0 losses / 1498 runs
d9fc48 (pre-rewrite baseline) reproduces — 439 records lost / ~590 runs

So the reproducer is valid and the loss does not occur on this branch. Note: the loss is already present at d9fc48, i.e. it predates the async-recovery rewrite, so attribution to a single commit isn't isolated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants