refactor: msg pool to make more structured part 2#7006
refactor: msg pool to make more structured part 2#7006akaladarshi wants to merge 6 commits intomainfrom
Conversation
|
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
WalkthroughIntroduces ChangesMessage Pool State Consolidation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Comment |
dd74a63 to
a86d8c4
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/message_pool/msgpool/local_store.rs`:
- Around line 26-28: The add() method currently unconditionally appends
resolved_from to self.local_addrs causing duplicates; change add() to avoid
duplicates by inserting only if the address is not already present (e.g., check
self.local_addrs.read().contains(&resolved_from) or convert local_addrs to a
HashSet and insert), so known_local_addrs() no longer grows per-message and
republish_pending_messages() won't re-resolve the same sender repeatedly; update
any code that assumes a Vec to handle the new container if you switch to
HashSet.
In `@src/message_pool/msgpool/mod.rs`:
- Around line 275-284: The republish trigger is using
RepublishState::mark_republished (which inserts) causing the logic to wake on
new CIDs instead of on CIDs already republished; change the check to a read-only
membership test by calling a new or existing RepublishState::was_republished
(implement it to return republished.contains(cid) without mutating state) and
use that in both loops (the branches around mpool_ctx.remove_from_selected_msgs
and the repub flag) so you only set repub = true when the CID was already in the
republished set.
In `@src/message_pool/msgpool/msg_pool.rs`:
- Around line 485-493: The load_local() implementation iterates
LocalStore::snapshot_msgs() (a HashSet) in non-deterministic order which causes
add() to fail with sequencing errors (SequenceTooLow, NonceGap,
DuplicateSequence) and may silently drop messages; fix by collecting
snapshot_msgs() into a vector, sort it deterministically by sender and
message().sequence before iterating, then call self.add(...) for each; update
the add() error handling in the closure used in load_local() so SequenceTooLow
still triggers local.remove_msg(&k) but other errors are either logged/warned
(including error kind) and left in local_msgs (or retried) rather than silently
ignored, referencing load_local, LocalStore::snapshot_msgs, add,
local.remove_msg, and the Error variants
SequenceTooLow/NonceGap/DuplicateSequence.
In `@src/message_pool/msgpool/republish.rs`:
- Around line 39-44: The trigger() method currently uses
self.trigger.send_async(()).await which can await and block head_change() when
the 4-slot wakeup buffer is full; replace the await send with a non-blocking
self.trigger.try_send(()) and treat a Full error as a no-op (return Ok(()))
because a full buffer already indicates a pending wake, while mapping other
errors into Error::Other with the error details. Keep the function signature and
callers (head_change(), republish_pending_messages()) unchanged; only change
send_async() -> try_send() and handle TrySendError::Full by dropping the signal
and returning Ok(()) while converting other TrySendError variants into the
existing Error::Other format.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 36a9a2fb-5de4-4a0e-85d3-0e9d2a8e6759
📒 Files selected for processing (5)
src/message_pool/msgpool/local_store.rssrc/message_pool/msgpool/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/republish.rssrc/message_pool/msgpool/selection.rs
Codecov Report❌ Patch coverage is Additional details and impacted files
... and 25 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
♻️ Duplicate comments (1)
src/message_pool/msgpool/republish.rs (1)
38-42:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
trigger()should treat a full channel as success, not an error.The current implementation maps all
try_senderrors (includingFull) to an error. However, a full buffer simply means the republish task is already scheduled to wake—the signal should be dropped silently rather than failinghead_change().Proposed fix
pub(in crate::message_pool) fn trigger(&self) -> Result<(), Error> { - self.trigger - .try_send(()) - .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}"))) + match self.trigger.try_send(()) { + Ok(()) | Err(flume::TrySendError::Full(())) => Ok(()), + Err(flume::TrySendError::Disconnected(())) => { + Err(Error::Other("Republish receiver dropped".to_owned())) + } + } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/message_pool/msgpool/republish.rs` around lines 38 - 42, The trigger() method currently maps all try_send errors to Error::Other; change it to treat a Full error as success (drop the signal silently) and only return an Err for Disconnected (or other non-Full) failures. Locate the pub(in crate::message_pool) fn trigger(&self) and adjust the try_send error handling so that match/if distinguishes tokio::sync::mpsc::error::TrySendError::Full => Ok(()) and returns Error::Other(...) for the disconnected case, ensuring callers like head_change() no longer fail when the channel buffer is full.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@src/message_pool/msgpool/republish.rs`:
- Around line 38-42: The trigger() method currently maps all try_send errors to
Error::Other; change it to treat a Full error as success (drop the signal
silently) and only return an Err for Disconnected (or other non-Full) failures.
Locate the pub(in crate::message_pool) fn trigger(&self) and adjust the try_send
error handling so that match/if distinguishes
tokio::sync::mpsc::error::TrySendError::Full => Ok(()) and returns
Error::Other(...) for the disconnected case, ensuring callers like head_change()
no longer fail when the channel buffer is full.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 6e5f7773-6a80-40ca-857e-c3ffe9e85a90
📒 Files selected for processing (3)
src/message_pool/msgpool/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/republish.rs
Summary of changes
Changes introduced in this pull request:
This PR is part 2 of restructuring of msg pool, it contains:
This change should be applied on top of the refactor: msg pool to make more structured #6965
Next part will have major changes:
MessagePoolitself rather than each individual field, this will allow us to:headchangetrigger which will become part of theMessagePool, instead of being a free function with unlimited paramsMessagePool, instead of being a free function with unlimited paramsReference issue to close (if applicable)
Part of #7010
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit