persist: replace file-per-entry with WAL and refactor into generic indexedWAL #3044
wen-coding wants to merge 2 commits into main
Conversation
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
Codecov Report
❌ Patch coverage is
Additional details and impacted files:
@@ Coverage Diff @@
## main #3044 +/- ##
==========================================
- Coverage 58.59% 58.56% -0.03%
==========================================
Files 2096 2092 -4
Lines 173372 173013 -359
==========================================
- Hits 101583 101323 -260
+ Misses 62746 62668 -78
+ Partials 9043 9022 -21
Flags with carried forward coverage won't be shown.
```diff
+for lane, first := range laneFirsts {
+	lw, ok := bp.lanes[lane]
+	if !ok {
+		continue // no WAL yet; PersistBlock will create one lazily
+	}
-	lane, fileN, err := parseBlockFilename(entry.Name())
-	if err != nil {
+	firstBN, ok := lw.firstBlockNum().Get()
+	if !ok || first <= firstBN {
+		continue
+	}
-	first, ok := laneFirsts[lane]
-	if ok && fileN >= first {
-		continue
+	walIdx := lw.firstIdx + uint64(first-firstBN)
+	if err := lw.TruncateBefore(walIdx); err != nil {
+		return fmt.Errorf("truncate lane %s WAL before block %d: %w", lane, first, err)
+	}
-	path := filepath.Join(bp.dir, entry.Name())
-	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
-		logger.Warn("failed to delete block file", "path", path, "err", err)
-	}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
dbwal.Config{
	WriteBufferSize: 0, // synchronous writes
	WriteBatchSize:  1, // no batching
	FsyncEnabled:    true,
```
Be aware of the latency impact here: if this write is in the critical path, it will introduce noticeable latency. I'd recommend synchronous write + no fsync for perf reasons; fsync does provide stronger guarantees, but the chance of all validators hitting a power-off at the same time is pretty rare.
That's reasonable, changed
@yzang2019 Talked with @pompon0 about this, since lane block QC is only f+1, theoretically one OS crash can screw us. We are changing all lanes and commitqc writes to happen concurrently, so latency is less of a concern. Is it okay if I change the Fsync back to true here?
```go
// Used when all entries are stale (e.g. the prune anchor advanced past
// everything persisted).
//
// TODO: sei-db/wal doesn't expose tidwall/wal's AllowEmpty option, so there's
```
Is the plan to expose that in a separate PR? It should be pretty simple to add though?
PR 3049 merged and switched to TruncateAll() here.
```go
if err := w.wal.Write(entry); err != nil {
	return err
}
if w.firstIdx == 0 {
```
Recommend using Count() == 0 instead of relying on firstIdx == 0 as a sentinel for "WAL is empty", in case the assumption that the WAL index starts from 0 no longer holds if we switch WAL libraries.
```go
	return nil, nil
}
entries := make([]T, 0, w.Count())
err := w.wal.Replay(w.firstIdx, w.nextIdx-1, func(_ uint64, entry T) error {
```
There's no validation that len(entries) == w.Count() after a successful replay. If Replay succeeds but returns fewer entries than expected (e.g., the underlying WAL silently truncated its tail on open due to corruption), ReadAll would return a short slice with no error.
```go
	return nil
}
for _, lw := range bp.lanes {
	if err := lw.Close(); err != nil {
```
If one lane's WAL fails to close, the remaining lanes are never closed. Use errors.Join to accumulate errors and close all lanes.
Force-pushed c0727e6 to 422aa8f (compare)
```diff
-if ok && fileN >= first {
+if first >= lw.nextBlockNum {
+	// Anchor advanced past all persisted blocks for this lane.
+	if err := lw.TruncateAll(); err != nil {
```
Is truncation synchronous? How expensive is it?
TruncateAll is synchronous: it removes the segment files and then changes some internal pointers, so it's not too expensive. I don't expect this to happen very often, though. In practice, if every validator keeps emitting blocks, there should always be 1 or 2 lane blocks which are generated but not yet in an AppQC. Unless they "generate a block, then wait for a while; generate another block, then wait for a while" — are you imagining that as an attack?
I'm interested in a happy path. It would be nice to make sure that removing blocks does not affect the latency of persisting blocks. What I want to avoid is that we do synchronously: remove block -> fsync -> insert block -> fsync (if we have a steady stream of 1 insertion/removal of block per batch) in which case the latency of insertion is 2x larger than it should be (2 fsyncs).
Force-pushed aeeddbf to 8de2a83 (compare)
Force-pushed b6be8e4 to 273ab80 (compare)
```go
	}
}
last := proposals[len(proposals)-1]
s.markBlockPersisted(lane, last.Msg().Block().Header().BlockNumber()+1)
```
The whole point of persisting blocks one by one (although we will eventually need to benchmark that) is so that the validator can sign a LaneVote ASAP. Hence markBlockPersisted should be moved inside the for loop. For consistency, the same could be done to the CommitQC persisting loop, although there might be no real gains there.
That's a good point, changed.
```go
for _, qc := range batch.commitQCs {
	if err := pers.commitQCs.PersistCommitQC(qc); err != nil {
		return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err)
```
```go
if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index(), utils.Some(anchor.CommitQC)); err != nil {
```
nit: I know we are kind of optimizing in the void without benchmarks, but there are still some disk writes here which are unnecessarily sequential: commitQCs.DeleteBefore can run in parallel with blocks.DeleteBefore, and commitQCs persist can run in parallel with blocks.DeleteBefore. In fact, each DeleteBefore/persist for each WAL is independent; only the anchor write needs to happen sequentially with everything else. Also, we might want to persist first, then do DeleteBefore. Also, DeleteBefore doesn't require fsync (I'm not sure how the WAL behaves there).
In blocks, we are mainly doing DeleteBefore before persisting for the "after the anchor, the commitQCs must be contiguous" check, as defense in depth. If you think that is unnecessary and guaranteed by the caller, I can remove it. But then one screw-up and our index calculation would be wrong. (Of course, I could also maintain a map of WAL index to commitQC RoadIndex and discard a late commitQC that tried to fill a hole.)
What do you think?
To answer your other question: DeleteBefore uses TruncateFront, which always fsyncs the critical new segment file via atomicWrite.
Reorganized the code so we do these in parallel now:
- truncate and add blocks per lane
- truncate and add commitqcs
```go
blocks      []*types.Signed[*types.LaneProposal]
commitQCs   []*types.CommitQC
pruneAnchor utils.Option[*PruneAnchor]
laneFirsts  map[types.LaneID]types.BlockNumber
```
nit: laneFirsts is implied by the anchor; it does not have to be computed in the collectPersistBatch function.
```go
type BlockPersister struct {
	dir  string // full path to the blocks/ subdirectory; empty when noop
	noop bool
	mu   sync.RWMutex
```
nit: please use utils.RWMutex to make it explicit what the mutex protects.
In particular, dir is immutable, so there is no need to wrap it with the mutex.
```go
// is derived: nextBlockNum - Count().
// mu serializes all writes and truncations on this lane.
type laneWAL struct {
	mu sync.Mutex
```
nit: please use utils.Mutex for explicit mutex protection. In particular, since the whole laneWAL needs to be mutex-protected, you can write:

