chore: Move flow expiration from PQ to timers #1327
Conversation
Pull request overview
This PR replaces the flow-table expiration mechanism (thread-local PQ + ExpirationsNF pipeline stage) with per-flow tokio timers that mark flows Expired, relying on lazy cleanup in lookup() / drain_stale().
Changes:
- Remove `ExpirationsNF` and the thread-local priority-queue expiration machinery.
- Add per-flow tokio timer spawning on insert, plus lazy time-based expiration in `lookup()` and proactive stale draining on a size threshold.
- Update dataplane pipeline composition, tests, docs, and dependencies to match the new expiration model.
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| nat/src/portfw/test.rs | Removes ExpirationsNF from the test pipeline and uses FlowTable::default(). |
| flow-entry/src/flow_table/thread_local_pq.rs | Deletes the thread-local PQ implementation used for expirations. |
| flow-entry/src/flow_table/table.rs | Stores Arc<FlowInfo> directly; spawns per-flow tokio timers; adds lazy expiration in lookup() and drain_stale(); updates tests. |
| flow-entry/src/flow_table/nf_lookup.rs | Updates tests to use tokio timing and removes the expirations stage dependency. |
| flow-entry/src/flow_table/nf_expirations.rs | Deletes ExpirationsNF. |
| flow-entry/src/flow_table/mod.rs | Removes module exports for expirations/PQ; keeps FlowLookup/FlowTable exports. |
| flow-entry/src/flow_table/display.rs | Simplifies display logic now that table values are strong Arcs. |
| flow-entry/src/flow_table/README.md | Updates documentation to describe tokio timers + lazy cleanup design. |
| flow-entry/Cargo.toml | Drops priority-queue/thread_local; adds tokio (and dev features for async tests). |
| dataplane/src/packet_processor/mod.rs | Removes ExpirationsNF from the router pipeline. |
| Cargo.lock | Removes priority-queue from flow-entry’s dependency set and adds tokio. |
flow-entry/src/flow_table/table.rs
Outdated
```rust
if tokio::runtime::Handle::try_current().is_ok() {
    let fi = val.clone();
    tokio::task::spawn(async move {
        loop {
            let deadline = fi.expires_at();
```
confidence: 8
tags: [logic, other]

`insert_common` spawns a new tokio task on every call whenever a runtime is present. This means repeated `reinsert()` calls (or inserting the same `Arc<FlowInfo>` again under the same key) will create multiple concurrent timers for the same flow, which can lead to unbounded task growth and extra wakeups.

Consider deduplicating timer creation (e.g., skip spawning when the replaced value is the same `Arc` via `Arc::ptr_eq`, or track a "timer started" flag per `FlowInfo`).
```rust
tokio::task::spawn(async move {
    loop {
        let deadline = fi.expires_at();
        tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
        let new_deadline = fi.expires_at();
        if new_deadline > deadline {
            // Deadline was extended (e.g. by StatefulNat); sleep again.
            continue;
        }
        fi.update_status(FlowStatus::Expired);
        break;
    }
});
```
confidence: 7
tags: [logic, other]

The per-flow timer task holds an `Arc<FlowInfo>` until the deadline elapses, even if the entry is removed from the `DashMap` earlier (e.g., via `lookup()` removing Cancelled/Expired entries or `drain_stale`). With many flows and long timeouts this can keep a large number of `FlowInfo` allocations (and their timer tasks) alive longer than necessary.

If early cleanup matters, consider wiring a cancellation mechanism (e.g., store/abort a `JoinHandle`, use a cancellation token/notify in `FlowInfo`, or have a single shared reaper task rather than one task per flow).
```rust
// Entries are all gone (removed by per-flow tokio timers)
let num_entries = flow_table.len().unwrap();
```
confidence: 9
tags: [docs, style]

This test comment says entries are "removed by per-flow tokio timers", but the current implementation only marks flows as Expired in the timer and relies on lazy removal (e.g., `lookup()` / `drain_stale`) to actually delete `DashMap` entries. Consider rewording this to reflect the real behavior, or add an explicit cleanup step if the test intends to assert removal rather than just expiration.
```diff
 #[allow(clippy::len_without_is_empty)]
-/// Tell how many flows are in the table if it can be locked
-/// This is mostly for testing
+/// Returns the number of *active* (non-expired, non-cancelled) flows in the table.
+/// This is mostly for testing.
 pub fn len(&self) -> Option<usize> {
     let table = self.table.try_read().ok()?;
-    Some(table.len())
+    Some(
+        table
+            .iter()
+            .filter(|e| e.value().status() == FlowStatus::Active)
+            .count(),
+    )
```
confidence: 8
tags: [api_design]

