chore: Move flow expiration from PQ to timers #1327

Open

sergeymatov wants to merge 1 commit into main from pr/smatov/new-flow-timers

Conversation

@sergeymatov
Contributor

The previous expiration mechanism used a thread-local PriorityQueue plus an ExpirationsNF pipeline stage that had to run on every packet batch to process expired entries.

The new approach spawns a per-flow timer that sleeps until the deadline and, once the deadline is confirmed, marks the flow as FlowStatus::Expired. DashMap cleanup is intentionally deferred, and the table now stores Arc<FlowInfo> instead of weak references.

@sergeymatov sergeymatov requested a review from a team as a code owner March 10, 2026 08:37
@sergeymatov sergeymatov requested review from Copilot and daniel-noland and removed request for a team March 10, 2026 08:37
Contributor

Copilot AI left a comment


Pull request overview

This PR replaces the flow-table expiration mechanism (thread-local PQ + ExpirationsNF pipeline stage) with per-flow tokio timers that mark flows Expired, relying on lazy cleanup in lookup() / drain_stale().

Changes:

  • Remove ExpirationsNF and the thread-local priority-queue expiration machinery.
  • Add per-flow tokio timer spawning on insert, plus lazy time-based expiration in lookup() and proactive stale draining on size threshold.
  • Update dataplane pipeline composition, tests, docs, and dependencies to match the new expiration model.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| nat/src/portfw/test.rs | Removes ExpirationsNF from the test pipeline and uses FlowTable::default(). |
| flow-entry/src/flow_table/thread_local_pq.rs | Deletes the thread-local PQ implementation used for expirations. |
| flow-entry/src/flow_table/table.rs | Stores Arc<FlowInfo> directly; spawns per-flow tokio timers; adds lazy expiration in lookup() and drain_stale(); updates tests. |
| flow-entry/src/flow_table/nf_lookup.rs | Updates tests to use tokio timing and removes the expirations stage dependency. |
| flow-entry/src/flow_table/nf_expirations.rs | Deletes ExpirationsNF. |
| flow-entry/src/flow_table/mod.rs | Removes module exports for expirations/PQ; keeps FlowLookup/FlowTable exports. |
| flow-entry/src/flow_table/display.rs | Simplifies display logic now that table values are strong Arcs. |
| flow-entry/src/flow_table/README.md | Updates documentation to describe the tokio timers + lazy cleanup design. |
| flow-entry/Cargo.toml | Drops priority-queue/thread_local; adds tokio (and dev features for async tests). |
| dataplane/src/packet_processor/mod.rs | Removes ExpirationsNF from the router pipeline. |
| Cargo.lock | Removes priority-queue from flow-entry's dependency set and adds tokio. |

Comment on lines +196 to +200
```rust
if tokio::runtime::Handle::try_current().is_ok() {
    let fi = val.clone();
    tokio::task::spawn(async move {
        loop {
            let deadline = fi.expires_at();
```

Copilot AI Mar 10, 2026


confidence: 8
tags: [logic, other]

insert_common spawns a new tokio task on every call whenever a runtime is present. This means repeated reinsert() calls (or inserting the same Arc<FlowInfo> again under the same key) will create multiple concurrent timers for the same flow, which can lead to unbounded task growth and extra wakeups.

Consider deduplicating timer creation (e.g., skip spawning when the replaced value is the same Arc via Arc::ptr_eq, or track a “timer started” flag per FlowInfo).
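The Arc::ptr_eq dedup can be sketched in isolation; `needs_timer` and the `u64` payload below are illustrative stand-ins for the real insert path and FlowInfo, not the PR's actual code:

```rust
use std::sync::Arc;

// Decide whether a replaced map value warrants a new timer. Spawning is
// skipped when the old slot held the very same Arc (a reinsert), since the
// existing timer loop already re-reads the extended deadline.
fn needs_timer(replaced: Option<&Arc<u64>>, inserted: &Arc<u64>) -> bool {
    replaced.is_none_or(|old| !Arc::ptr_eq(old, inserted))
}

fn main() {
    let flow = Arc::new(42u64);
    // Fresh insert: no previous value, so a timer is needed.
    assert!(needs_timer(None, &flow));
    // Reinsert of the same Arc: the old timer still covers it.
    assert!(!needs_timer(Some(&flow), &flow));
    // A different allocation under the same key is a genuinely new flow.
    let other = Arc::new(42u64);
    assert!(needs_timer(Some(&other), &flow));
}
```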

Comment on lines +198 to +210
```rust
tokio::task::spawn(async move {
    loop {
        let deadline = fi.expires_at();
        tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
        let new_deadline = fi.expires_at();
        if new_deadline > deadline {
            // Deadline was extended (e.g. by StatefulNat); sleep again.
            continue;
        }
        fi.update_status(FlowStatus::Expired);
        break;
    }
});
```

Copilot AI Mar 10, 2026


confidence: 7
tags: [logic, other]

The per-flow timer task holds an Arc<FlowInfo> until the deadline elapses, even if the entry is removed from the DashMap earlier (e.g., via lookup() removing Cancelled/Expired entries or drain_stale). With many flows and long timeouts this can keep a large number of FlowInfo allocations (and their timer tasks) alive longer than necessary.

If early cleanup matters, consider wiring a cancellation mechanism (e.g., store/abort a JoinHandle, use a cancellation token/notify in FlowInfo, or have a single shared reaper task rather than one task per flow).
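The cancellation-flag variant of that suggestion can be sketched with stdlib primitives. `Flow` and `should_expire` are hypothetical stand-ins; in the dataplane the thread would be a tokio task woken by a Notify or CancellationToken rather than a plain sleep:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for FlowInfo carrying a cancellation flag.
struct Flow {
    cancelled: AtomicBool,
}

/// What the timer does after waking: commit the expiration only if the flow
/// still exists and was not cancelled in the meantime.
fn should_expire(weak: &Weak<Flow>) -> bool {
    match weak.upgrade() {
        Some(f) => !f.cancelled.load(Ordering::Acquire),
        None => false, // every Arc already dropped; nothing left to expire
    }
}

fn main() {
    let flow = Arc::new(Flow { cancelled: AtomicBool::new(false) });
    let weak = Arc::downgrade(&flow);

    // The timer task holds only the Weak ref while it sleeps.
    let timer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        should_expire(&weak)
    });

    // Early-cleanup path: cancel the flow well before the deadline.
    flow.cancelled.store(true, Ordering::Release);
    assert!(!timer.join().unwrap());
}
```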

Comment on lines 167 to 168
```rust
// Entries are all gone (removed by per-flow tokio timers)
let num_entries = flow_table.len().unwrap();
```

Copilot AI Mar 10, 2026


confidence: 9
tags: [docs, style]

This test comment says entries are “removed by per-flow tokio timers”, but the current implementation only marks flows as Expired in the timer and relies on lazy removal (e.g., lookup() / drain_stale) to actually delete DashMap entries. Consider rewording this to reflect the real behavior, or add an explicit cleanup step if the test intends to assert removal rather than just expiration.

Comment on lines 335 to +345
```diff
 #[allow(clippy::len_without_is_empty)]
-/// Tell how many flows are in the table if it can be locked
-/// This is mostly for testing
+/// Returns the number of *active* (non-expired, non-cancelled) flows in the table.
+/// This is mostly for testing.
 pub fn len(&self) -> Option<usize> {
     let table = self.table.try_read().ok()?;
-    Some(table.len())
+    Some(
+        table
+            .iter()
+            .filter(|e| e.value().status() == FlowStatus::Active)
+            .count(),
+    )
```

Copilot AI Mar 10, 2026


confidence: 8
tags: [api_design]

FlowTable::len() now does an O(n) scan and returns only the count of Active entries (not the raw map size). Since the method name strongly implies a cheap “number of entries” operation, this is easy to misuse in non-test code (e.g., stats/telemetry paths) and can produce surprising results.

Consider either renaming this to something like active_len() (and/or adding a separate raw_len()), or making the behavior more explicit in the API so callers don’t accidentally rely on len() for map size/perf-sensitive code.
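The proposed active_len()/raw_len() split might look like this sketch, with a plain HashMap and a boolean "active" flag standing in for the real DashMap and FlowStatus:

```rust
use std::collections::HashMap;

struct Table {
    inner: HashMap<u32, bool>, // value = "is the flow Active?"
}

impl Table {
    /// Cheap O(1) count of all entries, stale ones included.
    fn raw_len(&self) -> usize {
        self.inner.len()
    }

    /// O(n) count of Active entries only; intended for tests/diagnostics.
    fn active_len(&self) -> usize {
        self.inner.values().filter(|&&active| active).count()
    }
}

fn main() {
    let mut inner = HashMap::new();
    inner.insert(1u32, true); // active flow
    inner.insert(2, false);   // expired, awaiting lazy cleanup
    let table = Table { inner };
    assert_eq!(table.raw_len(), 2);
    assert_eq!(table.active_len(), 1);
}
```

Keeping both methods lets stats/telemetry paths pick the cheap raw count while tests assert on liveness explicitly.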

@sergeymatov sergeymatov force-pushed the pr/smatov/new-flow-timers branch 3 times, most recently from 0dfae20 to 27dedc9 on March 11, 2026 13:14
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.

Comment on lines +179 to +189
```rust
// Proactively purge stale entries when the raw table size exceeds the threshold.
// This bounds memory growth when flows expire faster than they are looked up,
// since expired entries otherwise accumulate in the `DashMap` until a lookup hits them.
let raw_len = table.len();
if raw_len > self.reap_threshold {
    warn!(
        "The number of flows ({raw_len}) exceeds {}. Reaping stale entries...",
        self.reap_threshold
    );
    Self::drain_stale_with_read_lock(&table);
}
```

Copilot AI Mar 11, 2026


confidence: 9
tags: [logic]

insert_common triggers a full-table drain_stale_with_read_lock() scan on every insert once table.len() exceeds reap_threshold. If the table is legitimately large but mostly Active (so drain_stale removes little or nothing), this becomes O(n) work per insert and can degrade to O(n²) behavior under load.

Consider adding hysteresis/cooldown so the expensive scan only happens when the size crosses the threshold (from below), or at most once per time window / per X inserts, or only if a previous scan actually removed entries (and otherwise back off).

Contributor Author

@sergeymatov sergeymatov Mar 12, 2026


The inserts_while_large: AtomicUsize counter and the DRAIN_EVERY_N_INSERTS = 1000 constant were added specifically for this. The drain only fires on inserts 0, 1000, 2000, etc., while the table stays over the threshold, turning O(n) per insert into O(n/1000) amortized.
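That amortization scheme can be sketched as follows; only the inserts_while_large field and the DRAIN_EVERY_N_INSERTS constant come from the comment above, everything else is illustrative:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const DRAIN_EVERY_N_INSERTS: usize = 1000;

struct Table {
    inserts_while_large: AtomicUsize,
}

impl Table {
    /// Returns true when this insert should trigger a drain: only on
    /// inserts 0, 1000, 2000, ... while the table stays over threshold.
    fn should_drain(&self, over_threshold: bool) -> bool {
        if !over_threshold {
            // Reset so the next threshold crossing drains immediately.
            self.inserts_while_large.store(0, Ordering::Relaxed);
            return false;
        }
        let n = self.inserts_while_large.fetch_add(1, Ordering::Relaxed);
        n % DRAIN_EVERY_N_INSERTS == 0
    }
}

fn main() {
    let t = Table { inserts_while_large: AtomicUsize::new(0) };
    let drains = (0..2500).filter(|_| t.should_drain(true)).count();
    // Fires on inserts 0, 1000, and 2000: three drains over 2500 inserts.
    assert_eq!(drains, 3);
}
```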

Comment on lines +191 to +241
```rust
// Spawn a per-flow expiration timer when running inside a tokio runtime.
// The timer marks the flow as Expired; the `DashMap` entry is cleaned up
// lazily the next time lookup() is called for this key.
// In non-tokio contexts (shuttle tests, sync unit tests) the guard fails
// gracefully and lazy time-checking in `lookup` handles expiration instead.
//
// Only spawn a timer for a genuinely new Arc. If the same Arc is being
// reinserted (e.g. via reinsert()), its existing timer loop already handles
// extended deadlines via the `new_deadline > deadline` re-check, so spawning
// a second task would be redundant and would cause unbounded task growth.
//
// The timer holds a Weak<FlowInfo> rather than Arc<FlowInfo> and drops the
// upgrade before sleeping, so the timer task does not extend the lifetime of
// the FlowInfo allocation. Once the DashMap entry is removed (drain_stale,
// lookup lazy cleanup, or explicit remove) and all other callers drop their
// Arc clones, the allocation is freed even if the timer has not yet woken up.
// The status check after each sleep avoids redundant work for flows that were
// already Cancelled before their deadline elapsed.
let need_timer = result.as_ref().is_none_or(|old| !Arc::ptr_eq(old, val));
if need_timer && tokio::runtime::Handle::try_current().is_ok() {
    let fi_weak = Arc::downgrade(val);
    tokio::task::spawn(async move {
        loop {
            // Upgrade to check status and read the deadline. If the Arc has
            // already been dropped (no DashMap entry, no in-flight holders),
            // there is nothing left to expire.
            let Some(fi) = fi_weak.upgrade() else { break };
            if fi.status() != FlowStatus::Active {
                // Already Cancelled or Expired by another path; nothing to do.
                break;
            }
            let deadline = fi.expires_at();
            // Drop the strong ref before sleeping so this task does not
            // prevent the FlowInfo allocation from being freed.
            drop(fi);
            tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
            // Re-acquire after sleeping and re-check before committing.
            let Some(fi) = fi_weak.upgrade() else { break };
            if fi.status() != FlowStatus::Active {
                break;
            }
            let new_deadline = fi.expires_at();
            if new_deadline > deadline {
                // Deadline was extended (e.g. by StatefulNat); sleep again.
                continue;
            }
            fi.update_status(FlowStatus::Expired);
            break;
        }
    });
}
```

Copilot AI Mar 11, 2026


confidence: 8
tags: [logic]

Spawning a dedicated tokio::task per inserted flow can scale poorly: at high flow cardinalities this creates a task count on the order of the number of flows, increasing memory usage and scheduler overhead (and potentially hitting runtime limits). Given the flow table’s default shard count (1024) and aggressive reap threshold (1,000,000), this seems plausible in production.

If large flow counts are expected, it would be safer to use a shared expiration mechanism (e.g., a timer wheel / DelayQueue / per-shard expiration loop) or to add an explicit upper bound/feature flag/config knob to disable per-flow timers and fall back to lazy expiry + periodic drain.
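The shared-reaper alternative can be sketched as a single loop that scans for overdue entries, here with a std::thread and a Vec standing in for what would be one tokio task over the real DashMap:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Drop every entry whose deadline has passed; returns how many were reaped.
fn reap(entries: &mut Vec<(u32, Instant)>, now: Instant) -> usize {
    let before = entries.len();
    entries.retain(|&(_, deadline)| deadline > now);
    before - entries.len()
}

fn main() {
    // key -> deadline; a stand-in for the real flow table.
    let table = Arc::new(Mutex::new(vec![
        (1u32, Instant::now()),                           // already overdue
        (2u32, Instant::now() + Duration::from_secs(60)), // still active
    ]));

    // One shared loop serves every flow, instead of one timer task per flow,
    // so the task count stays O(1) regardless of flow cardinality.
    let shared = Arc::clone(&table);
    let reaper = thread::spawn(move || {
        for _ in 0..3 {
            thread::sleep(Duration::from_millis(10));
            reap(&mut shared.lock().unwrap(), Instant::now());
        }
    });
    reaper.join().unwrap();

    // Only the overdue entry was removed.
    let remaining: Vec<u32> = table.lock().unwrap().iter().map(|e| e.0).collect();
    assert_eq!(remaining, vec![2]);
}
```

The trade-off versus per-flow timers is expiration granularity: flows linger up to one tick past their deadline, in exchange for bounded scheduler overhead.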

Contributor Author


AFAIR Tokio is designed for things like that. 1M flows should be reasonable in terms of memory usage, though I'm not sure it would be that critical.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 3 comments.



Comment on lines +342 to +360
```rust
let to_remove: Vec<FlowKey> = table
    .iter()
    .filter_map(|entry| {
        let val = entry.value();
        match val.status() {
            FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
            FlowStatus::Active if val.expires_at() <= now => {
                // Deadline passed but the tokio timer has not fired yet; mark and remove.
                val.update_status(FlowStatus::Expired);
                Some(*entry.key())
            }
            FlowStatus::Active => None,
        }
    })
    .collect();
let removed = to_remove.len();
for key in &to_remove {
    table.remove(key);
}
```

Copilot AI Mar 12, 2026


confidence: 8
tags: [logic]

drain_stale_with_read_lock() first collects FlowKeys to remove and then removes by key. Since the outer lock is a shared read lock, inserts/replacements for the same key can happen concurrently; this creates an ABA race where a freshly-inserted active flow under a reused key can be removed because the key was collected when an older entry was stale.

Consider removing conditionally (re-check status/deadline at removal time) or using a DashMap API that can filter/retain under the shard lock to avoid removing a different value than the one you inspected.

Suggested change
```diff
-let to_remove: Vec<FlowKey> = table
-    .iter()
-    .filter_map(|entry| {
-        let val = entry.value();
-        match val.status() {
-            FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
-            FlowStatus::Active if val.expires_at() <= now => {
-                // Deadline passed but the tokio timer has not fired yet; mark and remove.
-                val.update_status(FlowStatus::Expired);
-                Some(*entry.key())
-            }
-            FlowStatus::Active => None,
-        }
-    })
-    .collect();
-let removed = to_remove.len();
-for key in &to_remove {
-    table.remove(key);
-}
+let mut removed = 0usize;
+// Use DashMap::retain so that the decision to remove an entry is made
+// while holding the shard lock for that entry, avoiding an ABA race on
+// keys that might be concurrently reused.
+table.retain(|_key, val| {
+    let is_stale = match val.status() {
+        FlowStatus::Expired | FlowStatus::Cancelled => true,
+        FlowStatus::Active if val.expires_at() <= now => {
+            // Deadline passed but the tokio timer has not fired yet; mark and remove.
+            val.update_status(FlowStatus::Expired);
+            true
+        }
+        FlowStatus::Active => false,
+    };
+    if is_stale {
+        removed += 1;
+        false
+    } else {
+        true
+    }
+});
```

Comment on lines +228 to +231
```diff
 // wait 3 secs. Flow 1 should have been expired by its tokio timer
 tokio::time::sleep(Duration::from_secs(3)).await;

-assert_eq!(flow_table.len().unwrap(), 1);
+assert_eq!(flow_table.active_len().unwrap(), 1);
```

Copilot AI Mar 12, 2026


confidence: 7
tags: [logic]

Similarly, this fixed sleep(Duration::from_secs(3)) assumes the per-flow expiration task has executed within that window. On slow/loaded runners this can be flaky.

Consider waiting/polling for the expected condition (e.g., active_len() == 1) with a timeout instead of relying on an exact sleep duration.
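A poll-with-deadline helper along those lines might look like this stdlib sketch; an async test would wrap the same loop in tokio::time::timeout instead:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Repeatedly evaluate `cond` until it holds or `timeout` elapses,
/// instead of sleeping for a fixed duration and asserting once.
fn wait_for(mut cond: impl FnMut() -> bool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if cond() {
            return true;
        }
        thread::sleep(Duration::from_millis(10));
    }
    cond() // one final check at the deadline
}

fn main() {
    let start = Instant::now();
    // Condition becomes true ~30ms in; the helper returns as soon as it does,
    // so the generous timeout does not slow the happy path.
    assert!(wait_for(
        || start.elapsed() >= Duration::from_millis(30),
        Duration::from_secs(5),
    ));
    // A condition that never holds times out and reports failure.
    assert!(!wait_for(|| false, Duration::from_millis(50)));
}
```

In the test above this would replace the fixed three-second sleep with something like `wait_for(|| flow_table.active_len() == Some(1), …)`, tolerating slow runners without flaking.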

Comment on lines 269 to 271
```diff
 let table = self.table.read().unwrap();
-let item = table.get(flow_key)?.upgrade();
-let Some(item) = item else {
-    debug!(
-        "lookup: Removing flow key {:?}, found empty weak reference",
-        flow_key
-    );
-    Self::remove_with_read_lock(&table, flow_key);
-    return None;
-};
+let item = table.get(flow_key)?.value().clone();
 let status = item.status();
```

Copilot AI Mar 12, 2026


confidence: 8
tags: [logic]

lookup() clones the Arc<FlowInfo> out of the DashMap and later (on the expired/cancelled paths) calls remove_with_read_lock() by key. Because the DashMap entry guard is dropped before the removal, another thread can replace the entry for the same key in between, and this removal can delete the new flow instead of the stale one.

Consider doing an atomic/conditional removal (e.g., remove only if the current value is still the one you examined, or re-check staleness under the shard lock before removing).
