Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::default::Default;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, Once, RwLock};
use std::time::SystemTime;
use std::{fmt, fs};
Expand Down Expand Up @@ -47,6 +48,7 @@ use crate::config::{
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig,
DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL,
DEFAULT_MAX_PROBE_AMOUNT_MSAT, MIN_PROBE_AMOUNT_MSAT,
};
use crate::connection::ConnectionManager;
use crate::entropy::NodeEntropy;
Expand All @@ -73,6 +75,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::peer_store::PeerStore;
use crate::probing;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -281,6 +284,7 @@ pub struct NodeBuilder {
runtime_handle: Option<tokio::runtime::Handle>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
recovery_mode: bool,
probing_config: Option<probing::ProbingConfig>,
}

impl NodeBuilder {
Expand All @@ -299,16 +303,19 @@ impl NodeBuilder {
let runtime_handle = None;
let pathfinding_scores_sync_config = None;
let recovery_mode = false;
let async_payments_role = None;
let probing_config = None;
Self {
config,
chain_data_source_config,
gossip_source_config,
liquidity_source_config,
log_writer_config,
runtime_handle,
async_payments_role: None,
async_payments_role,
pathfinding_scores_sync_config,
recovery_mode,
probing_config,
}
}

Expand Down Expand Up @@ -614,6 +621,23 @@ impl NodeBuilder {
self
}

/// Configures background probing.
///
/// Use [`probing::ProbingConfig`] to build the configuration:
/// ```ignore
/// use ldk_node::probing::ProbingConfig;
///
/// builder.set_probing_config(
/// ProbingConfig::high_degree(100)
/// .interval(Duration::from_secs(30))
/// .build()
/// );
/// ```
pub fn set_probing_config(&mut self, config: probing::ProbingConfig) -> &mut Self {
self.probing_config = Some(config);
self
}

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self, node_entropy: NodeEntropy) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -785,6 +809,7 @@ impl NodeBuilder {
self.gossip_source_config.as_ref(),
self.liquidity_source_config.as_ref(),
self.pathfinding_scores_sync_config.as_ref(),
self.probing_config.as_ref(),
self.async_payments_role,
self.recovery_mode,
seed_bytes,
Expand Down Expand Up @@ -1081,6 +1106,13 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_wallet_recovery_mode();
}

/// Configures background probing.
///
/// See [`probing::ProbingConfig`] for details.
pub fn set_probing_config(&self, config: probing::ProbingConfig) {
self.inner.write().unwrap().set_probing_config(config);
}

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self, node_entropy: Arc<NodeEntropy>) -> Result<Arc<Node>, BuildError> {
Expand Down Expand Up @@ -1224,6 +1256,7 @@ fn build_with_store_internal(
gossip_source_config: Option<&GossipSourceConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>,
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
probing_config: Option<&probing::ProbingConfig>,
async_payments_role: Option<AsyncPaymentsRole>, recovery_mode: bool, seed_bytes: [u8; 64],
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -1626,7 +1659,10 @@ fn build_with_store_internal(
},
}

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do wonder if we should allow the user to set the entire ProbabilisticScoringFeeParameters and ProbabilisticScoringDecayParameters via the ProbingConfigBuilder mentioned above? Do you see any reason where that would conflict with other API design decisions?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should expose these settings (in NodeBuilder, not probing builder).
However these are for fine tuning and should be used only by advanced users. Also I would expose UserConfig, as for example user cannot decide what features do advertise.

I can add builder methods for scoring parameters, though maybe it should be in another PR aimed on exposing settings for an advanced user?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I would expose UserConfig, as for example user cannot decide what features do advertise.

Yes, this is very intentional, as we want to provide a sane/safe API. For example letting user freely choose to set certain parameters if we don't implement them properly will just lead to a lot of footguns, in some circumstances even with the potential for money loss.

if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) {
scoring_fee_params.probing_diversity_penalty_msat = penalty;
}
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Arc::clone(&logger),
Expand Down Expand Up @@ -1965,6 +2001,39 @@ fn build_with_store_internal(
_leak_checker.0.push(Arc::downgrade(&wallet) as Weak<dyn Any + Send + Sync>);
}

let prober = probing_config.map(|probing_cfg| {
let strategy: Arc<dyn probing::ProbingStrategy> = match &probing_cfg.kind {
probing::ProbingStrategyKind::HighDegree { top_node_count } => {
Arc::new(probing::HighDegreeStrategy::new(
Arc::clone(&network_graph),
*top_node_count,
MIN_PROBE_AMOUNT_MSAT,
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
probing_cfg.cooldown,
))
},
probing::ProbingStrategyKind::Random { max_hops } => {
Arc::new(probing::RandomStrategy::new(
Arc::clone(&network_graph),
Arc::clone(&channel_manager),
*max_hops,
MIN_PROBE_AMOUNT_MSAT,
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
))
},
probing::ProbingStrategyKind::Custom(s) => Arc::clone(s),
};
Arc::new(probing::Prober {
channel_manager: Arc::clone(&channel_manager),
logger: Arc::clone(&logger),
strategy,
interval: probing_cfg.interval,
liquidity_limit_multiplier: Some(config.probing_liquidity_limit_multiplier),
max_locked_msat: probing_cfg.max_locked_msat,
locked_msat: Arc::new(AtomicU64::new(0)),
})
});