`FlowTable::len()` now does an O(n) scan and returns only the count of Active entries (not the raw map size). Since the method name strongly implies a cheap "number of entries" operation, this is easy to misuse in non-test code (e.g., stats/telemetry paths) and can produce surprising results.

Consider either renaming this to something like `active_len()` (and/or adding a separate `raw_len()`), or making the behavior more explicit in the API so callers don't accidentally rely on `len()` for map size/perf-sensitive code.
Force-pushed from 0dfae20 to 27dedc9
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
Force-pushed from 27dedc9 to a08d109
```rust
// Proactively purge stale entries when the raw table size exceeds the threshold.
// This bounds memory growth when flows expire faster than they are looked up,
// since expired entries otherwise accumulate in the `DashMap` until a lookup hits them.
let raw_len = table.len();
if raw_len > self.reap_threshold {
    warn!(
        "The number of flows ({raw_len}) exceeds {}. Reaping stale entries...",
        self.reap_threshold
    );
    Self::drain_stale_with_read_lock(&table);
}
```
confidence: 9
tags: [logic]

`insert_common` triggers a full-table `drain_stale_with_read_lock()` scan on every insert once `table.len()` exceeds `reap_threshold`. If the table is legitimately large but mostly Active (so `drain_stale` removes little or nothing), this becomes O(n) work per insert and can degrade to O(n²) behavior under load.

Consider adding hysteresis/cooldown so the expensive scan only happens when the size crosses the threshold (from below), or at most once per time window / per X inserts, or only if a previous scan actually removed entries (and otherwise back off).
The `inserts_while_large: AtomicUsize` counter and the `DRAIN_EVERY_N_INSERTS = 1000` constant were added specifically for this. The drain only fires on inserts 0, 1000, 2000, etc. while the table stays over threshold, turning O(n) per insert into O(n/1000) amortized.
```rust
// Spawn a per-flow expiration timer when running inside a tokio runtime.
// The timer marks the flow as Expired; the `DashMap` entry is cleaned up
// lazily the next time lookup() is called for this key.
// In non-tokio contexts (shuttle tests, sync unit tests) the guard fails
// gracefully and lazy time-checking in `lookup` handles expiration instead.
//
// Only spawn a timer for a genuinely new Arc. If the same Arc is being
// reinserted (e.g. via reinsert()), its existing timer loop already handles
// extended deadlines via the `new_deadline > deadline` re-check, so spawning
// a second task would be redundant and would cause unbounded task growth.
//
// The timer holds a Weak<FlowInfo> rather than Arc<FlowInfo> and drops the
// upgrade before sleeping, so the timer task does not extend the lifetime of
// the FlowInfo allocation. Once the DashMap entry is removed (drain_stale,
// lookup lazy cleanup, or explicit remove) and all other callers drop their
// Arc clones, the allocation is freed even if the timer has not yet woken up.
// The status check after each sleep avoids redundant work for flows that were
// already Cancelled before their deadline elapsed.
let need_timer = result.as_ref().is_none_or(|old| !Arc::ptr_eq(old, val));
if need_timer && tokio::runtime::Handle::try_current().is_ok() {
    let fi_weak = Arc::downgrade(val);
    tokio::task::spawn(async move {
        loop {
            // Upgrade to check status and read the deadline. If the Arc has
            // already been dropped (no DashMap entry, no in-flight holders),
            // there is nothing left to expire.
            let Some(fi) = fi_weak.upgrade() else { break };
            if fi.status() != FlowStatus::Active {
                // Already Cancelled or Expired by another path; nothing to do.
                break;
            }
            let deadline = fi.expires_at();
            // Drop the strong ref before sleeping so this task does not
            // prevent the FlowInfo allocation from being freed.
            drop(fi);
            tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
            // Re-acquire after sleeping and re-check before committing.
            let Some(fi) = fi_weak.upgrade() else { break };
            if fi.status() != FlowStatus::Active {
                break;
            }
            let new_deadline = fi.expires_at();
            if new_deadline > deadline {
                // Deadline was extended (e.g. by StatefulNat); sleep again.
                continue;
            }
            fi.update_status(FlowStatus::Expired);
            break;
        }
    });
}
```
confidence: 8
tags: [logic]

Spawning a dedicated tokio task per inserted flow can scale poorly: at high flow cardinalities this creates a task count on the order of the number of flows, increasing memory usage and scheduler overhead (and potentially hitting runtime limits). Given the flow table's default shard count (1024) and aggressive reap threshold (1,000,000), this seems plausible in production.