```go
type BlockPersister struct {
	dir   Option[string]
	lanes utils.RWMutex[map[LaneID]*utils.Mutex[*laneWAL]]
}
```
Hmm, this looks a bit weird: since laneWAL never changes after initialization, we would be protecting the members of laneWAL, not the laneWAL value itself.
I changed to using utils.Mutex inside laneWAL.
```go
func (bp *BlockPersister) getOrCreateLane(lane types.LaneID) (*laneWAL, error) {
	dir, ok := bp.dir.Get()
	if !ok {
		return nil, nil // no-op persister (persistence disabled)
```
nit: can we return an error instead, and just make each call check noop mode directly? I would like us to avoid passing nil as a valid result.
```go
type CommitQCPersister struct {
	dir  string // full path to the commitqcs/ subdirectory; empty when noop
	noop bool
	mu   sync.Mutex
```
nit: ditto — here you will probably need an inner type to wrap it in utils.Mutex.
```go
// is derived directly from the anchor, making the safety invariant
// explicit: we only truncate entries the on-disk anchor covers.
if anchor, ok := batch.pruneAnchor.Get(); ok {
	if err := pers.pruneAnchor.Persist(PruneAnchorConv.Encode(anchor)); err != nil {
```
afaiu pruneAnchor will still use the 2-file persister. Is this supposed to stay like this? If so, then we perhaps need to productionize the 2-file persister more; adding a checksum to the file would be nice, for example.
I actually did that in the first draft, but felt a WAL is not the best fit here: we only ever need the single most recent snapshot; there is no history at all.
If you agree with that conclusion, I could of course productionize the 2-file persister more, but I feel that should be its own PR, because this PR is mostly about the WAL replacing local files.
Force-pushed 34fac21 to 734094e (compare)
Replace file-per-block and file-per-commitQC persistence with sei-db/wal, extract common WAL mechanics into a generic indexedWAL[T], and expose per-lane truncate-then-append APIs so avail/state controls all parallelism.