Ok(Node {
runtime,
stop_sender,
Expand Down Expand Up @@ -1998,6 +2067,7 @@ fn build_with_store_internal(
om_mailbox,
async_payments_role,
hrn_resolver,
prober,
#[cfg(cycle_tests)]
_leak_checker,
})
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80;
const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30;
const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10;
const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3;
pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10;
pub(crate) const DEFAULT_PROBED_NODE_COOLDOWN_SECS: u64 = 60 * 60; // 1 hour
pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats
pub(crate) const MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats
pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats
const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;

// The default timeout after which we abort a wallet syncing operation.
Expand Down
17 changes: 14 additions & 3 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
use crate::payment::store::{
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
};
use crate::probing::Prober;
use crate::runtime::Runtime;
use crate::types::{
CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet,
Expand Down Expand Up @@ -515,6 +516,7 @@ where
static_invoice_store: Option<StaticInvoiceStore>,
onion_messenger: Arc<OnionMessenger>,
om_mailbox: Option<Arc<OnionMessageMailbox>>,
prober: Option<Arc<Prober>>,
}

impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
Expand All @@ -530,7 +532,7 @@ where
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
runtime: Arc<Runtime>, logger: L, config: Arc<Config>, prober: Option<Arc<Prober>>,
) -> Self {
Self {
event_queue,
Expand All @@ -550,6 +552,7 @@ where
static_invoice_store,
onion_messenger,
om_mailbox,
prober,
}
}

Expand Down Expand Up @@ -1135,8 +1138,16 @@ where

LdkEvent::PaymentPathSuccessful { .. } => {},
LdkEvent::PaymentPathFailed { .. } => {},
LdkEvent::ProbeSuccessful { .. } => {},
LdkEvent::ProbeFailed { .. } => {},
LdkEvent::ProbeSuccessful { path, .. } => {
if let Some(prober) = &self.prober {
prober.handle_probe_successful(&path);
}
},
LdkEvent::ProbeFailed { path, .. } => {
if let Some(prober) = &self.prober {
prober.handle_probe_failed(&path);
}
},
LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
liquidity_source.handle_htlc_handling_failed(failure_type).await;
Expand Down
50 changes: 50 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ pub mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
mod probing;
mod runtime;
mod scoring;
mod tx_broadcaster;
mod types;
mod util;
mod wallet;

use std::default::Default;
Expand Down Expand Up @@ -170,6 +172,10 @@ use payment::{
UnifiedPayment,
};
use peer_store::{PeerInfo, PeerStore};
pub use probing::{
HighDegreeStrategy, Probe, Prober, ProbingConfig, ProbingConfigBuilder, ProbingStrategy,
RandomStrategy,
};
use runtime::Runtime;
pub use tokio;
use types::{
Expand Down Expand Up @@ -239,6 +245,7 @@ pub struct Node {
om_mailbox: Option<Arc<OnionMessageMailbox>>,
async_payments_role: Option<AsyncPaymentsRole>,
hrn_resolver: Arc<HRNResolver>,
prober: Option<Arc<probing::Prober>>,
#[cfg(cycle_tests)]
_leak_checker: LeakChecker,
}
Expand Down Expand Up @@ -593,8 +600,16 @@ impl Node {
Arc::clone(&self.runtime),
Arc::clone(&self.logger),
Arc::clone(&self.config),
self.prober.clone(),
));

if let Some(prober) = self.prober.clone() {
let stop_rx = self.stop_sender.subscribe();
self.runtime.spawn_cancellable_background_task(async move {
probing::run_prober(prober, stop_rx).await;
});
}

// Setup background processing
let background_persister = Arc::clone(&self.kv_store);
let background_event_handler = Arc::clone(&event_handler);
Expand Down Expand Up @@ -1067,6 +1082,41 @@ impl Node {
))
}

/// Returns a reference to the [`Prober`], or `None` if no probing strategy is configured.
pub fn prober(&self) -> Option<&Prober> {
self.prober.as_deref()
}

/// Returns the scorer's estimated `(min, max)` liquidity range for the given channel in the
/// direction toward `target`, or `None` if the scorer has no data for that channel.
///
/// Works by serializing the `CombinedScorer` (which writes `local_only_scorer`) and
/// deserializing it as a plain `ProbabilisticScorer` to call `estimated_channel_liquidity_range`.
pub fn scorer_channel_liquidity(&self, scid: u64, target: PublicKey) -> Option<(u64, u64)> {
use lightning::routing::scoring::{
ProbabilisticScorer, ProbabilisticScoringDecayParameters,
};
use lightning::util::ser::{ReadableArgs, Writeable};

let target_node_id = lightning::routing::gossip::NodeId::from_pubkey(&target);

let bytes = {
let scorer = self.scorer.lock().unwrap();
let mut buf = Vec::new();
scorer.write(&mut buf).ok()?;
buf
};

let decay_params = ProbabilisticScoringDecayParameters::default();
let prob_scorer = ProbabilisticScorer::read(
&mut &bytes[..],
(decay_params, Arc::clone(&self.network_graph), Arc::clone(&self.logger)),
)
.ok()?;

prob_scorer.estimated_channel_liquidity_range(scid, &target_node_id)
}

/// Retrieve a list of known channels.
pub fn list_channels(&self) -> Vec<ChannelDetails> {
self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect()
Expand Down
Loading
Loading