If large flow counts are expected, it would be safer to use a shared expiration mechanism (e.g., a timer wheel / `DelayQueue` / per-shard expiration loop) or to add an explicit upper bound/feature flag/config knob to disable per-flow timers and fall back to lazy expiry + periodic drain.
AFAIR Tokio is designed for workloads like this. 1M flows should be reasonable in terms of memory usage, though I'm not sure it's that critical.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 3 comments.
```rust
let to_remove: Vec<FlowKey> = table
    .iter()
    .filter_map(|entry| {
        let val = entry.value();
        match val.status() {
            FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
            FlowStatus::Active if val.expires_at() <= now => {
                // Deadline passed but the tokio timer has not fired yet; mark and remove.
                val.update_status(FlowStatus::Expired);
                Some(*entry.key())
            }
            FlowStatus::Active => None,
        }
    })
    .collect();
let removed = to_remove.len();
for key in &to_remove {
    table.remove(key);
}
```
confidence: 8
tags: [logic]

`drain_stale_with_read_lock()` first collects `FlowKey`s to remove and then removes by key. Since the outer lock is a shared read lock, inserts/replacements for the same key can happen concurrently; this creates an ABA race where a freshly-inserted active flow under a reused key can be removed because the key was collected when an older entry was stale.

Consider removing conditionally (re-check status/deadline at removal time) or using a `DashMap` API that can filter/retain under the shard lock to avoid removing a different value than the one you inspected.
Suggested change:

```diff
-let to_remove: Vec<FlowKey> = table
-    .iter()
-    .filter_map(|entry| {
-        let val = entry.value();
-        match val.status() {
-            FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
-            FlowStatus::Active if val.expires_at() <= now => {
-                // Deadline passed but the tokio timer has not fired yet; mark and remove.
-                val.update_status(FlowStatus::Expired);
-                Some(*entry.key())
-            }
-            FlowStatus::Active => None,
-        }
-    })
-    .collect();
-let removed = to_remove.len();
-for key in &to_remove {
-    table.remove(key);
-}
+let mut removed = 0usize;
+// Use DashMap::retain so that the decision to remove an entry is made
+// while holding the shard lock for that entry, avoiding an ABA race on
+// keys that might be concurrently reused.
+table.retain(|_key, val| {
+    let is_stale = match val.status() {
+        FlowStatus::Expired | FlowStatus::Cancelled => true,
+        FlowStatus::Active if val.expires_at() <= now => {
+            // Deadline passed but the tokio timer has not fired yet; mark and remove.
+            val.update_status(FlowStatus::Expired);
+            true
+        }
+        FlowStatus::Active => false,
+    };
+    if is_stale {
+        removed += 1;
+        false
+    } else {
+        true
+    }
+});
```
```diff
 // wait 3 secs. Flow 1 should have been expired by its tokio timer
 tokio::time::sleep(Duration::from_secs(3)).await;
-assert_eq!(flow_table.len().unwrap(), 1);
+assert_eq!(flow_table.active_len().unwrap(), 1);
```
confidence: 7
tags: [logic]

Similarly, this fixed `sleep(Duration::from_secs(3))` assumes the per-flow expiration task has executed within that window. On slow/loaded runners this can be flaky.

Consider waiting/polling for the expected condition (e.g., `active_len() == 1`) with a timeout instead of relying on an exact sleep duration.
```diff
 let table = self.table.read().unwrap();
-let item = table.get(flow_key)?.upgrade();
-let Some(item) = item else {
-    debug!(
-        "lookup: Removing flow key {:?}, found empty weak reference",
-        flow_key
-    );
-    Self::remove_with_read_lock(&table, flow_key);
-    return None;
-};
+let item = table.get(flow_key)?.value().clone();
 let status = item.status();
```
confidence: 8
tags: [logic]

`lookup()` clones the `Arc<FlowInfo>` out of the `DashMap` and later (on the expired/cancelled paths) calls `remove_with_read_lock()` by key. Because the `DashMap` entry guard is dropped before the removal, another thread can replace the entry for the same key in between, and this removal can delete the new flow instead of the stale one.

Consider doing an atomic/conditional removal (e.g., remove only if the current value is still the one you examined, or re-check staleness under the shard lock before removing).
The previous expiration mechanism used a thread-local `PriorityQueue` plus an `ExpirationsNF` pipeline stage that had to run on every packet batch to process expired entries.

The new approach spawns a per-flow timer that sleeps until the deadline and, once the deadline is confirmed, marks the flow `FlowStatus::Expired`. `DashMap` cleanup is intentionally deferred, and the table now stores strong `Arc<FlowInfo>` values instead of weak refs.
FlowStatus::Expiredwhen the deadline is confirmed.DashMapcleanup is intentionally deferred and it's no longer storing weak refs butArc<FlowInfo>.