[DEBUG - DO NOT MERGE][FLINK-38544] hunt 1 #28544
…m toBeConsumedBuffers
Split the single toBeConsumedBuffers queue into two queues with disjoint
responsibilities:
- recoveredBuffers (new): holds buffers migrated from RecoveredInputChannel
during construction; consumed by getNextRecoveredBuffer() which retains
the priority-event interleaving and last-buffer dynamic next-data-type
detection introduced by FLINK-39018.
- toBeConsumedBuffers (existing): reverted to its pre-FLINK-39018 role of
holding FullyFilledBuffer partial-buffer splits only. The recovery-aware
early branch in getNextBuffer() and the checkpointStarted inflight scan
no longer touch this queue.
Restores the checkState(toBeConsumedBuffers.isEmpty()) guard in
requestSubpartitions() (removed by cebc174). hasPendingPriorityEvent,
notifyPriorityEvent, and the constructor signature are unchanged.
Pure refactor: no public API change, no new tests; verified by the 9 existing
LocalInputChannelTest regression cases.
(cherry picked from commit 292cc4b)
(cherry picked from commit 7fbfc78)
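The queue split described in this commit message can be pictured with a minimal sketch. The class below is a hypothetical stand-in, not the actual LocalInputChannel: buffers are modeled as strings, and serving recovered buffers before partial-buffer splits is an assumption made only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a channel with two disjoint buffer queues. */
final class TwoQueueChannelSketch {
    // Buffers migrated from the recovered channel at construction time.
    private final Deque<String> recoveredBuffers = new ArrayDeque<>();
    // Partial-buffer splits only; recovery code never touches this queue.
    private final Deque<String> toBeConsumedBuffers = new ArrayDeque<>();

    TwoQueueChannelSketch(List<String> migrated) {
        recoveredBuffers.addAll(migrated);
    }

    void addPartialSplit(String part) {
        toBeConsumedBuffers.add(part);
    }

    /** Mirrors the restored guard: no pending splits may exist at this point. */
    void requestSubpartitions() {
        if (!toBeConsumedBuffers.isEmpty()) {
            throw new IllegalStateException("toBeConsumedBuffers must be empty");
        }
    }

    /** Assumed ordering: recovered buffers first, then splits; null when idle. */
    String getNextBuffer() {
        String recovered = recoveredBuffers.poll();
        return recovered != null ? recovered : toBeConsumedBuffers.poll();
    }
}
```

Because recovered buffers live in their own queue, the emptiness guard in requestSubpartitions() can hold even while recovered data is still pending.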
…lling v2
- Adds BufferRequester, RecoverableInputChannel, RecoveryCheckpointTrigger interfaces with their final signatures (including getChannelInfo on RecoverableInputChannel and NO_OP singleton on RecoveryCheckpointTrigger).
- Adds RecoveryCheckpointBarrier sentinel + DiskSnapshot data class with final 3-arg constructor signature and Chunk / StartPos / empty() helpers.
- ChannelStateWriter gains addInputDataFromSpill and peekWriteResult default methods so all callers can compile against the interface without the dispatcher implementation landing in this phase.
- RecoveredInputChannel#releaseAllResources visibility: package-private -> public
References to SpillFile in DiskSnapshot's constructor are forward references; SpillFile itself lands in Phase 3. Each phase commit only needs to compile as a whole tree at the final commit, not in isolation.
Design: requirements/38544/phase1_interfaces/design.md
(cherry picked from commit 98c7b42)
- Local/Remote InputChannel implement RecoverableInputChannel from Phase 1
- recoveredBuffers reshaped to Deque<Buffer>; allRecoveredBuffersDelivered flag
- getNextBuffer() unified under a single inRecovery predicate
- checkpointStarted split into mutually-exclusive in-recovery / not-in-recovery
- stateConsumedFuture triggered by (allRecoveredBuffersDelivered && queue empty)
- RecoveredInputChannel.toInputChannel migrates via the new push interface; the initialRecoveredBuffers constructor parameter is gone.
- LocalInputChannel.getNextRecoveredBuffer helper deleted
Design: requirements/38544/phase2_input_channel/design.md
(cherry picked from commit 8290409)
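The completion rule "allRecoveredBuffersDelivered && queue empty" could look roughly like the sketch below. All names besides recoveredBuffers, allRecoveredBuffersDelivered, and stateConsumedFuture are hypothetical, buffers are modeled as strings, and coarse synchronized methods stand in for whatever locking the real channels use.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the recovery-completion condition. */
final class RecoveryCompletionSketch {
    private final Deque<String> recoveredBuffers = new ArrayDeque<>();
    private boolean allRecoveredBuffersDelivered;
    private final CompletableFuture<Void> stateConsumedFuture = new CompletableFuture<>();

    /** Producer side: a recovered buffer is pushed into the channel. */
    synchronized void onRecoveredBuffer(String buffer) {
        recoveredBuffers.add(buffer);
    }

    /** Producer side: no further recovered buffers will arrive. */
    synchronized void finishDelivery() {
        allRecoveredBuffersDelivered = true;
        maybeComplete();
    }

    /** Consumer side: take the next recovered buffer, or null if none is queued. */
    synchronized String pollRecovered() {
        String buffer = recoveredBuffers.poll();
        maybeComplete();
        return buffer;
    }

    /** Single predicate that a unified getNextBuffer() could branch on. */
    boolean inRecovery() {
        return !stateConsumedFuture.isDone();
    }

    // Completes only when BOTH conditions hold, whichever becomes true last.
    private void maybeComplete() {
        if (allRecoveredBuffersDelivered && recoveredBuffers.isEmpty()) {
            stateConsumedFuture.complete(null);
        }
    }
}
```

Checking the condition on both the producer and the consumer path covers both orderings: delivery finishing after the queue has drained, and the queue draining after delivery has finished.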
- New SpillFile: append-only segmented disk store with 64 MiB segments, reference counter + cleanedUp guard, and Snapshot view over segments and entries. All public signatures (append, snapshot, readBytesAt, acquire, release, isClosed) land in this commit; later phases only fill in bodies.
- New FilteredBufferWriter: prefilter + postfilter buffer accumulator, flushing the post-filter buffer to disk on rotation.
- New SpillFileWriter: thin facade exposing SpillFile lifecycle to filter callers.
- RecoveredChannelStateHandler.recover filter branch routes output to a SpillFile instead of channel.onRecoveredStateBuffer; the accumulator's prefilter and postfilter buffers are sourced from the source channel's exclusive pool (no heap fallback).
- InputChannelRecoveredStateHandler exposes getProducedSpillFile so Phase 4 drain wiring can pick up the frozen file after filter completes; spill-tmp-directories argument is required (no backward-compat shim).
Design: requirements/38544/phase3_spill_writer/design.md
(cherry picked from commit 2cbbbd6)
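A "reference counter + cleanedUp guard" lifecycle of the kind listed for SpillFile might be sketched as follows. This is an illustrative assumption, not the actual SpillFile: the class name is hypothetical, the owner is assumed to hold one initial reference, and cleanup is an injected Runnable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical ref-counted resource whose cleanup runs at most once. */
final class RefCountedResourceSketch {
    // Starts at 1: the creator holds the first reference (an assumption).
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);
    private final Runnable cleanup;

    RefCountedResourceSketch(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    /** Returns false instead of resurrecting a resource that already hit zero. */
    boolean acquire() {
        while (true) {
            int current = refCount.get();
            if (current == 0) {
                return false;
            }
            if (refCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Drops one reference; the last release triggers the guarded cleanup. */
    void release() {
        while (true) {
            int current = refCount.get();
            if (current == 0) {
                throw new IllegalStateException("release without matching acquire");
            }
            if (refCount.compareAndSet(current, current - 1)) {
                if (current == 1 && cleanedUp.compareAndSet(false, true)) {
                    cleanup.run();
                }
                return;
            }
        }
    }

    boolean isClosed() {
        return cleanedUp.get();
    }
}
```

A reader would pair every successful acquire() with exactly one release(), so the backing files are deleted only after the last snapshot user has finished.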
… removal
- New SpillFileReader implements RecoveryCheckpointTrigger + Closeable. drain(): buffer alloc + disk read outside lock; deliver + offset advance inside lock. snapshotAndInsertBarriers(cpId): atomic startPos snapshot + per-channel barrier insert. Constructor derives the InputChannelInfo map internally; bodies pair acquire/release against SpillFile's ref counter.
- New RecoveredChannelBufferRequester delegates to RecoveredInputChannel pool.
- RecoveredInputChannel.requestBufferBlocking heap fallback removed (no more MemorySegmentFactory.allocateUnpooledSegment; OOM path eliminated).
- channelIOExecutor wired: filter-on submits drain after conversion completes; exceptions bubble via StreamTask.asyncExceptionHandler.
Design: requirements/38544/phase4_spill_reader/design.md
(cherry picked from commit 1315d38)
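The locking split described for drain() (slow work outside the lock, delivery and offset advance inside it) can be illustrated with a small model. Everything here is hypothetical: a byte array stands in for the spill file, a list stands in for the target channel, and a single drain thread is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a drain loop with a consistent start-position snapshot. */
final class DrainSketch {
    private final byte[] spilled;   // stand-in for on-disk data
    private final int chunkSize;
    private final Object lock = new Object();
    private int offset;                                        // guarded by lock
    private final List<byte[]> delivered = new ArrayList<>();  // guarded by lock

    DrainSketch(byte[] spilled, int chunkSize) {
        this.spilled = spilled;
        this.chunkSize = chunkSize;
    }

    /** Drains one chunk; returns false once everything has been delivered. */
    boolean drainOne() {
        int start;
        synchronized (lock) {
            start = offset;
        }
        if (start >= spilled.length) {
            return false;
        }
        // Outside the lock: allocation and the (potentially slow) read.
        int length = Math.min(chunkSize, spilled.length - start);
        byte[] buffer = Arrays.copyOfRange(spilled, start, start + length);
        // Inside the lock: deliver and advance together, so a concurrent
        // snapshot never sees a delivered chunk without the matching offset.
        synchronized (lock) {
            delivered.add(buffer);
            offset = start + length;
        }
        return true;
    }

    /** Atomic view of where the not-yet-delivered data starts. */
    int snapshotStartPos() {
        synchronized (lock) {
            return offset;
        }
    }

    int deliveredCount() {
        synchronized (lock) {
            return delivered.size();
        }
    }
}
```

Keeping the read outside the lock means a checkpoint thread calling snapshotStartPos() waits only for the short deliver-and-advance step, never for disk I/O.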
- ChannelState dispatcher onCheckpointStartedForAllInputs implements Step 1 (snapshotAndInsertBarriers) -> Step 2 (per-input checkpointStarted) -> Step 3 (addInputDataFromSpill) -> cpId-completion release callback.
- Hook AlternatingWaitingForFirstBarrierUnaligned.barrierReceived and AlternatingCollectingBarriers.alignedCheckpointTimeout into the dispatcher.
- ChannelStateWriterImpl.addInputDataFromSpill: async demux by Chunk.channelInfo, empty snapshot inline early return, failures propagate via ChannelStateWriteResult.
- Stream task pipelines (One/Two/Multiple) wire ChannelState through the InputProcessorUtil + SingleCheckpointBarrierHandler so the dispatcher hook reaches the right barrier-handler instance.
- ITCases (relocated under flink-runtime to share the package with SpillFile): rescale + filter + large record OOM regression, UC during recovery.
FLINK-38544 spilling v2 feature complete.
Design: requirements/38544/phase5_coordination/design.md
(cherry picked from commit 7badbd2)
Independent repro on a separate machine, same harness ( In the logs
(both true in the c1 failure log; I also checked the third test group and both flags are true there too.)
Independent repro for Explicitly configurations:
@1996fanrui I re-worked CDR in #28554 and don't see the data loss there: Validation — downscale data-loss reproducer from #28544 (
So the reproducer is valid and the loss does not occur on this branch. Note: the loss is already present at |
#28517 (cc @1996fanrui)
Handoff: UnalignedCheckpointRescaleITCase data loss — PR #28517 (FLINK-38544 / FLIP-547)
For whoever owns the PR. The PR introduces a rare, race-y in-flight data loss on unaligned-checkpoint
rescale, reproducible with the feature flag OFF. Needs root-cause + a regression test before merge.
TL;DR
- UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint[downscale keyed_broadcast from 7 to 2].[NUM_OUTPUTS = NUM_INPUTS] expected: X but was: Y, Y < X (records lost).
- Flag execution.checkpointing.during-recovery.enabled is off → the legacy/always-on path.
- … timing/orchestration race added by the recovery rewiring.
Reproduce (NOT visible single-threaded)
Green single-threaded (121/121); needs CPU contention to widen the race. Ready harness beside this file:
repro/eval_commit.sh + repro/narrow.py (narrows to the single param; N parallel workers; per-worker surefire
tempDir; 180s per-run timeout; classifies LOSS Y<X vs DUP Y>X). Rate ≈ 0.1–0.3%/run, so budget ~1000+ runs per data point. Ignore
LocalTransportException crashes and target/surefire temp-file errors: they are contention artifacts, not the bug.
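The run budget follows from simple arithmetic, sketched below. This is not the repro harness itself: the class and method names are hypothetical, runs are assumed independent, and the classification only mirrors the LOSS (Y<X) and DUP (Y>X) labels.

```java
/** Hypothetical helper: outcome labels and the chance of catching a rare failure. */
final class ReproBudgetSketch {
    enum Outcome { OK, LOSS, DUP }

    /** Compares the observed record count Y against the expected count X. */
    static Outcome classify(long expected, long actual) {
        if (actual < expected) {
            return Outcome.LOSS;
        }
        if (actual > expected) {
            return Outcome.DUP;
        }
        return Outcome.OK;
    }

    /** P(at least one failure in n independent runs) = 1 - (1 - p)^n. */
    static double probAtLeastOneFailure(double failureRate, int runs) {
        return 1.0 - Math.pow(1.0 - failureRate, runs);
    }

    public static void main(String[] args) {
        System.out.println(classify(100, 97));
        System.out.println(probAtLeastOneFailure(0.001, 1000));
        System.out.println(probAtLeastOneFailure(0.003, 1000));
    }
}
```

Under the independence assumption, 1000 runs give roughly a 63% chance of seeing at least one loss at a 0.1% rate and roughly 95% at 0.3%, which is why a clean result from a few hundred runs says little.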
Evidence
d1914c63c9546d4b743b90(queue-split)abe52fed9fad9fc48e9946
Ruled out (don't re-chase these)
- LocalInputChannel.checkpointStarted emptyList(): restoring the scan changed nothing (0 effect, 0 dups).
- … (onRecoveredStateBuffer on isReleased; releaseAllResources discarding pending): instrumented, 0 drops in 659 passes + the loss run.
- NoSpillingHandler.recover / getBuffer: byte-identical to base.
- RemoteInputChannel.checkpointStarted / checkReadability / onBuffer flag-off: = base.
Why it's confounded for commit bisection
Non-monotonic: c1/phase1 use an always-on
LocalInputChannel.recoveredBuffers migration (path A); phase 2 replaces it with the
needsRecovery-gated push path, so HEAD flag-off uses RecoveredInputChannel (path B). c1's bug ≠ HEAD's bug. And phases 2–3 don't compile standalone; phase 4 is ~80%-crash broken,
so HEAD's path-B loss localizes only to the phase 2–5 refactor.
Top suspects for the HEAD bug (recovery orchestration timing)
- SingleInputGate.convertRecoveredInputChannels: the 2-phase lock reorder ("convert + releaseAllResources outside the lock, then swap under the lock"). A swap-ordering race here matches all symptoms (rare, contention-sensitive, variable size, no per-path drop).
- StreamTask recovery wiring: channel-IO executor, recoverySetupCompleteFuture, lazy getRecoveryCheckpointTrigger.
Asks
- … (the queue-split commit shipped with none; a deterministic unit test would also catch c1's path-A bug).
- Investigate the recovery hangs (the upstreamReady drain-liveness / B1 concern).
Full investigation: AI_CONCLUSION_FLINK-38544-dataloss.md in the main checkout.