- wal.go: generic indexedWAL[T] with codec[T], monotonic index tracking, Write/ReadAll/TruncateBefore/TruncateAll/Close. AllowEmpty=true, fsync enabled, post-replay count check for silent data loss detection.
- blocks.go: one WAL per lane in blocks/<hex_lane_id>/ subdirectories. PersistBlock with lazy lane creation (double-checked locking) and strict contiguity. PersistLaneAfterAnchor for truncate-then-append per lane. truncateLaneWAL derives prune cursors from the CommitQC with defense-in-depth verification. Removed public DeleteBefore/LaneIDsWithWAL.
- commitqcs.go: single WAL in commitqcs/, linear RoadIndex-to-WAL-index mapping. PersistAfterAnchor for truncate-then-append with internal crash recovery (re-persists the anchor QC after TruncateAll). deleteBefore unexported. Removed public DeleteBefore/CommitQCPersistBatch.
- state.go: runPersist fans out via scope.Parallel — one task for commit-QCs, one per committee lane for blocks. Anchor persisted sequentially first. anchorQC typed as utils.Option[*types.CommitQC]. blocksByLane initialized with all static committee lanes. No goroutines inside the persist package.
- Both persisters are internally thread-safe (utils.Mutex/RWMutex). afterEach callbacks are infallible (func(T), not func(T) error). No-op persisters skip disk I/O but advance cursors via afterEach.

Made-with: Cursor
Force-pushed 734094e to 6024bba (compare)
The old comment warned about interleaving "internal truncation and PersistBlock" as separate caller-facing operations. Since callers now use MaybePruneAndPersistLane exclusively, restate the constraint as: don't call it concurrently for the same lane. Made-with: Cursor
Summary
Replace file-per-block and file-per-commitQC persistence with sei-db/wal, extract common WAL mechanics into a generic `indexedWAL[T]`, and expose per-lane truncate-then-append APIs so the caller (avail/state) controls all parallelism.

New:

**wal.go — generic `indexedWAL[T]`**
- Generic wrapper around sei-db/wal with monotonic index tracking, typed serialization via `codec[T]`, and lifecycle methods: `Write`, `ReadAll`, `TruncateBefore` (with verify callback and bounds validation), `TruncateAll`, `FirstIdx`, `Count`, `Close`.
- Enforces `INVARIANT: firstIdx <= nextIdx`. Opens with `AllowEmpty: true` (critical for correct empty-log arithmetic).
- `ReadAll` includes a post-replay count check to detect silent data loss.
- Fsync enabled for additional durability beyond the prune anchor.

