diff --git a/src/builder.rs b/src/builder.rs index 806c676b3..f4df35313 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use std::convert::TryInto; use std::default::Default; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex, Once, RwLock}; use std::time::SystemTime; use std::{fmt, fs}; @@ -47,6 +48,7 @@ use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, MIN_PROBE_AMOUNT_MSAT, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -73,6 +75,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; +use crate::probing; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -281,6 +284,7 @@ pub struct NodeBuilder { runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, + probing_config: Option, } impl NodeBuilder { @@ -299,6 +303,8 @@ impl NodeBuilder { let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; + let async_payments_role = None; + let probing_config = None; Self { config, chain_data_source_config, @@ -306,9 +312,10 @@ impl NodeBuilder { liquidity_source_config, log_writer_config, runtime_handle, - async_payments_role: None, + async_payments_role, pathfinding_scores_sync_config, recovery_mode, + probing_config, } } @@ -614,6 +621,23 @@ impl NodeBuilder { self } + /// Configures background probing. + /// + /// Use [`probing::ProbingConfig`] to build the configuration: + /// ```ignore + /// use ldk_node::probing::ProbingConfig; + /// + /// builder.set_probing_config( + /// ProbingConfig::high_degree(100) + /// .interval(Duration::from_secs(30)) + /// .build() + /// ); + /// ``` + pub fn set_probing_config(&mut self, config: probing::ProbingConfig) -> &mut Self { + self.probing_config = Some(config); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -785,6 +809,7 @@ impl NodeBuilder { self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), self.pathfinding_scores_sync_config.as_ref(), + self.probing_config.as_ref(), self.async_payments_role, self.recovery_mode, seed_bytes, @@ -1081,6 +1106,13 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_wallet_recovery_mode(); } + /// Configures background probing. + /// + /// See [`probing::ProbingConfig`] for details. + pub fn set_probing_config(&self, config: probing::ProbingConfig) { + self.inner.write().unwrap().set_probing_config(config); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1224,6 +1256,7 @@ fn build_with_store_internal( gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, + probing_config: Option<&probing::ProbingConfig>, async_payments_role: Option, recovery_mode: bool, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, kv_store: Arc, ) -> Result { @@ -1626,7 +1659,10 @@ fn build_with_store_internal( }, } - let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) { + scoring_fee_params.probing_diversity_penalty_msat = penalty; + } let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), Arc::clone(&logger), @@ -1965,6 +2001,39 @@ fn build_with_store_internal( _leak_checker.0.push(Arc::downgrade(&wallet) as Weak); } + let prober = probing_config.map(|probing_cfg| { + let strategy: Arc = match &probing_cfg.kind { + probing::ProbingStrategyKind::HighDegree { top_node_count } => { + Arc::new(probing::HighDegreeStrategy::new( + Arc::clone(&network_graph), + *top_node_count, + MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + probing_cfg.cooldown, + )) + }, + probing::ProbingStrategyKind::Random { max_hops } => { + Arc::new(probing::RandomStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + *max_hops, + MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + )) + }, + probing::ProbingStrategyKind::Custom(s) => Arc::clone(s), + }; + Arc::new(probing::Prober { + channel_manager: Arc::clone(&channel_manager), + logger: Arc::clone(&logger), + strategy, + interval: probing_cfg.interval, + liquidity_limit_multiplier: Some(config.probing_liquidity_limit_multiplier), + max_locked_msat: probing_cfg.max_locked_msat, + locked_msat: Arc::new(AtomicU64::new(0)), + }) + }); + Ok(Node { runtime, stop_sender, @@ -1998,6 +2067,7 @@ fn build_with_store_internal( om_mailbox, async_payments_role, hrn_resolver, + prober, #[cfg(cycle_tests)] _leak_checker, }) diff --git a/src/config.rs b/src/config.rs index 71e4d2314..a7d72ceaf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,11 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; +pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10; +pub(crate) const DEFAULT_PROBED_NODE_COOLDOWN_SECS: u64 = 60 * 60; // 1 hour +pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats +pub(crate) const MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats +pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; // The default timeout after which we abort a wallet syncing operation. diff --git a/src/event.rs b/src/event.rs index ccee8e50b..adb6e46ff 100644 --- a/src/event.rs +++ b/src/event.rs @@ -52,6 +52,7 @@ use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; +use crate::probing::Prober; use crate::runtime::Runtime; use crate::types::{ CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, @@ -515,6 +516,7 @@ where static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + prober: Option>, } impl EventHandler @@ -530,7 +532,7 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + runtime: Arc, logger: L, config: Arc, prober: Option>, ) -> Self { Self { event_queue, @@ -550,6 +552,7 @@ where static_invoice_store, onion_messenger, om_mailbox, + prober, } } @@ -1135,8 +1138,16 @@ where LdkEvent::PaymentPathSuccessful { .. } => {}, LdkEvent::PaymentPathFailed { .. } => {}, - LdkEvent::ProbeSuccessful { .. } => {}, - LdkEvent::ProbeFailed { .. } => {}, + LdkEvent::ProbeSuccessful { path, .. } => { + if let Some(prober) = &self.prober { + prober.handle_probe_successful(&path); + } + }, + LdkEvent::ProbeFailed { path, .. } => { + if let Some(prober) = &self.prober { + prober.handle_probe_failed(&path); + } + }, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source.handle_htlc_handling_failed(failure_type).await; diff --git a/src/lib.rs b/src/lib.rs index 2e02e996c..cb418280d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,10 +101,12 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod probing; mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; @@ -170,6 +172,10 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; +pub use probing::{ + HighDegreeStrategy, Probe, Prober, ProbingConfig, ProbingConfigBuilder, ProbingStrategy, + RandomStrategy, +}; use runtime::Runtime; pub use tokio; use types::{ @@ -239,6 +245,7 @@ pub struct Node { om_mailbox: Option>, async_payments_role: Option, hrn_resolver: Arc, + prober: Option>, #[cfg(cycle_tests)] _leak_checker: LeakChecker, } @@ -593,8 +600,16 @@ impl Node { Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), + self.prober.clone(), )); + if let Some(prober) = self.prober.clone() { + let stop_rx = self.stop_sender.subscribe(); + self.runtime.spawn_cancellable_background_task(async move { + probing::run_prober(prober, stop_rx).await; + }); + } + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1067,6 +1082,41 @@ impl Node { )) } + /// Returns a reference to the [`Prober`], or `None` if no probing strategy is configured. + pub fn prober(&self) -> Option<&Prober> { + self.prober.as_deref() + } + + /// Returns the scorer's estimated `(min, max)` liquidity range for the given channel in the + /// direction toward `target`, or `None` if the scorer has no data for that channel. + /// + /// Works by serializing the `CombinedScorer` (which writes `local_only_scorer`) and + /// deserializing it as a plain `ProbabilisticScorer` to call `estimated_channel_liquidity_range`. + pub fn scorer_channel_liquidity(&self, scid: u64, target: PublicKey) -> Option<(u64, u64)> { + use lightning::routing::scoring::{ + ProbabilisticScorer, ProbabilisticScoringDecayParameters, + }; + use lightning::util::ser::{ReadableArgs, Writeable}; + + let target_node_id = lightning::routing::gossip::NodeId::from_pubkey(&target); + + let bytes = { + let scorer = self.scorer.lock().unwrap(); + let mut buf = Vec::new(); + scorer.write(&mut buf).ok()?; + buf + }; + + let decay_params = ProbabilisticScoringDecayParameters::default(); + let prob_scorer = ProbabilisticScorer::read( + &mut &bytes[..], + (decay_params, Arc::clone(&self.network_graph), Arc::clone(&self.logger)), + ) + .ok()?; + + prob_scorer.estimated_channel_liquidity_range(scid, &target_node_id) + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/probing.rs b/src/probing.rs new file mode 100644 index 000000000..a7be9cb64 --- /dev/null +++ b/src/probing.rs @@ -0,0 +1,596 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::collections::HashMap; +use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use bitcoin::secp256k1::PublicKey; +use lightning::routing::gossip::NodeId; +use lightning::routing::router::{Path, RouteHop, MAX_PATH_LENGTH_ESTIMATE}; +use lightning_invoice::DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning_types::features::NodeFeatures; + +use crate::config::{ + DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBED_NODE_COOLDOWN_SECS, DEFAULT_PROBING_INTERVAL_SECS, +}; +use crate::logger::{log_debug, LdkLogger, Logger}; +use crate::types::{ChannelManager, Graph}; +use crate::util::random_range; + +/// Which built-in probing strategy to use, or a custom one. +#[derive(Clone)] +pub(crate) enum ProbingStrategyKind { + HighDegree { top_node_count: usize }, + Random { max_hops: usize }, + Custom(Arc), +} + +/// Configuration for the background probing subsystem. +/// +/// Use the constructor methods [`high_degree`], [`random_walk`], or [`custom`] to start +/// building, then chain optional setters and call [`build`]. +/// +/// # Example +/// ```ignore +/// let config = ProbingConfig::high_degree(100) +/// .interval(Duration::from_secs(30)) +/// .max_locked_msat(500_000) +/// .diversity_penalty_msat(250) +/// .build(); +/// builder.set_probing_config(config); +/// ``` +/// +/// [`high_degree`]: Self::high_degree +/// [`random_walk`]: Self::random_walk +/// [`custom`]: Self::custom +/// [`build`]: ProbingConfigBuilder::build +#[derive(Clone)] +pub struct ProbingConfig { + pub(crate) kind: ProbingStrategyKind, + pub(crate) interval: Duration, + pub(crate) max_locked_msat: u64, + pub(crate) diversity_penalty_msat: Option, + pub(crate) cooldown: Duration, +} + +impl fmt::Debug for ProbingConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let kind_str = match &self.kind { + ProbingStrategyKind::HighDegree { top_node_count } => { + format!("HighDegree {{ top_node_count: {} }}", top_node_count) + }, + ProbingStrategyKind::Random { max_hops } => { + format!("Random {{ max_hops: {} }}", max_hops) + }, + ProbingStrategyKind::Custom(_) => "Custom()".to_string(), + }; + f.debug_struct("ProbingConfig") + .field("kind", &kind_str) + .field("interval", &self.interval) + .field("max_locked_msat", &self.max_locked_msat) + .field("diversity_penalty_msat", &self.diversity_penalty_msat) + .field("cooldown", &self.cooldown) + .finish() + } +} + +impl ProbingConfig { + /// Start building a config that probes toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + pub fn high_degree(top_node_count: usize) -> ProbingConfigBuilder { + ProbingConfigBuilder::new(ProbingStrategyKind::HighDegree { top_node_count }) + } + + /// Start building a config that probes via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + pub fn random_walk(max_hops: usize) -> ProbingConfigBuilder { + ProbingConfigBuilder::new(ProbingStrategyKind::Random { max_hops }) + } + + /// Start building a config with a custom [`ProbingStrategy`] implementation. + pub fn custom(strategy: Arc) -> ProbingConfigBuilder { + ProbingConfigBuilder::new(ProbingStrategyKind::Custom(strategy)) + } +} + +/// Builder for [`ProbingConfig`]. +/// +/// Created via [`ProbingConfig::high_degree`], [`ProbingConfig::random_walk`], or +/// [`ProbingConfig::custom`]. Call [`build`] to finalize. +/// +/// [`build`]: Self::build +pub struct ProbingConfigBuilder { + kind: ProbingStrategyKind, + interval: Duration, + max_locked_msat: u64, + diversity_penalty_msat: Option, + cooldown: Duration, +} + +impl ProbingConfigBuilder { + fn new(kind: ProbingStrategyKind) -> Self { + Self { + kind, + interval: Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS), + max_locked_msat: DEFAULT_MAX_PROBE_LOCKED_MSAT, + diversity_penalty_msat: None, + cooldown: Duration::from_secs(DEFAULT_PROBED_NODE_COOLDOWN_SECS), + } + } + + /// Overrides the interval between probe attempts. + /// + /// Defaults to 10 seconds. + pub fn interval(mut self, interval: Duration) -> Self { + self.interval = interval; + self + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). + pub fn max_locked_msat(mut self, max_msat: u64) -> Self { + self.max_locked_msat = max_msat; + self + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`RandomStrategy`]) bypass the scorer entirely. + /// + /// If unset, LDK's default of `0` (no penalty) is used. + pub fn diversity_penalty_msat(mut self, penalty_msat: u64) -> Self { + self.diversity_penalty_msat = Some(penalty_msat); + self + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. + pub fn cooldown(mut self, cooldown: Duration) -> Self { + self.cooldown = cooldown; + self + } + + /// Builds the [`ProbingConfig`]. + pub fn build(self) -> ProbingConfig { + ProbingConfig { + kind: self.kind, + interval: self.interval, + max_locked_msat: self.max_locked_msat, + diversity_penalty_msat: self.diversity_penalty_msat, + cooldown: self.cooldown, + } + } +} + +/// A probe to be dispatched by the Prober. +pub enum Probe { + /// A manually constructed path; dispatched via `send_probe`. + PrebuiltRoute(Path), + /// A destination to reach; the router selects the actual path via + /// `send_spontaneous_preflight_probes`. + Destination { + /// The destination node. + final_node: PublicKey, + /// The probe amount in millisatoshis. + amount_msat: u64, + }, +} + +/// Strategy can be used for determining the next target and amount for probing. +pub trait ProbingStrategy: Send + Sync + 'static { + /// Returns the next probe to run, or `None` to skip this tick. + fn next_probe(&self) -> Option; +} + +/// Probes toward the most-connected nodes in the graph. +/// +/// On each tick the strategy reads the current gossip graph, sorts nodes by +/// channel count, and picks the highest-degree node from the top +/// `top_node_count` that has not been probed within `cooldown`. +/// Nodes probed more recently are skipped so that the strategy +/// naturally spreads across the top nodes and picks up graph changes. +/// Returns `None` (skips the tick) if all top nodes are on cooldown. +/// +/// The probe amount is chosen uniformly at random from +/// `[min_amount_msat, max_amount_msat]`. +pub struct HighDegreeStrategy { + network_graph: Arc, + /// How many of the highest-degree nodes to cycle through. + pub top_node_count: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, + /// How long a node stays ineligible after being probed. + pub cooldown: Duration, + /// Nodes probed recently, with the time they were last probed. + recently_probed: Mutex>, +} + +impl HighDegreeStrategy { + /// Creates a new high-degree probing strategy. + pub(crate) fn new( + network_graph: Arc, top_node_count: usize, min_amount_msat: u64, + max_amount_msat: u64, cooldown: Duration, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + top_node_count, + min_amount_msat, + max_amount_msat, + cooldown, + recently_probed: Mutex::new(HashMap::new()), + } + } +} + +impl ProbingStrategy for HighDegreeStrategy { + fn next_probe(&self) -> Option { + let graph = self.network_graph.read_only(); + + // Collect (pubkey, channel_count) for all nodes. + let mut nodes_by_degree: Vec<(PublicKey, usize)> = graph + .nodes() + .unordered_iter() + .filter_map(|(id, info)| { + PublicKey::try_from(*id).ok().map(|pubkey| (pubkey, info.channels.len())) + }) + .collect(); + + if nodes_by_degree.is_empty() { + return None; + } + + nodes_by_degree.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let top_node_count = self.top_node_count.min(nodes_by_degree.len()); + let now = Instant::now(); + + let mut probed = self.recently_probed.lock().unwrap(); + + // We could check staleness when we use the entry, but that way we'd not clear cache at + // all. For hundreds of top nodes it's okay to call retain each tick. + probed.retain(|_, probed_at| now.duration_since(*probed_at) < self.cooldown); + + // If all top nodes are on cooldown, reset and start a new cycle. + let final_node = match nodes_by_degree[..top_node_count] + .iter() + .find(|(pubkey, _)| !probed.contains_key(pubkey)) + { + Some((pubkey, _)) => *pubkey, + None => { + probed.clear(); + nodes_by_degree[0].0 + }, + }; + + probed.insert(final_node, now); + drop(probed); + + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + Some(Probe::Destination { final_node, amount_msat }) + } +} + +/// Explores the graph by walking a random number of hops outward from one of our own +/// channels, constructing the [`Path`] explicitly. +/// +/// On each tick: +/// 1. Picks one of our confirmed, usable channels to start from. +/// 2. Performs a deterministic walk of a randomly chosen depth (up to +/// [`MAX_PATH_LENGTH_ESTIMATE`]) through the gossip graph, skipping disabled +/// channels and dead-ends. +/// 3. Returns `Probe::PrebuiltRoute(path)` so the prober calls `send_probe` directly. +/// +/// The probe amount is chosen uniformly at random from `[min_amount_msat, max_amount_msat]`. +/// +/// Because path selection ignores the scorer, this probes channels the router +/// would never try on its own, teaching the scorer about previously unknown paths. +pub struct RandomStrategy { + network_graph: Arc, + channel_manager: Arc, + /// Upper bound on the number of hops in a randomly constructed path. + pub max_hops: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, +} + +impl RandomStrategy { + /// Creates a new random-walk probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, max_hops: usize, + min_amount_msat: u64, max_amount_msat: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + max_hops: max_hops.clamp(1, MAX_PATH_LENGTH_ESTIMATE as usize), + min_amount_msat, + max_amount_msat, + } + } + + /// Tries to build a path of `target_hops` hops. Returns `None` if the local node has no + /// usable channels, or the walk terminates before reaching `target_hops`. + fn try_build_path(&self, target_hops: usize, amount_msat: u64) -> Option { + let initial_channels = self + .channel_manager + .list_channels() + .into_iter() + .filter(|c| c.is_usable && c.short_channel_id.is_some()) + .collect::>(); + + if initial_channels.is_empty() { + return None; + } + + let graph = self.network_graph.read_only(); + let first_hop = + &initial_channels[random_range(0, initial_channels.len() as u64 - 1) as usize]; + let first_hop_scid = first_hop.short_channel_id.unwrap(); + let next_peer_pubkey = first_hop.counterparty.node_id; + let next_peer_node_id = NodeId::from_pubkey(&next_peer_pubkey); + + // Track the tightest HTLC limit across all hops to cap the probe amount. + // The first hop limit comes from our live channel state; subsequent hops use htlc_maximum_msat from the gossip channel update. + let mut route_least_htlc_upper_bound = first_hop.next_outbound_htlc_limit_msat; + let mut route_greatest_htlc_lower_bound = first_hop.next_outbound_htlc_minimum_msat; + + // Walk the graph: each entry is (node_id, arrived_via_scid, pubkey); first entry is set: + let mut route: Vec<(NodeId, u64, PublicKey)> = + vec![(next_peer_node_id, first_hop_scid, next_peer_pubkey)]; + + let mut prev_scid = first_hop_scid; + let mut current_node_id = next_peer_node_id; + + for _ in 1..target_hops { + let node_info = match graph.node(¤t_node_id) { + Some(n) => n, + None => break, + }; + + // Outward channels: skip the one we arrived on to avoid backtracking. + let candidates: Vec = + node_info.channels.iter().copied().filter(|&scid| scid != prev_scid).collect(); + + if candidates.is_empty() { + break; + } + + let next_scid = candidates[random_range(0, candidates.len() as u64 - 1) as usize]; + let next_channel = match graph.channel(next_scid) { + Some(c) => c, + None => break, + }; + + // as_directed_from validates that current_node_id is a channel endpoint and that + // both direction updates are present; effective_capacity covers both htlc_maximum_msat + // and funding capacity. + let Some((directed, next_node_id)) = next_channel.as_directed_from(¤t_node_id) + else { + break; + }; + // Retrieve the direction-specific update via the public ChannelInfo fields. + // Safe to unwrap: as_directed_from already checked both directions are Some. + let update = if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref().unwrap() + } else { + next_channel.two_to_one.as_ref().unwrap() + }; + + if !update.enabled { + break; + } + + route_least_htlc_upper_bound = + route_least_htlc_upper_bound.min(update.htlc_maximum_msat); + + route_greatest_htlc_lower_bound = + route_greatest_htlc_lower_bound.max(update.htlc_minimum_msat); + + let next_pubkey = match PublicKey::try_from(*next_node_id) { + Ok(pk) => pk, + Err(_) => break, + }; + + route.push((*next_node_id, next_scid, next_pubkey)); + prev_scid = next_scid; + current_node_id = *next_node_id; + } + + // The route is infeasible if any hop's minimum exceeds another hop's maximum. + if route_greatest_htlc_lower_bound > route_least_htlc_upper_bound { + return None; + } + let amount_msat = + amount_msat.max(route_greatest_htlc_lower_bound).min(route_least_htlc_upper_bound); + if amount_msat < self.min_amount_msat { + return None; + } + + // Assemble hops. + // For hop i: fee and CLTV are determined by the *next* channel (what route[i] + // will charge to forward onward). For the last hop they are amount_msat and zero expiry delta. + let mut hops = Vec::with_capacity(route.len()); + for i in 0..route.len() { + let (node_id, via_scid, pubkey) = route[i]; + + let channel_info = graph.channel(via_scid)?; + + let node_features = graph + .node(&node_id) + .and_then(|n| n.announcement_info.as_ref().map(|a| a.features().clone())) + .unwrap_or_else(NodeFeatures::empty); + + let (fee_msat, cltv_expiry_delta) = if i + 1 < route.len() { + // non-final hop + let (_, next_scid, _) = route[i + 1]; + let next_channel = graph.channel(next_scid)?; + let (directed, _) = next_channel.as_directed_from(&node_id)?; + let update = if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref().unwrap() + } else { + next_channel.two_to_one.as_ref().unwrap() + }; + let fee = update.fees.base_msat as u64 + + (amount_msat * update.fees.proportional_millionths as u64 / 1_000_000); + (fee, update.cltv_expiry_delta as u32) + } else { + // Final hop: fee_msat carries the delivery amount; cltv delta is zero. + (amount_msat, 0) + }; + + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features: channel_info.features.clone(), + fee_msat, + cltv_expiry_delta, + maybe_announced_channel: true, + }); + } + + // The first-hop HTLC carries amount_msat + all intermediate fees. + // Verify the total fits within our live outbound limit before returning. + let total_outgoing: u64 = hops.iter().map(|h| h.fee_msat).sum(); + if total_outgoing > first_hop.next_outbound_htlc_limit_msat { + return None; + } + + Some(Path { hops, blinded_tail: None }) + } +} + +impl ProbingStrategy for RandomStrategy { + fn next_probe(&self) -> Option { + let target_hops = random_range(1, self.max_hops as u64) as usize; + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + + self.try_build_path(target_hops, amount_msat).map(Probe::PrebuiltRoute) + } +} + +/// Periodically dispatches probes according to a [`ProbingStrategy`]. +pub struct Prober { + pub(crate) channel_manager: Arc, + pub(crate) logger: Arc, + /// The strategy that decides what to probe. + pub strategy: Arc, + /// How often to fire a probe attempt. + pub interval: Duration, + /// Passed to `send_spontaneous_preflight_probes`. `None` uses LDK default (3×). + pub liquidity_limit_multiplier: Option, + /// Maximum total millisatoshis that may be locked in in-flight probes at any time. + pub max_locked_msat: u64, + pub(crate) locked_msat: Arc, +} + +impl Prober { + /// Returns the total millisatoshis currently locked in in-flight probes. + pub fn locked_msat(&self) -> u64 { + self.locked_msat.load(Ordering::Relaxed) + } + + pub(crate) fn handle_probe_successful(&self, path: &lightning::routing::router::Path) { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let _ = self + .locked_msat + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))); + } + + pub(crate) fn handle_probe_failed(&self, path: &lightning::routing::router::Path) { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let _ = self + .locked_msat + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))); + } +} + +/// Runs the probing loop for the given [`Prober`] until `stop_rx` fires. +pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::watch::Receiver<()>) { + let mut ticker = tokio::time::interval(prober.interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + biased; + _ = stop_rx.changed() => { + log_debug!(prober.logger, "Stopping background probing."); + return; + } + _ = ticker.tick() => { + match prober.strategy.next_probe() { + None => {} + Some(Probe::PrebuiltRoute(path)) => { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + if prober.locked_msat.load(Ordering::Acquire) + amount > prober.max_locked_msat { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + } else { + match prober.channel_manager.send_probe(path) { + Ok(_) => { + prober.locked_msat.fetch_add(amount, Ordering::Release); + } + Err(e) => { + log_debug!(prober.logger, "Prebuilt path probe failed: {:?}", e); + } + } + } + } + Some(Probe::Destination { final_node, amount_msat }) => { + if prober.locked_msat.load(Ordering::Acquire) + amount_msat + > prober.max_locked_msat + { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + } else { + match prober.channel_manager.send_spontaneous_preflight_probes( + final_node, + amount_msat, + DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32, + prober.liquidity_limit_multiplier, + ) { + Ok(probes) => { + if !probes.is_empty() { + prober.locked_msat.fetch_add(amount_msat, Ordering::Release); + } else { + log_debug!(prober.logger, "No probe paths found for destination {}; skipping budget increment.", final_node); + } + } + Err(e) => { + log_debug!(prober.logger, "Route-follow probe to {} failed: {:?}", final_node, e); + } + } + } + } + } + } + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 000000000..aa1a35bae --- /dev/null +++ b/src/util.rs @@ -0,0 +1,29 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +/// Returns a random `u64` uniformly distributed in `[min, max]` (inclusive). +pub(crate) fn random_range(min: u64, max: u64) -> u64 { + debug_assert!(min <= max); + if min == max { + return min; + } + let range = max - min + 1; + // We remove bias due to the fact that the range does not evenly divide 2⁶⁴. + // Imagine we had a range from 0 to 2⁶⁴-2 (of length 2⁶⁴-1), then + // the outcomes of 0 would be twice as frequent as any other, as 0 can be produced + // as randomly drawn 0 % 2⁶⁴-1 and as well as 2⁶⁴-1 % 2⁶⁴-1 + let limit = u64::MAX - (u64::MAX % range); + loop { + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + let val = u64::from_ne_bytes(buf); + if val < limit { + return min + (val % range); + } + // loop runs ~1 iteration on average, in worst case it's ~2 iterations on average + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7854a77f2..0f987a359 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,7 +32,7 @@ use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, - UserChannelId, + ProbingConfig, UserChannelId, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -316,9 +316,9 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig { } #[cfg(feature = "uniffi")] -type TestNode = Arc; +pub(crate) type TestNode = Arc; #[cfg(not(feature = "uniffi"))] -type TestNode = Node; +pub(crate) type TestNode = Node; #[derive(Clone)] pub(crate) enum TestChainSource<'a> { @@ -348,6 +348,7 @@ pub(crate) struct TestConfig { pub node_entropy: NodeEntropy, pub async_payments_role: Option, pub recovery_mode: bool, + pub probing: Option, } impl Default for TestConfig { @@ -367,6 +368,7 @@ impl Default for TestConfig { node_entropy, async_payments_role, recovery_mode, + probing: None, } } } @@ -483,6 +485,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + if let Some(probing) = config.probing { + builder.set_probing_config(probing); + } + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -710,12 +716,18 @@ pub async fn open_channel( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { - open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, None, should_announce).await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo } -pub async fn open_channel_push_amt( +/// Like [`open_channel`] but skips the `wait_for_tx` electrum check so that +/// multiple channels can be opened back-to-back before any blocks are mined. +/// The caller is responsible for mining blocks and confirming the funding txs. +pub async fn open_channel_no_wait( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, - should_announce: bool, electrsd: &ElectrsD, + should_announce: bool, ) -> OutPoint { if should_announce { node_a @@ -743,11 +755,20 @@ pub async fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid).await; - funding_txo_a } +pub async fn open_channel_push_amt( + node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, + should_announce: bool, electrsd: &ElectrsD, +) -> OutPoint { + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, push_amount_msat, should_announce) + .await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo +} + pub async fn open_channel_with_all( node_a: &TestNode, node_b: &TestNode, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs new file mode 100644 index 000000000..aea2ecdde --- /dev/null +++ b/tests/probing_tests.rs @@ -0,0 +1,611 @@ +// Integration tests for the probing service. +// +// Budget tests – linear A ──[1M sats]──▶ B ──[1M sats]──▶ C topology: +// +// probe_budget_increments_and_decrements +// Verifies locked_msat rises when a probe is dispatched and returns +// to zero once the probe resolves. +// +// exhausted_probe_budget_blocks_new_probes +// Stops B mid-flight so the HTLC cannot resolve; confirms the budget +// stays exhausted and no further probes are sent. After B restarts +// the probe fails, the budget clears, and new probes resume. +// +// Strategy tests: +// +// probing_strategies_perfomance +// Brings up a random mesh of nodes, fires random-walk probes via +// RandomStrategy and high-degree probes via HighDegreeStrategy, then +// runs payment rounds and prints probing perfomance tables. + +mod common; +use std::sync::atomic::{AtomicBool, Ordering}; + +use lightning::routing::gossip::NodeAlias; +use lightning_invoice::{Bolt11InvoiceDescription, Description}; + +use common::{ + expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, + open_channel_no_wait, premine_and_distribute_funds, random_config, setup_bitcoind_and_electrsd, + setup_node, TestChainSource, TestNode, +}; + +use ldk_node::bitcoin::secp256k1::PublicKey; +use ldk_node::bitcoin::Amount; +use ldk_node::{Event, Probe, ProbingConfig, ProbingStrategy}; + +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use std::time::Duration; + +const PROBE_AMOUNT_MSAT: u64 = 1_000_000; +const MAX_LOCKED_MSAT: u64 = 100_000_000; +const PROBING_INTERVAL_MILLISECONDS: u64 = 100; +const PROBING_DIVERSITY_PENALTY: u64 = 50_000; + +/// FixedDestStrategy — always targets one node; used by budget tests. +struct FixedDestStrategy { + destination: PublicKey, + amount_msat: u64, + ready_to_probe: AtomicBool, +} + +impl FixedDestStrategy { + fn new(destination: PublicKey, amount_msat: u64) -> Arc { + Arc::new(Self { destination, amount_msat, ready_to_probe: AtomicBool::new(false) }) + } + + fn start_probing(&self) { + self.ready_to_probe.store(true, Ordering::Relaxed); + } +} + +impl ProbingStrategy for FixedDestStrategy { + fn next_probe(&self) -> Option { + if self.ready_to_probe.load(Ordering::Relaxed) { + Some(Probe::Destination { final_node: self.destination, amount_msat: self.amount_msat }) + } else { + None + } + } +} + +fn config_with_label(label: &str) -> common::TestConfig { + let mut config = random_config(false); + let mut alias_bytes = [0u8; 32]; + let b = label.as_bytes(); + alias_bytes[..b.len()].copy_from_slice(b); + config.node_config.node_alias = Some(NodeAlias(alias_bytes)); + config +} + +fn build_node_random_probing(chain_source: &TestChainSource<'_>, max_hops: usize) -> TestNode { + let mut config = config_with_label("Random"); + config.probing = Some( + ProbingConfig::random_walk(max_hops) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(MAX_LOCKED_MSAT) + .build(), + ); + setup_node(chain_source, config) +} + +fn build_node_highdegree_probing( + chain_source: &TestChainSource<'_>, top_node_count: usize, +) -> TestNode { + let mut config = config_with_label("HiDeg"); + config.probing = Some( + ProbingConfig::high_degree(top_node_count) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(MAX_LOCKED_MSAT) + .build(), + ); + setup_node(chain_source, config) +} + +fn build_node_z_highdegree_probing( + chain_source: &TestChainSource<'_>, top_node_count: usize, diversity_penalty: u64, +) -> TestNode { + let mut config = config_with_label("HiDeg+P"); + config.probing = Some( + ProbingConfig::high_degree(top_node_count) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(MAX_LOCKED_MSAT) + .diversity_penalty_msat(diversity_penalty) + .build(), + ); + setup_node(chain_source, config) +} + +// helpers, formatting +fn node_label(node: &TestNode) -> String { + node.node_alias() + .map(|alias| { + let end = alias.0.iter().position(|&b| b == 0).unwrap_or(32); + String::from_utf8_lossy(&alias.0[..end]).to_string() + }) + .unwrap_or_else(|| format!("{:.8}", node.node_id())) +} + +fn print_topology(all_nodes: &[&TestNode]) { + let labels: HashMap = + all_nodes.iter().map(|n| (n.node_id(), node_label(n))).collect(); + let label_of = |pk: PublicKey| labels.get(&pk).cloned().unwrap_or_else(|| format!("{:.8}", pk)); + + let mut adjacency: BTreeMap> = BTreeMap::new(); + for node in all_nodes { + let local = label_of(node.node_id()); + let mut peers: Vec = node + .list_channels() + .into_iter() + .filter(|ch| ch.short_channel_id.is_some()) + .map(|ch| label_of(ch.counterparty_node_id)) + .collect(); + peers.sort(); + peers.dedup(); + adjacency.entry(local).or_default().extend(peers); + } + + println!("\n=== Topology ==="); + for (node, peers) in &adjacency { + println!(" {node} ── {}", peers.join(", ")); + } +} + +const LABEL_MAX: usize = 8; +const DIR_W: usize = LABEL_MAX * 2 + 1; +const SCORER_W: usize = 28; + +fn thousands(n: u64) -> String { + let s = n.to_string(); + let mut out = String::with_capacity(s.len() + s.len() / 3); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + out.push(' '); + } + out.push(c); + } + out.chars().rev().collect() +} + +fn short_label(label: &str) -> String { + label.chars().take(LABEL_MAX).collect() +} + +fn fmt_est(est: Option<(u64, u64)>) -> String { + match est { + Some((lo, hi)) => format!("[{}, {}]", thousands(lo), thousands(hi)), + None => "unknown".into(), + } +} + +fn print_probing_perfomance(observers: &[&TestNode], all_nodes: &[&TestNode]) { + let labels: HashMap = + all_nodes.iter().chain(observers.iter()).map(|n| (n.node_id(), node_label(n))).collect(); + let label_of = |pk: PublicKey| { + short_label(&labels.get(&pk).cloned().unwrap_or_else(|| format!("{:.8}", pk))) + }; + + let mut by_scid: BTreeMap> = BTreeMap::new(); + for node in all_nodes { + let local_pk = node.node_id(); + for ch in node.list_channels() { + if let Some(scid) = ch.short_channel_id { + by_scid.entry(scid).or_default().push(( + local_pk, + ch.counterparty_node_id, + ch.outbound_capacity_msat, + )); + } + } + } + + print!("\n{:<15} {: 0 { + break; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }) + .await + .is_ok(); + assert!(went_up, "locked_msat never increased — no probe was dispatched"); + println!("First probe dispatched; locked_msat = {}", node_a.prober().unwrap().locked_msat()); + + let cleared = tokio::time::timeout(Duration::from_secs(20), async { + loop { + if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never returned to zero after probe resolved"); + println!("Probe resolved; locked_msat = 0"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Verifies that no new probes are dispatched once the in-flight budget is exhausted. +/// +/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC +/// is in-flight, preventing resolution and keeping the budget locked. After B restarts +/// the HTLC fails, the budget clears, and probing resumes. +#[tokio::test(flavor = "multi_thread")] +async fn exhausted_probe_budget_blocks_new_probes() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Electrum(&electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + // Use a slow probing interval so we can read capacity before the first probe fires. + let mut config_a = random_config(false); + let strategy = FixedDestStrategy::new(node_c.node_id(), PROBE_AMOUNT_MSAT); + config_a.probing = Some( + ProbingConfig::custom(strategy.clone()) + .interval(Duration::from_secs(10)) + .max_locked_msat(PROBE_AMOUNT_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + // Record capacity before the first probe fires (interval is 3s, so we have time). + let capacity_at_open = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + + assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero"); + strategy.start_probing(); + + // Give gossip time to propagate to A, then wait for the first probe. + let locked = tokio::time::timeout(Duration::from_secs(15), async { + loop { + if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }) + .await + .is_ok(); + assert!(locked, "no probe dispatched within 15 s"); + + // Capacity should have decreased due to the in-flight probe HTLC. + let capacity_with_probe = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + assert!( + capacity_with_probe < capacity_at_open, + "HTLC not visible in channel state: capacity unchanged ({capacity_at_open} msat)" + ); + + // Stop B while the probe HTLC is in-flight. + node_b.stop().unwrap(); + + // Let several Prober ticks fire (interval is 3s); the budget is exhausted so + // they must be skipped. Wait, then check both conditions at once. + tokio::time::sleep(Duration::from_secs(5)).await; + assert!( + node_a.prober().map_or(0, |p| p.locked_msat()) > 0, + "probe resolved unexpectedly while B was offline" + ); + let capacity_after_wait = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .unwrap_or(u64::MAX); + assert!( + capacity_after_wait >= capacity_with_probe, + "a new probe HTLC was sent despite budget being exhausted" + ); + + // Bring B back and explicitly reconnect to A and C so the stuck HTLC resolves + // without waiting for the background reconnection backoff. + node_b.start().unwrap(); + let node_a_addr = node_a.listening_addresses().unwrap().first().unwrap().clone(); + let node_c_addr = node_c.listening_addresses().unwrap().first().unwrap().clone(); + node_b.connect(node_a.node_id(), node_a_addr, false).unwrap(); + node_b.connect(node_c.node_id(), node_c_addr, false).unwrap(); + + let cleared = tokio::time::timeout(Duration::from_secs(15), async { + loop { + if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never cleared after B came back online"); + + // Once the budget is freed, a new probe should be dispatched within a few ticks. + let new_probe = tokio::time::timeout(Duration::from_secs(10), async { + loop { + if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }) + .await + .is_ok(); + assert!(new_probe, "no new probe dispatched after budget was freed"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Builds a random mesh of nodes, runs `RandomStrategy` and `HighDegreeStrategy` +/// probers alongside payment rounds, then prints scorer liquidity estimates to +/// compare probing coverage. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn probing_strategies_perfomance() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Electrum(&electrsd); + + let num_nodes = 5; + let channel_capacity_sat = 1_000_000u64; + // Each observer opens 1 channel; regular nodes open at most (num_nodes-1) each. + // num_nodes UTXOs per node is a safe upper bound for funding. + let utxos_per_node = num_nodes; + let utxo_per_channel = Amount::from_sat(channel_capacity_sat + 50_000); + + let mut nodes: Vec = Vec::new(); + for i in 0..num_nodes { + let label = char::from(b'B' + i as u8).to_string(); + let mut config = random_config(false); + let mut alias_bytes = [0u8; 32]; + alias_bytes[..label.as_bytes().len()].copy_from_slice(label.as_bytes()); + config.node_config.node_alias = Some(NodeAlias(alias_bytes)); + nodes.push(setup_node(&chain_source, config)); + } + let node_a = build_node_random_probing(&chain_source, 4); + let node_x = setup_node(&chain_source, config_with_label("nostrat")); + let node_y = build_node_highdegree_probing(&chain_source, 4); + let node_z = build_node_z_highdegree_probing(&chain_source, 4, PROBING_DIVERSITY_PENALTY); + + let seed = std::env::var("TEST_SEED") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| rand::rng().random()); + println!("RNG seed: {seed} (re-run with TEST_SEED={seed} to reproduce)"); + let mut rng = StdRng::seed_from_u64(seed); + let channels_per_node = rng.random_range(1..=num_nodes - 1); + let channels_per_nodes: Vec = + (0..num_nodes).map(|_| rng.random_range(1..=channels_per_node)).collect(); + + let observer_nodes: [&TestNode; 4] = [&node_a, &node_y, &node_z, &node_x]; + + let mut addresses = Vec::new(); + for node in observer_nodes { + for _ in 0..utxos_per_node { + addresses.push(node.onchain_payment().new_address().unwrap()); + } + } + for node in &nodes { + for _ in 0..utxos_per_node { + addresses.push(node.onchain_payment().new_address().unwrap()); + } + } + + premine_and_distribute_funds(&bitcoind.client, &electrsd.client, addresses, utxo_per_channel) + .await; + + println!("distributed initial sats"); + for node in nodes.iter().chain(observer_nodes) { + node.sync_wallets().unwrap(); + } + + fn drain_events(node: &TestNode) { + while let Some(_) = node.next_event() { + node.event_handled().unwrap(); + } + } + + println!("opening channels"); + for node in observer_nodes { + let idx = rng.random_range(0..num_nodes); + open_channel_no_wait(node, &nodes[idx], channel_capacity_sat, None, true).await; + } + for (i, &count) in channels_per_nodes.iter().enumerate() { + let targets: Vec = (0..num_nodes).filter(|&j| j != i).take(count).collect(); + for j in targets { + open_channel_no_wait(&nodes[i], &nodes[j], channel_capacity_sat, None, true).await; + } + } + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + for node in nodes.iter().chain(observer_nodes) { + node.sync_wallets().unwrap(); + } + for node in nodes.iter().chain(observer_nodes) { + drain_events(node); + } + + tokio::time::sleep(Duration::from_secs(3)).await; + + let mut node_map = HashMap::new(); + for (i, node) in nodes.iter().enumerate() { + node_map.insert(node.node_id(), i); + } + + let all_nodes: Vec<&TestNode> = nodes.iter().chain(observer_nodes).collect(); + + print_topology(&all_nodes); + + println!("\nbefore payments"); + print_probing_perfomance(&observer_nodes, &all_nodes); + + let desc = Bolt11InvoiceDescription::Direct(Description::new("test".to_string()).unwrap()); + for round in 0..10 { + let mut sent = 0u32; + for sender_idx in 0..num_nodes { + let channels: Vec<_> = nodes[sender_idx] + .list_channels() + .into_iter() + .filter(|ch| ch.is_channel_ready && ch.outbound_capacity_msat > 1_000) + .collect(); + if channels.is_empty() { + continue; + } + let ch = &channels[rng.random_range(0..channels.len())]; + let amount_msat = rng.random_range(1_000..=ch.outbound_capacity_msat.min(100_000_000)); + if let Some(&receiver_idx) = node_map.get(&ch.counterparty_node_id) { + let invoice = nodes[receiver_idx] + .bolt11_payment() + .receive(amount_msat, &desc.clone().into(), 3600) + .unwrap(); + if nodes[sender_idx].bolt11_payment().send(&invoice, None).is_ok() { + sent += 1; + } + } + } + println!("round {round}: sent {sent} payments"); + tokio::time::sleep(Duration::from_millis(500)).await; + for node in nodes.iter().chain(observer_nodes) { + drain_events(node); + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + println!("\n=== after payments ==="); + print_probing_perfomance(&observer_nodes, &all_nodes); + + for node in nodes.iter().chain(observer_nodes) { + node.stop().unwrap(); + } +}