**blocks.go — per-lane WAL persistence**
- `blocks/<hex_lane_id>/` subdirectories with independent per-lane truncation.
- `PersistBlock` creates lane WALs lazily via `getOrCreateLane` (double-checked locking under `utils.RWMutex`) and enforces strictly contiguous block numbers.
- `MaybePruneAndPersistLane` (new): truncate-then-append API for a single lane. Truncates the lane WAL below the anchor's block range, then appends proposals in order, calling `afterEach` per entry. Does not spawn goroutines; the caller schedules parallelism.
- `truncateLaneWAL` (private): derives the per-lane prune cursor from the CommitQC, verifying block numbers before truncating via a defense-in-depth callback. Handles `TruncateAll` when the anchor advanced past all persisted blocks, and re-anchors `nextBlockNum`.
- `loadAll` detects gaps at replay time.
- `close()` unexported; only used by tests and constructor error cleanup; uses `errors.Join`.
- Removed: `DeleteBefore`, `DeleteBeforeLane`, `LaneIDsWithWAL`.

**commitqcs.go — single CommitQC WAL**
- Single WAL in `commitqcs/`, linear RoadIndex-to-WAL-index mapping via `FirstIdx()`.
- `PersistCommitQC` silently ignores duplicates (`idx < next`) for idempotent startup; rejects gaps (`idx > next`).
- `MaybePruneAndPersist` (new): truncate-then-append API. Truncates the WAL below the anchor's road index (handling `TruncateAll` + cursor advance for anchor-past-all), re-persists the anchor QC for crash recovery, then appends new QCs with the `afterEach` callback.
- `deleteBefore` unexported; only called internally by `MaybePruneAndPersist`.
- `Close()` is idempotent.
- `loadAllCommitQCs` detects gaps at replay time.
- Removed: `DeleteBefore`, the `CommitQCPersistBatch` struct.

**state.go — orchestration and parallelism**
- `NewState` startup: prunes stale WAL entries via `MaybePruneAndPersistLane` (per committee lane) and `MaybePruneAndPersist` with anchor-only (nil proposals), replacing the old `DeleteBefore` calls.
- `runPersist` write order: `scope.Parallel` fans out one task for `MaybePruneAndPersist` (commit-QCs) and one task per committee lane for `MaybePruneAndPersistLane` (blocks). No early cancellation; the first error is returned after all tasks finish.
- Each path invokes `markCommitQCsPersisted`/`markBlockPersisted` per entry so voting unblocks ASAP.
- `anchorQC` typed as `utils.Option[*types.CommitQC]`; drives both commit-QC and block WAL truncation.
- `blocksByLane` initialized with all static committee lanes (ensuring truncation runs even for lanes with no new blocks).
- `inner.go`: `nextBlockToPersist` doc clarifies per-entry notification frequency.

Design decisions
- When `stateDir` is None, disk I/O is skipped but cursors advance via `afterEach` callbacks, preventing `runPersist` from spinning.
- `go.mod` requires Go 1.25+, so per-iteration loop variables are guaranteed; no explicit `v := v` in closures.

Concurrency design
Both `BlockPersister` and `CommitQCPersister` are internally thread-safe. Mutable state is protected with `utils.Mutex`/`utils.RWMutex`.

BlockPersister (two-level locking):
- `utils.RWMutex[map[LaneID]*laneWAL]` on the lanes map: `getOrCreateLane` uses double-checked locking; `close()` takes the write Lock.
- `utils.Mutex[*laneWALState]` per lane: serializes writes and truncations.
- `PersistBlock` releases the map RLock before acquiring the per-lane lock, so writes to different lanes are fully parallel.
- The caller (state.go) must not interleave truncation and `PersistBlock` on the same lane from different goroutines; `MaybePruneAndPersistLane` provides the safe truncate-then-append API, and `scope.Parallel` in `runPersist` schedules one task per lane.

CommitQCPersister:
- `utils.Mutex[*commitQCState]` protecting the WAL handle and cursor.
- `deleteBefore` calls `persistCommitQC` to re-persist the anchor QC without releasing the lock.

Test plan
- Tests run with `-race` on the persist and avail packages
- `TestState` (no-op persist) and `TestStateWithPersistence` (disk persist with prune cycles)
- `TestStateRestartFromPersisted` (full restart from WAL data)