From 48af9d3f95278991bc978be8663f4c12160f7d9e Mon Sep 17 00:00:00 2001 From: Enigbe Date: Wed, 18 Feb 2026 22:18:01 +0100 Subject: [PATCH 01/10] Implement tiered storage Introduces TierStore, a KVStore implementation that manages data across three storage layers: - Primary: Main data store for critical node data - Ephemeral: Secondary store for non-critical, easily-rebuildable data (e.g., network graph) with fast local access - Backup: Tertiary store for disaster recovery with async/lazy operations to avoid blocking primary store - Unit tests for TierStore core functionality --- src/io/mod.rs | 1 + src/io/test_utils.rs | 170 +++++++- src/io/tier_store.rs | 980 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1150 insertions(+), 1 deletion(-) create mode 100644 src/io/tier_store.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..bf6366c45 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,6 +10,7 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index 88078b316..5dd36cd1a 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -9,7 +9,8 @@ use std::collections::{hash_map, HashMap}; use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use lightning::events::ClosureReason; use lightning::io; @@ -26,6 +27,8 @@ use lightning::util::test_utils; use rand::distr::Alphanumeric; use rand::{rng, Rng}; +use crate::runtime::Runtime; + type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister< &'a K, &'a test_utils::TestLogger, @@ -353,3 +356,168 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Make sure everything is persisted as expected after close. 
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1); } + +struct DelayedStoreInner { + storage: Mutex>>, + delay: Duration, +} + +impl DelayedStoreInner { + fn new(delay: Duration) -> Self { + Self { storage: Mutex::new(HashMap::new()), delay } + } + + fn make_key(pn: &str, sn: &str, key: &str) -> String { + format!("{}/{}/{}", pn, sn, key) + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let storage = self.storage.lock().unwrap(); + storage + .get(&full_key) + .cloned() + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let mut storage = self.storage.lock().unwrap(); + storage.insert(full_key, buf); + Ok(()) + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result<(), io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let mut storage = self.storage.lock().unwrap(); + storage.remove(&full_key); + Ok(()) + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, io::Error> { + tokio::time::sleep(self.delay).await; + + let prefix = format!("{}/{}/", primary_namespace, secondary_namespace); + let storage = self.storage.lock().unwrap(); + Ok(storage + .keys() + .filter(|k| k.starts_with(&prefix)) + .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) + .collect()) + } +} + +pub struct DelayedStore { + inner: Arc, + 
runtime: Arc, +} + +impl DelayedStore { + pub fn new(delay_ms: u64, runtime: Arc) -> Self { + Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime } + } +} + +impl KVStore for DelayedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(pn, sn, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(pn, sn, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(pn, sn, key).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + + async move { inner.list_internal(pn, sn).await } + } +} + +impl KVStoreSync for DelayedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await }) + } + + fn 
write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + + self.runtime.block_on(async move { inner.list_internal(pn, sn).await }) + } +} diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..07f87c0b3 --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,980 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. 
+ +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_trace}; +use lightning::{log_debug, log_error, log_info, log_warn}; + +use tokio::sync::mpsc::{self, error::TrySendError}; + +use std::future::Future; +use std::sync::Arc; + +#[cfg(not(test))] +const BACKUP_QUEUE_CAPACITY: usize = 100; +#[cfg(test)] +const BACKUP_QUEUE_CAPACITY: usize = 5; + +/// A 3-tiered [`KVStoreSync`] implementation that manages data across +/// three distinct storage locations, i.e. primary (preferably remote) +/// store for all critical data, optional ephemeral (local) store for +/// non-critical and easily rebuildable data, and backup (preferably +/// local) to lazily backup the primary store for disaster recovery +/// scenarios. +pub(crate) struct TierStore { + inner: Arc, + runtime: Arc, + logger: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, runtime: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner, runtime, logger } + } + + /// Configures the local backup store for disaster recovery. + /// + /// This store serves as a local copy of the critical data for disaster + /// recovery scenarios. When configured, this method also spawns a background + /// task that asynchronously processes backup writes and removals to avoid + /// blocking primary store operations. + /// + /// The backup operates on a best-effort basis: + /// - Writes are queued asynchronously (non-blocking) + /// - No retry logic (We assume local store is unlikely to have transient failures). 
+ /// - Failures are logged but don't propagate to all the way to caller. + pub fn set_backup_store(&mut self, backup: Arc) { + let (tx, rx) = mpsc::channel::(BACKUP_QUEUE_CAPACITY); + + let backup_clone = Arc::clone(&backup); + let logger = Arc::clone(&self.logger); + + self.runtime.spawn_background_task(Self::process_backup_operation( + rx, + backup_clone, + logger, + )); + + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + inner.backup_sender = Some(tx); + } + + async fn process_backup_operation( + mut receiver: mpsc::Receiver, backup_store: Arc, logger: Arc, + ) { + while let Some(op) = receiver.recv().await { + match Self::apply_backup_operation(&op, &backup_store).await { + Ok(_) => { + log_trace!( + logger, + "Backup succeeded for key {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + }, + Err(e) => { + log_error!( + logger, + "Backup failed permanently for key {}/{}/{}: {}", + op.primary_namespace(), + op.secondary_namespace(), + op.key(), + e + ); + }, + } + } + } + + async fn apply_backup_operation(op: &BackupOp, store: &Arc) -> io::Result<()> { + match op { + BackupOp::Write { primary_namespace, secondary_namespace, key, data } => { + KVStore::write( + store.as_ref(), + primary_namespace, + secondary_namespace, + key, + data.clone(), + ) + .await + }, + BackupOp::Remove { primary_namespace, secondary_namespace, key, lazy } => { + KVStore::remove(store.as_ref(), primary_namespace, secondary_namespace, key, *lazy) + .await + }, + } + } + + /// Configures the local store for non-critical data storage. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. 
No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.runtime.block_on(self.inner.read_internal( + primary_namespace.to_string(), + 
secondary_namespace.to_string(), + key.to_string(), + )) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(self.inner.write_internal( + primary_namespace, + secondary_namespace, + key, + buf, + )) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(self.inner.remove_internal( + primary_namespace, + secondary_namespace, + key, + lazy, + )) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.runtime.block_on( + self.inner + .list_internal(primary_namespace.to_string(), secondary_namespace.to_string()), + ) + } +} + +pub struct TierStoreInner { + /// For remote data. + primary_store: Arc, + /// For local non-critical/ephemeral data. + ephemeral_store: Option>, + /// For redundancy (disaster recovery). + backup_store: Option>, + backup_sender: Option>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { + primary_store, + ephemeral_store: None, + backup_store: None, + backup_sender: None, + logger, + } + } + + /// Queues data for asynchronous backup/write to the configured backup store. + /// + /// We perform a non-blocking send to avoid impacting primary storage operations. + /// This is a no-op if backup store is not configured. 
+ /// + /// ## Returns + /// - `Ok(())`: Backup was successfully queued or no backup is configured + /// - `Err(WouldBlock)`: Backup queue is full - data was not queued + /// - `Err(BrokenPipe)`: Backup queue is no longer available + fn enqueue_backup_write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_sender) = &self.backup_sender { + let backup_res = backup_sender.try_send(BackupOp::Write { + primary_namespace: primary_namespace.to_string(), + secondary_namespace: secondary_namespace.to_string(), + key: key.to_string(), + data: buf, + }); + if let Err(e) = backup_res { + match e { + // Assuming the channel is only full for a short time, should we explore + // retrying here to add some resiliency? + TrySendError::Full(op) => { + log_warn!( + self.logger, + "Backup queue is full. Cannot write data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = io::Error::new( + io::ErrorKind::WouldBlock, + "Backup queue is currently full.", + ); + return Err(e); + }, + TrySendError::Closed(op) => { + log_error!( + self.logger, + "Backup queue is closed. Cannot write data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = + io::Error::new(io::ErrorKind::BrokenPipe, "Backup queue is closed."); + return Err(e); + }, + } + } + } + Ok(()) + } + + /// Queues the removal of data from the configured backup store. + /// + /// We perform a non-blocking send to avoid impacting primary storage operations. + /// This is a no-op if backup store is not configured. 
+ /// + /// # Returns + /// - `Ok(())`: Backup was successfully queued or no backup is configured + /// - `Err(WouldBlock)`: Backup queue is full - data was not queued + /// - `Err(BrokenPipe)`: Backup system is no longer available + fn enqueue_backup_remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_sender) = &self.backup_sender { + let removal_res = backup_sender.try_send(BackupOp::Remove { + primary_namespace: primary_namespace.to_string(), + secondary_namespace: secondary_namespace.to_string(), + key: key.to_string(), + lazy, + }); + if let Err(e) = removal_res { + match e { + TrySendError::Full(op) => { + log_warn!( + self.logger, + "Backup queue is full. Cannot remove data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = io::Error::new( + io::ErrorKind::WouldBlock, + "Backup queue is currently full.", + ); + return Err(e); + }, + TrySendError::Closed(op) => { + log_error!( + self.logger, + "Backup queue is closed. Cannot remove data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = + io::Error::new(io::ErrorKind::BrokenPipe, "Backup queue is closed."); + return Err(e); + }, + } + } + } + Ok(()) + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => { + log_info!( + self.logger, + "Read succeeded for key: {}/{}/{}", + primary_namespace, + secondary_namespace, + key + ); + Ok(data) + }, + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. 
+ async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => { + log_info!( + self.logger, + "List succeeded for namespace: {}/{}", + primary_namespace, + secondary_namespace + ); + return Ok(keys); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn primary_write_then_schedule_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + match KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ) + .await + { + Ok(()) => { + if let Err(e) = + self.enqueue_backup_write(primary_namespace, secondary_namespace, key, buf) + { + // We don't propagate backup errors here, opting to log only. + log_warn!( + self.logger, + "Failed to queue backup write for key: {}/{}/{}. Error: {}", + primary_namespace, + secondary_namespace, + key, + e + ) + } + + Ok(()) + }, + Err(e) => { + log_debug!( + self.logger, + "Skipping backup write due to primary write failure for key: {}/{}/{}.", + primary_namespace, + secondary_namespace, + key + ); + Err(e) + }, + } + } + + async fn primary_remove_then_schedule_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + match KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + { + Ok(()) => { + if let Err(e) = + self.enqueue_backup_remove(primary_namespace, secondary_namespace, key, lazy) + { + // We don't propagate backup errors here, opting to silently log. + log_warn!( + self.logger, + "Failed to queue backup removal for key: {}/{}/{}. 
Error: {}", + primary_namespace, + secondary_namespace, + key, + e + ) + } + + Ok(()) + }, + Err(e) => { + log_debug!( + self.logger, + "Skipping backup removal due to primary removal failure for key: {}/{}/{}.", + primary_namespace, + secondary_namespace, + key + ); + Err(e) + }, + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We only try once here (without retry logic) because local failure might be indicative + // of a more serious issue (e.g. full memory, memory corruption, permissions change) that + // do not self-resolve such that retrying would negate the latency benefits. + + // The following questions remain: + // 1. Are there situations where local transient errors may warrant a retry? + // 2. Can we reliably identify/detect these transient errors? + // 3. Should we fall back to the primary or backup stores in the event of any error? + KVStore::read( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + &key, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. 
Reading non-critical data from primary or backup stores."); + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + }, + _ => self.read_primary(&primary_namespace, &secondary_namespace, &key).await, + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = &self.ephemeral_store { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. Writing non-critical data to primary and backup stores."); + + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + }, + _ => { + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }, + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = &self.ephemeral_store { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. 
Removing non-critical data from primary and backup stores."); + + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + }, + _ => { + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }, + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + } else { + log_debug!( + self.logger, + "Ephemeral store not configured. Listing from primary and backup stores." + ); + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } +} + +enum BackupOp { + Write { primary_namespace: String, secondary_namespace: String, key: String, data: Vec }, + Remove { primary_namespace: String, secondary_namespace: String, key: String, lazy: bool }, +} + +impl BackupOp { + fn primary_namespace(&self) -> &str { + match self { + BackupOp::Write { primary_namespace, .. } + | BackupOp::Remove { primary_namespace, .. } => primary_namespace, + } + } + + fn secondary_namespace(&self) -> &str { + match self { + BackupOp::Write { secondary_namespace, .. } + | BackupOp::Remove { secondary_namespace, .. } => secondary_namespace, + } + } + + fn key(&self) -> &str { + match self { + BackupOp::Write { key, .. } | BackupOp::Remove { key, .. 
} => key, + } + } +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{ + do_read_write_remove_list_persist, random_storage_path, DelayedStore, + }; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::runtime::Runtime; + #[cfg(not(feature = "uniffi"))] + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store( + primary_store: Arc, logger: Arc, runtime: Arc, + ) -> TierStore { + TierStore::new(primary_store, runtime, logger) + } + + #[test] + fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger, runtime); + + do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let runtime = 
Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger, runtime); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + // Non-critical + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + // Critical + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn lazy_backup() { + let base_dir = random_storage_path(); + let log_path = 
base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger, runtime); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + // Immediate read from backup should fail + let backup_read_cm = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_read_cm.is_err()); + + // Primary not blocked by backup hence immediate read should succeed + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert_eq!(primary_read_cm.unwrap(), data); + + // Delayed read from backup should succeed + thread::sleep(Duration::from_millis(50)); + let backup_read_cm = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert_eq!(backup_read_cm.unwrap(), data); + } + + #[test] + fn backup_overflow_doesnt_fail_writes() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = 
Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = + setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + for i in 0..=10 { + let result = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + &format!("{}_{}", key, i), + data.clone(), + ); + + assert!(result.is_ok(), "Write {} should succeed", i); + } + + // Check logs for backup queue overflow message + let log_contents = std::fs::read_to_string(&log_path).unwrap(); + assert!( + log_contents.contains("Backup queue is full"), + "Logs should contain backup queue overflow message" + ); + } + + #[test] + fn lazy_removal() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = + setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + let write_result = KVStoreSync::write( + &tier, + 
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data.clone(), + ); + assert!(write_result.is_ok(), "Write should succeed"); + + thread::sleep(Duration::from_millis(10)); + + assert_eq!( + KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .unwrap(), + data + ); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .unwrap(); + + thread::sleep(Duration::from_millis(10)); + + let res = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(res.is_err()); + } +} From b3eee95acf9aae60c655392f9ace633cac193b42 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Thu, 19 Feb 2026 09:44:55 +0100 Subject: [PATCH 02/10] Integrate TierStore into NodeBuilder Adds TierStoreConfig and two configuration methods to NodeBuilder: - set_backup_store: Configure backup store for disaster recovery - set_ephemeral_store: Configure ephemeral store for non-critical data Modifies build_with_store to wrap the provided store in a TierStore, as the primary store, applying any configured ephemeral and backup stores. Note: Temporary dead code allowance will be removed in test commit.
--- src/builder.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/src/builder.rs b/src/builder.rs index 7641a767d..874cded72 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -54,6 +54,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, @@ -151,6 +152,21 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup: Option>, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .finish() + } +} + /// An error encountered during building a [`Node`]. 
/// /// [`Node`]: crate::Node @@ -244,6 +260,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -262,6 +279,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -271,6 +289,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -559,6 +578,33 @@ impl NodeBuilder { self } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives asynchronous copies + /// of all critical data written to the primary store. + /// + /// Backup writes are non-blocking and do not affect primary store operation performance. + #[allow(dead_code)] + pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup = Some(backup_store); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. 
+ #[allow(dead_code)] + pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -707,6 +753,14 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { @@ -721,6 +775,15 @@ impl NodeBuilder { })?) 
}; + let ts_config = self.tier_store_config.as_ref(); + let primary_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + let mut tier_store = + TierStore::new(primary_store, Arc::clone(&runtime), Arc::clone(&logger)); + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -735,7 +798,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } From b1a0871d8cb3133eae000ee7674fbb19f9d62667 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Thu, 19 Feb 2026 21:30:56 +0100 Subject: [PATCH 03/10] Expose tier storage configuration across the FFI boundary Introduce FFI-safe abstractions and builder APIs to allow foreign language targets to configure custom backup and ephemeral stores when constructing nodes with a custom store. 
Major changes include: - Addition of FfiDynStoreTrait, an FFI-safe equivalent of DynStoreTrait, working around uniffi's lack of support for Pin<Box<dyn Future>> - Addition of FfiDynStore, a concrete wrapper for foreign language store implementations - Provision of FfiDynStoreTrait implementation for DynStoreWrapper to bridge native Rust stores to FFI layer (useful in testing) - Extension of ArcedNodeBuilder with methods for configuring backup and ephemeral stores - Exposure of build_with_store so foreign targets can build nodes with custom store implementations - Addition of build_node_with_store test helper to abstract uniffi-gated store wrapping at build_with_store call sites --- bindings/ldk_node.udl | 6 + src/builder.rs | 46 ++++- src/ffi/types.rs | 313 +++++++++++++++++++++++++++++++- src/io/tier_store.rs | 11 +- src/lib.rs | 12 +- src/types.rs | 8 +- tests/common/mod.rs | 20 +- tests/integration_tests_rust.rs | 22 +-- 8 files changed, 404 insertions(+), 34 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 3ec2919e7..d653afe8d 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -30,6 +30,8 @@ interface LogWriter { void log(LogRecord record); }; +typedef interface FfiDynStore; + interface Builder { constructor(); [Name=from_config] @@ -54,6 +56,8 @@ void set_announcement_addresses(sequence announcement_addresses); [Throws=BuildError] void set_node_alias(string node_alias); + void set_backup_store(FfiDynStore backup_store); + void set_ephemeral_store(FfiDynStore ephemeral_store); [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole?
role); void set_wallet_recovery_mode(); @@ -69,6 +73,8 @@ interface Builder { Node build_with_vss_store_and_fixed_headers(NodeEntropy node_entropy, string vss_url, string store_id, record fixed_headers); [Throws=BuildError] Node build_with_vss_store_and_header_provider(NodeEntropy node_entropy, string vss_url, string store_id, VssHeaderProvider header_provider); + [Throws=BuildError] + Node build_with_store(NodeEntropy node_entropy, FfiDynStore store); }; interface Node { diff --git a/src/builder.rs b/src/builder.rs index 874cded72..e56ccc672 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -52,6 +52,8 @@ use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; +#[cfg(feature = "uniffi")] +use crate::ffi::FfiDynStore; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::tier_store::TierStore; @@ -584,7 +586,6 @@ impl NodeBuilder { /// of all critical data written to the primary store. /// /// Backup writes are non-blocking and do not affect primary store operation performance. - #[allow(dead_code)] pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); tier_store_config.backup = Some(backup_store); @@ -598,7 +599,6 @@ impl NodeBuilder { /// can be rebuilt if lost. /// /// If not set, non-critical data will be stored in the primary store. - #[allow(dead_code)] pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); tier_store_config.ephemeral = Some(ephemeral_store); @@ -1044,6 +1044,31 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_wallet_recovery_mode(); } + /// Configures the backup store for local disaster recovery. 
+ /// + /// When building with tiered storage, this store receives asynchronous copies + /// of all critical data written to the primary store. + /// + /// Backup writes are non-blocking and do not affect primary store operation performance. + pub fn set_backup_store(&self, backup_store: Arc) { + let wrapper = DynStoreWrapper((*backup_store).clone()); + let store: Arc = Arc::new(wrapper); + self.inner.write().unwrap().set_backup_store(store); + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { + let wrapper = DynStoreWrapper((*ephemeral_store).clone()); + let store: Arc = Arc::new(wrapper); + self.inner.write().unwrap().set_ephemeral_store(store); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1172,12 +1197,19 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. - // Note that the generics here don't actually work for Uniffi, but we don't currently expose - // this so its not needed. - pub fn build_with_store( - &self, node_entropy: Arc, kv_store: S, + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. 
+ /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store + pub fn build_with_store( + &self, node_entropy: Arc, kv_store: Arc, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_store(*node_entropy, kv_store).map(Arc::new) + let store = (*kv_store).clone(); + self.inner.read().unwrap().build_with_store(*node_entropy, store).map(Arc::new) } } diff --git a/src/ffi/types.rs b/src/ffi/types.rs index cc7298cfa..eca4103c8 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -12,11 +12,13 @@ use std::collections::HashMap; use std::convert::TryInto; +use std::future::Future; use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; pub use bip39::Mnemonic; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; @@ -36,6 +38,7 @@ use lightning::offers::static_invoice::StaticInvoice as LdkStaticInvoice; use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees}; pub use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{KVStore, KVStoreSync}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning_invoice::{Bolt11Invoice as LdkBolt11Invoice, Bolt11InvoiceDescriptionRef}; pub use lightning_invoice::{Description, SignedRawBolt11Invoice}; @@ -147,7 +150,315 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; -use crate::{hex_utils, SocketAddress, UserChannelId}; +use crate::types::DynStoreTrait; +use crate::{hex_utils, DynStoreWrapper, SocketAddress, SyncAndAsyncKVStore, UserChannelId}; + +#[derive(Debug)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Error))] +pub enum IOError { + NotFound, + PermissionDenied, + ConnectionRefused, 
+ ConnectionReset, + ConnectionAborted, + NotConnected, + AddrInUse, + AddrNotAvailable, + BrokenPipe, + AlreadyExists, + WouldBlock, + InvalidInput, + InvalidData, + TimedOut, + WriteZero, + Interrupted, + UnexpectedEof, + Other, +} + +impl From for IOError { + fn from(error: bitcoin::io::Error) -> Self { + match error.kind() { + bitcoin::io::ErrorKind::NotFound => IOError::NotFound, + bitcoin::io::ErrorKind::PermissionDenied => IOError::PermissionDenied, + bitcoin::io::ErrorKind::ConnectionRefused => IOError::ConnectionRefused, + bitcoin::io::ErrorKind::ConnectionReset => IOError::ConnectionReset, + bitcoin::io::ErrorKind::ConnectionAborted => IOError::ConnectionAborted, + bitcoin::io::ErrorKind::NotConnected => IOError::NotConnected, + bitcoin::io::ErrorKind::AddrInUse => IOError::AddrInUse, + bitcoin::io::ErrorKind::AddrNotAvailable => IOError::AddrNotAvailable, + bitcoin::io::ErrorKind::BrokenPipe => IOError::BrokenPipe, + bitcoin::io::ErrorKind::AlreadyExists => IOError::AlreadyExists, + bitcoin::io::ErrorKind::WouldBlock => IOError::WouldBlock, + bitcoin::io::ErrorKind::InvalidInput => IOError::InvalidInput, + bitcoin::io::ErrorKind::InvalidData => IOError::InvalidData, + bitcoin::io::ErrorKind::TimedOut => IOError::TimedOut, + bitcoin::io::ErrorKind::WriteZero => IOError::WriteZero, + bitcoin::io::ErrorKind::Interrupted => IOError::Interrupted, + bitcoin::io::ErrorKind::UnexpectedEof => IOError::UnexpectedEof, + bitcoin::io::ErrorKind::Other => IOError::Other, + } + } +} + +impl From for bitcoin::io::Error { + fn from(error: IOError) -> Self { + match error { + IOError::NotFound => bitcoin::io::ErrorKind::NotFound.into(), + IOError::PermissionDenied => bitcoin::io::ErrorKind::PermissionDenied.into(), + IOError::ConnectionRefused => bitcoin::io::ErrorKind::ConnectionRefused.into(), + IOError::ConnectionReset => bitcoin::io::ErrorKind::ConnectionReset.into(), + IOError::ConnectionAborted => bitcoin::io::ErrorKind::ConnectionAborted.into(), + 
IOError::NotConnected => bitcoin::io::ErrorKind::NotConnected.into(), + IOError::AddrInUse => bitcoin::io::ErrorKind::AddrInUse.into(), + IOError::AddrNotAvailable => bitcoin::io::ErrorKind::AddrNotAvailable.into(), + IOError::BrokenPipe => bitcoin::io::ErrorKind::BrokenPipe.into(), + IOError::AlreadyExists => bitcoin::io::ErrorKind::AlreadyExists.into(), + IOError::WouldBlock => bitcoin::io::ErrorKind::WouldBlock.into(), + IOError::InvalidInput => bitcoin::io::ErrorKind::InvalidInput.into(), + IOError::InvalidData => bitcoin::io::ErrorKind::InvalidData.into(), + IOError::TimedOut => bitcoin::io::ErrorKind::TimedOut.into(), + IOError::WriteZero => bitcoin::io::ErrorKind::WriteZero.into(), + IOError::Interrupted => bitcoin::io::ErrorKind::Interrupted.into(), + IOError::UnexpectedEof => bitcoin::io::ErrorKind::UnexpectedEof.into(), + IOError::Other => bitcoin::io::ErrorKind::Other.into(), + } + } +} + +impl std::fmt::Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::NotFound => write!(f, "NotFound"), + IOError::PermissionDenied => write!(f, "PermissionDenied"), + IOError::ConnectionRefused => write!(f, "ConnectionRefused"), + IOError::ConnectionReset => write!(f, "ConnectionReset"), + IOError::ConnectionAborted => write!(f, "ConnectionAborted"), + IOError::NotConnected => write!(f, "NotConnected"), + IOError::AddrInUse => write!(f, "AddrInUse"), + IOError::AddrNotAvailable => write!(f, "AddrNotAvailable"), + IOError::BrokenPipe => write!(f, "BrokenPipe"), + IOError::AlreadyExists => write!(f, "AlreadyExists"), + IOError::WouldBlock => write!(f, "WouldBlock"), + IOError::InvalidInput => write!(f, "InvalidInput"), + IOError::InvalidData => write!(f, "InvalidData"), + IOError::TimedOut => write!(f, "TimedOut"), + IOError::WriteZero => write!(f, "WriteZero"), + IOError::Interrupted => write!(f, "Interrupted"), + IOError::UnexpectedEof => write!(f, "UnexpectedEof"), + IOError::Other => write!(f, 
"Other"), + } + } +} + +#[uniffi::export(with_foreign)] +#[async_trait] +pub trait FfiDynStoreTrait: Send + Sync { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; +} + +#[derive(Clone, uniffi::Object)] +pub struct FfiDynStore { + pub(crate) inner: Arc, +} + +#[uniffi::export] +impl FfiDynStore { + #[uniffi::constructor] + pub fn from_store(store: Arc) -> Self { + Self { inner: store } + } +} + +impl FfiDynStore { + pub fn from_kv_store(store: T) -> Self { + Self { inner: Arc::new(DynStoreWrapper(store)) } + } +} + +#[async_trait] +impl FfiDynStoreTrait for DynStoreWrapper { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read_async(self, &primary_namespace, &secondary_namespace, &key) + .await + .map_err(IOError::from) + } + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write_async(self, &primary_namespace, &secondary_namespace, 
&key, buf) + .await + .map_err(IOError::from) + } + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove_async(self, &primary_namespace, &secondary_namespace, &key, lazy) + .await + .map_err(IOError::from) + } + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list_async(self, &primary_namespace, &secondary_namespace) + .await + .map_err(IOError::from) + } + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read(self, &primary_namespace, &secondary_namespace, &key) + .map_err(IOError::from) + } + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write(self, &primary_namespace, &secondary_namespace, &key, buf) + .map_err(IOError::from) + } + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove(self, &primary_namespace, &secondary_namespace, &key, lazy) + .map_err(IOError::from) + } + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list(self, &primary_namespace, &secondary_namespace).map_err(IOError::from) + } +} + +impl KVStore for FfiDynStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + this.read_async(primary_namespace, secondary_namespace, key).await.map_err(|e| e.into()) + } + } + + fn write( + &self, primary_namespace: &str, 
secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + this.write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| e.into()) + } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + this.remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| e.into()) + } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + async move { + this.list_async(primary_namespace, secondary_namespace).await.map_err(|e| e.into()) + } + } +} + +impl KVStoreSync for FfiDynStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + FfiDynStoreTrait::read( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + .map_err(|e| e.into()) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + FfiDynStoreTrait::write( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + .map_err(|e| e.into()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) 
-> Result<(), lightning::io::Error> { + FfiDynStoreTrait::remove( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + .map_err(|e| e.into()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + FfiDynStoreTrait::list( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + ) + .map_err(|e| e.into()) + } +} uniffi::custom_type!(PublicKey, String, { remote, diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 07f87c0b3..fd0da6927 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -11,15 +11,15 @@ use crate::logger::{LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::DynStore; +use lightning::io; use lightning::util::persist::{ KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, }; -use lightning::{io, log_trace}; -use lightning::{log_debug, log_error, log_info, log_warn}; - -use tokio::sync::mpsc::{self, error::TrySendError}; +use lightning::{log_debug, log_error, log_info, log_trace, log_warn}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use std::future::Future; use std::sync::Arc; @@ -700,6 +700,7 @@ mod tests { }; use lightning_persister::fs_store::v1::FilesystemStore; + use super::*; use crate::io::test_utils::{ do_read_write_remove_list_persist, random_storage_path, DelayedStore, }; @@ -710,8 +711,6 @@ mod tests { use crate::types::DynStore; use crate::types::DynStoreWrapper; - use super::*; - impl RefUnwindSafe for TierStore {} struct CleanupDir(PathBuf); diff --git a/src/lib.rs b/src/lib.rs index 3e5180dcb..c91200d00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -138,6 +138,8 @@ use event::{EventHandler, EventQueue}; use fee_estimator::{ConfirmationTarget, FeeEstimator, 
OnchainFeeEstimator}; #[cfg(feature = "uniffi")] use ffi::*; +#[cfg(feature = "uniffi")] +pub use ffi::{FfiDynStore, FfiDynStoreTrait, IOError}; use gossip::GossipSource; use graph::NetworkGraph; use io::utils::write_node_metrics; @@ -169,11 +171,13 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, HRNResolver, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, +}; +pub use types::{ + ChannelDetails, CustomTlvRecord, DynStore, DynStoreWrapper, PeerDetails, SyncAndAsyncKVStore, + UserChannelId, }; -pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; use crate::scoring::setup_background_pathfinding_scores_sync; diff --git a/src/types.rs b/src/types.rs index 381bfbd21..8dfef74f4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -53,7 +53,7 @@ where { } -pub(crate) trait DynStoreTrait: Send + Sync { +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; @@ -133,9 +133,11 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +/// Type alias for any store that implements DynStoreTrait. +pub type DynStore = dyn DynStoreTrait; -pub(crate) struct DynStoreWrapper(pub(crate) T); +/// A wrapper that allows using any [`SyncAndAsyncKVStore`] implementor as a trait object. 
+pub struct DynStoreWrapper(pub T); impl DynStoreTrait for DynStoreWrapper { fn read_async( diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7854a77f2..08c41d725 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,7 +32,7 @@ use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, - UserChannelId, + SyncAndAsyncKVStore, UserChannelId, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -486,7 +486,7 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); - builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + build_node_with_store(&builder, config.node_entropy, kv_store) }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), }; @@ -1691,3 +1691,19 @@ impl TestSyncStoreInner { self.do_list(primary_namespace, secondary_namespace) } } + +pub(crate) fn build_node_with_store( + builder: &Builder, entropy: NodeEntropy, store: S, +) -> TestNode { + #[cfg(feature = "uniffi")] + { + use ldk_node::FfiDynStore; + builder + .build_with_store(entropy.into(), Arc::new(FfiDynStore::from_kv_store(store))) + .unwrap() + } + #[cfg(not(feature = "uniffi"))] + { + builder.build_with_store(entropy, store).unwrap() + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 3fde52dc4..005b000a6 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -17,14 +17,15 @@ use bitcoin::hashes::Hash; use bitcoin::{Address, Amount, ScriptBuf, Txid}; use common::logging::{init_log_logger, validate_log_entry, MultiNodeLogger, TestLogWriter}; use common::{ - bump_fee_and_broadcast, distribute_funds_unconfirmed, 
do_channel_full_cycle, - expect_channel_pending_event, expect_channel_ready_event, expect_channel_ready_events, - expect_event, expect_payment_claimable_event, expect_payment_received_event, - expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, - open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, - premine_blocks, prepare_rbf, random_chain_source, random_config, random_listening_addresses, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + build_node_with_store, bump_fee_and_broadcast, distribute_funds_unconfirmed, + do_channel_full_cycle, expect_channel_pending_event, expect_channel_ready_event, + expect_channel_ready_events, expect_event, expect_payment_claimable_event, + expect_payment_received_event, expect_payment_successful_event, expect_splice_pending_event, + generate_blocks_and_wait, open_channel, open_channel_push_amt, open_channel_with_all, + premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, + random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, + setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestStoreType, + TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -235,8 +236,7 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = - builder.build_with_store(config.node_entropy.into(), test_sync_store.clone()).unwrap(); + let node = build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); node.start().unwrap(); let expected_node_id = node.node_id(); @@ -275,7 +275,7 @@ async fn start_stop_reinit() { builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let 
reinitialized_node = - builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); From 4f374e52037eb2c896305f1dbc852536cb1b9820 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Fri, 20 Feb 2026 21:05:09 +0100 Subject: [PATCH 04/10] Add integration and FFI tests for tiered storage - Add Rust integration test verifying correct routing to storage tiers - Add Python in-memory KV store and FFI test for tiered storage --- benches/payments.rs | 1 + bindings/python/src/ldk_node/kv_store.py | 115 +++++++ bindings/python/src/ldk_node/test_ldk_node.py | 280 ++++++++++++------ tests/common/mod.rs | 54 +++- tests/integration_tests_rust.rs | 86 ++++++ 5 files changed, 435 insertions(+), 101 deletions(-) create mode 100644 bindings/python/src/ldk_node/kv_store.py diff --git a/benches/payments.rs b/benches/payments.rs index 52769d794..8ded1399e 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -127,6 +127,7 @@ fn payment_benchmark(c: &mut Criterion) { true, false, common::TestStoreType::Sqlite, + common::TestStoreType::Sqlite, ); let runtime = diff --git a/bindings/python/src/ldk_node/kv_store.py b/bindings/python/src/ldk_node/kv_store.py new file mode 100644 index 000000000..5fce47561 --- /dev/null +++ b/bindings/python/src/ldk_node/kv_store.py @@ -0,0 +1,115 @@ +import threading + +from abc import ABC, abstractmethod +from typing import List + +from ldk_node import IoError + +class AbstractKvStore(ABC): + @abstractmethod + async def read_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "typing.List[int]": + pass + + @abstractmethod + async def write_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "typing.List[int]") -> None: + pass + + @abstractmethod + async def remove_async(self, primary_namespace: 
"str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + async def list_async(self, primary_namespace: "str",secondary_namespace: "str") -> "typing.List[str]": + pass + + @abstractmethod + def read(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "typing.List[int]": + pass + + @abstractmethod + def write(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "typing.List[int]") -> None: + pass + + @abstractmethod + def remove(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + def list(self, primary_namespace: "str",secondary_namespace: "str") -> "typing.List[str]": + pass + +class TestKvStore(AbstractKvStore): + def __init__(self, name: str): + self.name = name + # Storage structure: {(primary_ns, secondary_ns): {key: [bytes]}} + self.storage = {} + self._lock = threading.Lock() + + def dump(self): + print(f"\n[{self.name}] Store contents:") + for (primary_ns, secondary_ns), keys_dict in self.storage.items(): + print(f" Namespace: ({primary_ns!r}, {secondary_ns!r})") + for key, data in keys_dict.items(): + print(f" Key: {key!r} -> {len(data)} bytes") + # Optionally show first few bytes + preview = data[:20] if len(data) > 20 else data + print(f" Data preview: {preview}...") + + def read(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + with self._lock: + print(f"[{self.name}] READ: {primary_namespace}/{secondary_namespace}/{key}") + namespace_key = (primary_namespace, secondary_namespace) + + if namespace_key not in self.storage: + print(f" -> namespace not found, keys: {list(self.storage.keys())}") + raise IoError.NotFound() + + if key not in self.storage[namespace_key]: + print(f" -> key not found, keys: {list(self.storage[namespace_key].keys())}") + raise IoError.NotFound() + + data = self.storage[namespace_key][key] + print(f" -> returning {len(data)} bytes") + return data + + def 
write(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + self.storage[namespace_key] = {} + + self.storage[namespace_key][key] = buf + + def remove(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + raise IoError.NotFound() + + if key not in self.storage[namespace_key]: + raise IoError.NotFound() + + del self.storage[namespace_key][key] + + if not self.storage[namespace_key]: + del self.storage[namespace_key] + + def list(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key in self.storage: + return list(self.storage[namespace_key].keys()) + return [] + + async def read_async(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + return self.read(primary_namespace, secondary_namespace, key) + + async def write_async(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + self.write(primary_namespace, secondary_namespace, key, buf) + + async def remove_async(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + self.remove(primary_namespace, secondary_namespace, key, lazy) + + async def list_async(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + return self.list(primary_namespace, secondary_namespace) + \ No newline at end of file diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index 4f53dbabf..7846c3177 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -5,13 +5,30 @@ import os import re import requests 
+import asyncio +import threading +import ldk_node from ldk_node import * +from kv_store import TestKvStore DEFAULT_ESPLORA_SERVER_URL = "http://127.0.0.1:3002" DEFAULT_TEST_NETWORK = Network.REGTEST DEFAULT_BITCOIN_CLI_BIN = "bitcoin-cli" +class NodeSetup: + def __init__(self, node, node_id, tmp_dir, listening_addresses, stores=None): + self.node = node + self.node_id = node_id + self.tmp_dir = tmp_dir + self.listening_addresses = listening_addresses + self.stores = stores # (primary, backup, ephemeral) or None + + def cleanup(self): + self.node.stop() + time.sleep(1) + self.tmp_dir.cleanup() + def bitcoin_cli(cmd): args = [] @@ -95,7 +112,6 @@ def send_to_address(address, amount_sats): print("SEND TX:", res) return res - def setup_node(tmp_dir, esplora_endpoint, listening_addresses): mnemonic = generate_entropy_mnemonic(None) node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) @@ -107,134 +123,204 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses): builder.set_listening_addresses(listening_addresses) return builder.build(node_entropy) -def get_esplora_endpoint(): - if os.environ.get('ESPLORA_ENDPOINT'): - return str(os.environ['ESPLORA_ENDPOINT']) - return DEFAULT_ESPLORA_SERVER_URL +def setup_two_nodes(esplora_endpoint, port_1=2323, port_2=2324, use_tier_store=False) -> tuple[NodeSetup, NodeSetup]: + # Setup Node 1 + tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") + print("TMP DIR 1:", tmp_dir_1.name) + + listening_addresses_1 = [f"127.0.0.1:{port_1}"] + if use_tier_store: + node_1, stores_1 = setup_node_with_tier_store(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + else: + node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + stores_1 = None + + node_1.start() + node_id_1 = node_1.node_id() + print("Node ID 1:", node_id_1) + + setup_1 = NodeSetup(node_1, node_id_1, tmp_dir_1, listening_addresses_1, stores_1) + + # Setup Node 2 + tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") + 
print("TMP DIR 2:", tmp_dir_2.name) + + listening_addresses_2 = [f"127.0.0.1:{port_2}"] + if use_tier_store: + node_2, stores_2 = setup_node_with_tier_store(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + else: + node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + stores_2 = None + + node_2.start() + node_id_2 = node_2.node_id() + print("Node ID 2:", node_id_2) + + setup_2 = NodeSetup(node_2, node_id_2, tmp_dir_2, listening_addresses_2, stores_2) + + return setup_1, setup_2 +def setup_node_with_tier_store(tmp_dir, esplora_endpoint, listening_addresses) -> tuple[Node, tuple[TestKvStore, TestKvStore, TestKvStore]]: + mnemonic = generate_entropy_mnemonic(None) + node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) + config = default_config() -def expect_event(node, expected_event_type): - event = node.wait_next_event() - assert isinstance(event, expected_event_type) - print("EVENT:", event) - node.event_handled() - return event + primary = TestKvStore("primary") + backup = TestKvStore("backup") + ephemeral = TestKvStore("ephemeral") + + # Set event loop for async Python callbacks from Rust + # (https://mozilla.github.io/uniffi-rs/0.27/futures.html#python-uniffi_set_event_loop) + loop = asyncio.new_event_loop() + + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + loop_thread = threading.Thread(target=run_loop, daemon=True) + loop_thread.start() + ldk_node.uniffi_set_event_loop(loop) + builder = Builder.from_config(config) + builder.set_storage_dir_path(tmp_dir) + builder.set_chain_source_esplora(esplora_endpoint, None) + builder.set_network(DEFAULT_TEST_NETWORK) + builder.set_listening_addresses(listening_addresses) + builder.set_backup_store(FfiDynStore.from_store(backup)) + builder.set_ephemeral_store(FfiDynStore.from_store(ephemeral)) + + return builder.build_with_store(node_entropy, FfiDynStore.from_store(primary)), (primary, backup, ephemeral) +def do_channel_full_cycle(setup_1: NodeSetup, 
setup_2: NodeSetup, esplora_endpoint): + # Fund both nodes + (node_1, node_2) = (setup_1.node, setup_2.node) + address_1 = node_1.onchain_payment().new_address() + txid_1 = send_to_address(address_1, 100000) + address_2 = node_2.onchain_payment().new_address() + txid_2 = send_to_address(address_2, 100000) -class TestLdkNode(unittest.TestCase): - def setUp(self): - bitcoin_cli("createwallet ldk_node_test") - mine(101) - time.sleep(3) - esplora_endpoint = get_esplora_endpoint() - mine_and_wait(esplora_endpoint, 1) + wait_for_tx(esplora_endpoint, txid_1) + wait_for_tx(esplora_endpoint, txid_2) - def test_channel_full_cycle(self): - esplora_endpoint = get_esplora_endpoint() + mine_and_wait(esplora_endpoint, 6) - ## Setup Node 1 - tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") - print("TMP DIR 1:", tmp_dir_1.name) + node_1.sync_wallets() + node_2.sync_wallets() - listening_addresses_1 = ["127.0.0.1:2323"] - node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) - node_1.start() - node_id_1 = node_1.node_id() - print("Node ID 1:", node_id_1) + spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats + spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats + total_balance_1 = node_1.list_balances().total_onchain_balance_sats + total_balance_2 = node_2.list_balances().total_onchain_balance_sats - # Setup Node 2 - tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") - print("TMP DIR 2:", tmp_dir_2.name) + print("SPENDABLE 1:", spendable_balance_1) + assert spendable_balance_1 == 100000 - listening_addresses_2 = ["127.0.0.1:2324"] - node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) - node_2.start() - node_id_2 = node_2.node_id() - print("Node ID 2:", node_id_2) - - address_1 = node_1.onchain_payment().new_address() - txid_1 = send_to_address(address_1, 100000) - address_2 = node_2.onchain_payment().new_address() - txid_2 = send_to_address(address_2, 100000) + print("SPENDABLE 
2:", spendable_balance_2) + assert spendable_balance_2 == 100000 - wait_for_tx(esplora_endpoint, txid_1) - wait_for_tx(esplora_endpoint, txid_2) + print("TOTAL 1:", total_balance_1) + assert total_balance_1 == 100000 - mine_and_wait(esplora_endpoint, 6) + print("TOTAL 2:", total_balance_2) + assert total_balance_2 == 100000 - node_1.sync_wallets() - node_2.sync_wallets() + (node_id_2, listening_addresses_2) = (setup_2.node_id, setup_2.listening_addresses) + node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) - spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats - spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats - total_balance_1 = node_1.list_balances().total_onchain_balance_sats - total_balance_2 = node_2.list_balances().total_onchain_balance_sats + channel_pending_event_1 = expect_event(node_1, Event.CHANNEL_PENDING) + channel_pending_event_2 = expect_event(node_2, Event.CHANNEL_PENDING) + funding_txid = channel_pending_event_1.funding_txo.txid + wait_for_tx(esplora_endpoint, funding_txid) + mine_and_wait(esplora_endpoint, 6) - print("SPENDABLE 1:", spendable_balance_1) - self.assertEqual(spendable_balance_1, 100000) + node_1.sync_wallets() + node_2.sync_wallets() - print("SPENDABLE 2:", spendable_balance_2) - self.assertEqual(spendable_balance_2, 100000) + channel_ready_event_1 = expect_event(node_1, Event.CHANNEL_READY) + print("funding_txo:", funding_txid) - print("TOTAL 1:", total_balance_1) - self.assertEqual(total_balance_1, 100000) + channel_ready_event_2 = expect_event(node_2, Event.CHANNEL_READY) - print("TOTAL 2:", total_balance_2) - self.assertEqual(total_balance_2, 100000) + description = Bolt11InvoiceDescription.DIRECT("asdf") + invoice = node_2.bolt11_payment().receive(2500000, description, 9217) + node_1.bolt11_payment().send(invoice, None) - node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) + expect_event(node_1, Event.PAYMENT_SUCCESSFUL) + 
expect_event(node_2, Event.PAYMENT_RECEIVED) - channel_pending_event_1 = expect_event(node_1, Event.CHANNEL_PENDING) - channel_pending_event_2 = expect_event(node_2, Event.CHANNEL_PENDING) - funding_txid = channel_pending_event_1.funding_txo.txid - wait_for_tx(esplora_endpoint, funding_txid) - mine_and_wait(esplora_endpoint, 6) + node_id_1 = setup_1.node_id + node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) - node_1.sync_wallets() - node_2.sync_wallets() + # expect channel closed event on both nodes + expect_event(node_1, Event.CHANNEL_CLOSED) - channel_ready_event_1 = expect_event(node_1, Event.CHANNEL_READY) - print("funding_txo:", funding_txid) + expect_event(node_2, Event.CHANNEL_CLOSED) - channel_ready_event_2 = expect_event(node_2, Event.CHANNEL_READY) + mine_and_wait(esplora_endpoint, 1) - description = Bolt11InvoiceDescription.DIRECT("asdf") - invoice = node_2.bolt11_payment().receive(2500000, description, 9217) - node_1.bolt11_payment().send(invoice, None) - - expect_event(node_1, Event.PAYMENT_SUCCESSFUL) + node_1.sync_wallets() + node_2.sync_wallets() - expect_event(node_2, Event.PAYMENT_RECEIVED) - + spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_1 > 95000 + assert spendable_balance_after_close_1 < 100000 + spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_2 == 102500 - node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) +def get_esplora_endpoint(): + if os.environ.get('ESPLORA_ENDPOINT'): + return str(os.environ['ESPLORA_ENDPOINT']) + return DEFAULT_ESPLORA_SERVER_URL - # expect channel closed event on both nodes - expect_event(node_1, Event.CHANNEL_CLOSED) - expect_event(node_2, Event.CHANNEL_CLOSED) +def expect_event(node, expected_event_type): + event = node.wait_next_event() + assert isinstance(event, expected_event_type) + print("EVENT:", event) + 
node.event_handled() + return event +class TestLdkNode(unittest.TestCase): + def setUp(self): + bitcoin_cli("createwallet ldk_node_test") + mine(101) + time.sleep(3) + esplora_endpoint = get_esplora_endpoint() mine_and_wait(esplora_endpoint, 1) - node_1.sync_wallets() - node_2.sync_wallets() - - spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats - assert spendable_balance_after_close_1 > 95000 - assert spendable_balance_after_close_1 < 100000 - spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats - self.assertEqual(spendable_balance_after_close_2, 102500) - - # Stop nodes - node_1.stop() - node_2.stop() - - # Cleanup - time.sleep(1) # Wait a sec so our logs can finish writing - tmp_dir_1.cleanup() - tmp_dir_2.cleanup() + def test_channel_full_cycle(self): + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint) + + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + setup_1.cleanup() + setup_2.cleanup() + + def test_tier_store(self): + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint, port_1=2325, port_2=2326, use_tier_store=True) + + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + primary, backup, ephemeral = setup_1.stores + + # Wait for async backup + time.sleep(2) + + self.assertGreater(len(primary.storage), 0, "Primary should have data") + self.assertGreater(len(backup.storage), 0, "Backup should have data") + self.assertEqual(list(primary.storage.keys()), list(backup.storage.keys()), + "Backup should mirror primary") + + self.assertGreater(len(ephemeral.storage), 0, "Ephemeral should have data") + ephemeral_keys = [key for namespace in ephemeral.storage.values() for key in namespace.keys()] + has_scorer_or_graph = any(key in ['scorer', 'network_graph'] for key in ephemeral_keys) + self.assertTrue(has_scorer_or_graph, "Ephemeral should contain scorer or network_graph data") + + 
setup_1.cleanup() + setup_2.cleanup() if __name__ == '__main__': unittest.main() diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 08c41d725..3a63bf931 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -34,6 +34,8 @@ use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, SyncAndAsyncKVStore, UserChannelId, }; +#[cfg(feature = "uniffi")] +use ldk_node::FfiDynStore; use lightning::io; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; @@ -328,10 +330,20 @@ pub(crate) enum TestChainSource<'a> { BitcoindRestSync(&'a BitcoinD), } -#[derive(Clone, Copy)] +#[cfg(feature = "uniffi")] +type TestDynStore = Arc; +#[cfg(not(feature = "uniffi"))] +type TestDynStore = Arc; + +#[derive(Clone)] pub(crate) enum TestStoreType { TestSyncStore, Sqlite, + TierStore { + primary: TestDynStore, + backup: Option, + ephemeral: Option, + }, } impl Default for TestStoreType { @@ -382,6 +394,30 @@ macro_rules! setup_builder { pub(crate) use setup_builder; +pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynStore, TestDynStore) { + let primary = SqliteStore::new( + base_path.join("primary"), + Some("primary_db".to_string()), + Some("primary_kv".to_string()), + ) + .unwrap(); + let backup = FilesystemStore::new(base_path.join("backup")); + let ephemeral = TestStore::new(false); + + #[cfg(feature = "uniffi")] + { + ( + Arc::new(FfiDynStore::from_kv_store(primary)), + Arc::new(FfiDynStore::from_kv_store(backup)), + Arc::new(FfiDynStore::from_kv_store(ephemeral)), + ) + } + #[cfg(not(feature = "uniffi"))] + { + (primary, backup, ephemeral) + } +} + pub(crate) fn setup_two_nodes( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, anchors_trusted_no_reserve: bool, @@ -392,21 +428,22 @@ pub(crate) fn setup_two_nodes( anchor_channels, anchors_trusted_no_reserve, TestStoreType::TestSyncStore, + TestStoreType::TestSyncStore, ) } pub(crate) fn 
setup_two_nodes_with_store( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, - anchors_trusted_no_reserve: bool, store_type: TestStoreType, + anchors_trusted_no_reserve: bool, store_type_a: TestStoreType, store_type_b: TestStoreType, ) -> (TestNode, TestNode) { println!("== Node A =="); let mut config_a = random_config(anchor_channels); - config_a.store_type = store_type; + config_a.store_type = store_type_a; let node_a = setup_node(chain_source, config_a); println!("\n== Node B =="); let mut config_b = random_config(anchor_channels); - config_b.store_type = store_type; + config_b.store_type = store_type_b; if allow_0conf { config_b.node_config.trusted_peers_0conf.push(node_a.node_id()); } @@ -489,6 +526,15 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> build_node_with_store(&builder, config.node_entropy, kv_store) }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), + TestStoreType::TierStore { primary, backup, ephemeral } => { + if let Some(backup) = backup { + builder.set_backup_store(backup); + } + if let Some(ephemeral) = ephemeral { + builder.set_ephemeral_store(ephemeral); + } + builder.build_with_store(config.node_entropy.into(), primary).unwrap() + }, }; if config.recovery_mode { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 005b000a6..467601447 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -38,10 +38,17 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{ + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, 
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; +use crate::common::{create_tier_stores, random_storage_path, setup_two_nodes_with_store}; + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -51,6 +58,85 @@ async fn channel_full_cycle() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_tier_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (primary_a, backup_a, ephemeral_a) = create_tier_stores(random_storage_path()); + let (primary_b, backup_b, ephemeral_b) = create_tier_stores(random_storage_path()); + + let (node_a, node_b) = setup_two_nodes_with_store( + &chain_source, + false, + true, + false, + TestStoreType::TierStore { + primary: Arc::clone(&primary_a), + backup: Some(Arc::clone(&backup_a)), + ephemeral: Some(Arc::clone(&ephemeral_a)), + }, + TestStoreType::TierStore { + primary: Arc::clone(&primary_b), + backup: Some(Arc::clone(&backup_b)), + ephemeral: Some(Arc::clone(&ephemeral_b)), + }, + ); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; + + // Verify Primary store contains channel manager data + let primary_channel_manager = KVStoreSync::read( + primary_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(primary_channel_manager.is_ok(), "Primary should have channel manager data"); + + // Verify Primary store contains payment info + let primary_payments = KVStoreSync::list(primary_a.as_ref(), "payments", ""); + assert!(primary_payments.is_ok(), "Primary should have payment data"); + 
assert!(!primary_payments.unwrap().is_empty(), "Primary should have payment entries"); + + // Verify Backup store synced critical data + let backup_channel_manager = KVStoreSync::read( + backup_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_channel_manager.is_ok(), "Backup should have synced channel manager"); + + // Verify backup is not empty + let backup_all_keys = KVStoreSync::list(backup_a.as_ref(), "", "").unwrap(); + assert!(!backup_all_keys.is_empty(), "Backup store should not be empty"); + + // Verify Ephemeral does NOT have channel manager + let ephemeral_channel_manager = KVStoreSync::read( + ephemeral_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(ephemeral_channel_manager.is_err(), "Ephemeral should NOT have channel manager"); + + // Verify Ephemeral does NOT have payment info + let ephemeral_payments = KVStoreSync::list(ephemeral_a.as_ref(), "payments", ""); + assert!( + ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), + "Ephemeral should NOT have payment data" + ); + + //Verify Ephemeral does have network graph + let ephemeral_network_graph = KVStoreSync::read( + ephemeral_a.as_ref(), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(ephemeral_network_graph.is_ok(), "Ephemeral should have network graph"); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 2a8d9b9a60acb8b7295ac672d49e0c66a5955596 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 24 Feb 2026 22:48:29 +0100 Subject: [PATCH 05/10] Add per-key version-locked writes to FfiDynStore Introduce FfiDynStoreInner with per-key write 
version locks that ensure write ordering and skip stale versions in both sync and async code paths. Test changes: - Unify tier store test helpers to use TestSyncStore for all tiers, replacing mixed SqliteStore/FilesystemStore/TestStore usage that caused test hangs due to TestStore's async write blocking - Split build_node_with_store into cfg-gated versions for uniffi vs non-uniffi feature flags --- src/ffi/types.rs | 314 +++++++++++++++++++++++++++++--- tests/common/mod.rs | 82 ++++++--- tests/integration_tests_rust.rs | 53 ++++-- 3 files changed, 382 insertions(+), 67 deletions(-) diff --git a/src/ffi/types.rs b/src/ffi/types.rs index eca4103c8..61b4e244c 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -15,7 +15,8 @@ use std::convert::TryInto; use std::future::Future; use std::ops::Deref; use std::str::FromStr; -use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; @@ -148,6 +149,7 @@ use crate::builder::sanitize_alias; pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig}; pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; +use crate::io::utils::check_namespace_key_validity; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; use crate::types::DynStoreTrait; @@ -283,20 +285,46 @@ pub trait FfiDynStoreTrait: Send + Sync { #[derive(Clone, uniffi::Object)] pub struct FfiDynStore { - pub(crate) inner: Arc, + inner: Arc, + next_write_version: Arc, } #[uniffi::export] impl FfiDynStore { #[uniffi::constructor] pub fn from_store(store: Arc) -> Self { - Self { inner: store } + let inner = Arc::new(FfiDynStoreInner::new(store)); + Self { inner, next_write_version: Arc::new(AtomicU64::new(1)) } } } impl FfiDynStore { pub fn from_kv_store(store: T) -> Self { - Self { inner: Arc::new(DynStoreWrapper(store)) } + let store = 
FfiDynStoreInner::new(Arc::new(DynStoreWrapper(store))); + Self { inner: Arc::new(store), next_write_version: Arc::new(AtomicU64::new(1)) } + } + + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + fn get_new_version_and_lock_ref( + &self, locking_key: String, + ) -> (Arc>, u64) { + let version = self.next_write_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FfiDynStore version counter overflowed"); + } + + let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key); + + (inner_lock_ref, version) } } @@ -365,7 +393,9 @@ impl KVStore for FfiDynStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); async move { - this.read_async(primary_namespace, secondary_namespace, key).await.map_err(|e| e.into()) + this.read_internal_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) } } @@ -373,13 +403,25 @@ impl KVStore for FfiDynStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); async move { - this.write_async(primary_namespace, secondary_namespace, key, buf) - .await - .map_err(|e| e.into()) + this.write_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + .map_err(|e| e.into()) } } @@ -387,13 +429,26 @@ impl KVStore for FfiDynStore { &self, primary_namespace: 
&str, secondary_namespace: &str, key: &str, lazy: bool, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + async move { - this.remove_async(primary_namespace, secondary_namespace, key, lazy) - .await - .map_err(|e| e.into()) + this.remove_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + .map_err(|e| e.into()) } } @@ -401,10 +456,14 @@ impl KVStore for FfiDynStore { &self, primary_namespace: &str, secondary_namespace: &str, ) -> impl Future, lightning::io::Error>> + 'static + Send { let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); + async move { - this.list_async(primary_namespace, secondary_namespace).await.map_err(|e| e.into()) + this.list_internal_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) } } } @@ -413,50 +472,251 @@ impl KVStoreSync for FfiDynStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result, lightning::io::Error> { - FfiDynStoreTrait::read( - self.inner.as_ref(), + self.inner.read_internal( primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string(), ) - .map_err(|e| e.into()) } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> Result<(), lightning::io::Error> { - FfiDynStoreTrait::write( - self.inner.as_ref(), + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = 
self.get_new_version_and_lock_ref(locking_key.clone()); + + self.inner.write_internal( + inner_lock_ref, + locking_key, + version, primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string(), buf, ) - .map_err(|e| e.into()) } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Result<(), lightning::io::Error> { - FfiDynStoreTrait::remove( - self.inner.as_ref(), + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.inner.remove_internal( + inner_lock_ref, + locking_key, + version, primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string(), lazy, ) - .map_err(|e| e.into()) } fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Result, lightning::io::Error> { - FfiDynStoreTrait::list( - self.inner.as_ref(), - primary_namespace.to_string(), - secondary_namespace.to_string(), - ) - .map_err(|e| e.into()) + self.inner.list_internal(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +struct FfiDynStoreInner { + ffi_store: Arc, + write_version_locks: Mutex>>>, +} + +impl FfiDynStoreInner { + fn new(ffi_store: Arc) -> Self { + Self { ffi_store, write_version_locks: Mutex::new(HashMap::new()) } + } + + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { + let mut outer_lock = self.write_version_locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(locking_key).or_default()) + } + + async fn read_internal_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; + self.ffi_store + .read_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + } + + fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, 
key: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; + self.ffi_store.read(primary_namespace, secondary_namespace, key).map_err(|e| e.into()) + } + + async fn write_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + fn write_internal( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; + + let res = { + let mut last_written_version = inner_lock_ref.blocking_lock(); + if version <= *last_written_version { + Ok(()) + } else { + self.ffi_store + .write(primary_namespace, secondary_namespace, key, buf) + .map_err(|e| e.into()) + .map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + res + } + + async fn remove_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + 
.remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + fn remove_internal( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; + + let res = { + let mut last_written_version = inner_lock_ref.blocking_lock(); + if version <= *last_written_version { + Ok(()) + } else { + self.ffi_store + .remove(primary_namespace, secondary_namespace, key, lazy) + .map_err(|e| >::into(e)) + .map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + res + } + + async fn list_internal_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; + self.ffi_store + .list_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + } + + fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; + self.ffi_store.list(primary_namespace, secondary_namespace).map_err(|e| e.into()) + } + + async fn execute_locked_write< + F: Future>, + FN: FnOnce() -> F, + >( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + callback: FN, + ) -> Result<(), bitcoin::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.lock().await; + + // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual + // consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. 
Otherwise we can and must skip writing. + if is_stale_version { + Ok(()) + } else { + callback().await.map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, locking_key: String) { + // If there are no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.write_version_locks.lock().unwrap(); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected FfiDynStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&locking_key); + } } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 3a63bf931..fac548c9c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -30,12 +30,14 @@ use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyn use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; +#[cfg(feature = "uniffi")] +use ldk_node::FfiDynStore; +#[cfg(not(feature = "uniffi"))] +use ldk_node::SyncAndAsyncKVStore; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, - SyncAndAsyncKVStore, UserChannelId, + UserChannelId, }; -#[cfg(feature = "uniffi")] -use ldk_node::FfiDynStore; use lightning::io; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; @@ -333,7 +335,7 @@ pub(crate) enum TestChainSource<'a> { #[cfg(feature = "uniffi")] type TestDynStore = Arc; #[cfg(not(feature = "uniffi"))] -type TestDynStore = Arc; +type TestDynStore = Arc; #[derive(Clone)]
pub(crate) enum TestStoreType { @@ -395,14 +397,9 @@ macro_rules! setup_builder { pub(crate) use setup_builder; pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynStore, TestDynStore) { - let primary = SqliteStore::new( - base_path.join("primary"), - Some("primary_db".to_string()), - Some("primary_kv".to_string()), - ) - .unwrap(); - let backup = FilesystemStore::new(base_path.join("backup")); - let ephemeral = TestStore::new(false); + let primary = TestSyncStore::new(base_path.join("primary")); + let backup = TestSyncStore::new(base_path.join("backup")); + let ephemeral = TestSyncStore::new(base_path.join("ephemeral")); #[cfg(feature = "uniffi")] { @@ -414,7 +411,7 @@ pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynSt } #[cfg(not(feature = "uniffi"))] { - (primary, backup, ephemeral) + (Arc::new(primary), Arc::new(backup), Arc::new(ephemeral)) } } @@ -523,17 +520,52 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); - build_node_with_store(&builder, config.node_entropy, kv_store) + #[cfg(feature = "uniffi")] + { + let kv_store = Arc::new(FfiDynStore::from_kv_store(kv_store)); + build_node_with_store(&builder, config.node_entropy, kv_store) + } + #[cfg(not(feature = "uniffi"))] + { + build_node_with_store(&builder, config.node_entropy, kv_store) + } }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), TestStoreType::TierStore { primary, backup, ephemeral } => { if let Some(backup) = backup { - builder.set_backup_store(backup); + #[cfg(feature = "uniffi")] + { + builder.set_backup_store(backup); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::{DynStore, DynStoreWrapper}; + builder.set_backup_store( + Arc::new(DynStoreWrapper((*backup).clone())) as Arc + ); + } } if let Some(ephemeral) = 
ephemeral { - builder.set_ephemeral_store(ephemeral); + #[cfg(feature = "uniffi")] + { + builder.set_ephemeral_store(ephemeral); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::{DynStore, DynStoreWrapper}; + builder.set_ephemeral_store( + Arc::new(DynStoreWrapper((*ephemeral).clone())) as Arc + ); + } + } + #[cfg(feature = "uniffi")] + { + build_node_with_store(&builder, config.node_entropy, primary) + } + #[cfg(not(feature = "uniffi"))] + { + build_node_with_store(&builder, config.node_entropy, (*primary).clone()) } - builder.build_with_store(config.node_entropy.into(), primary).unwrap() }, }; @@ -1738,18 +1770,20 @@ impl TestSyncStoreInner { } } +#[cfg(not(feature = "uniffi"))] pub(crate) fn build_node_with_store( builder: &Builder, entropy: NodeEntropy, store: S, ) -> TestNode { - #[cfg(feature = "uniffi")] { - use ldk_node::FfiDynStore; - builder - .build_with_store(entropy.into(), Arc::new(FfiDynStore::from_kv_store(store))) - .unwrap() + builder.build_with_store(entropy, store).unwrap() } - #[cfg(not(feature = "uniffi"))] +} + +#[cfg(feature = "uniffi")] +pub(crate) fn build_node_with_store( + builder: &Builder, entropy: NodeEntropy, store: Arc, +) -> TestNode { { - builder.build_with_store(entropy, store).unwrap() + builder.build_with_store(entropy.into(), store).unwrap() } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 467601447..83cb2b3ad 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -71,14 +71,14 @@ async fn channel_full_cycle_tier_store() { true, false, TestStoreType::TierStore { - primary: Arc::clone(&primary_a), - backup: Some(Arc::clone(&backup_a)), - ephemeral: Some(Arc::clone(&ephemeral_a)), + primary: primary_a.clone(), + backup: Some(backup_a.clone()), + ephemeral: Some(ephemeral_a.clone()), }, TestStoreType::TierStore { - primary: Arc::clone(&primary_b), - backup: Some(Arc::clone(&backup_b)), - ephemeral: Some(Arc::clone(&ephemeral_b)), + primary: primary_b, 
+ backup: Some(backup_b), + ephemeral: Some(ephemeral_b), }, ); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) @@ -86,7 +86,7 @@ async fn channel_full_cycle_tier_store() { // Verify Primary store contains channel manager data let primary_channel_manager = KVStoreSync::read( - primary_a.as_ref(), + &(*primary_a.clone()), CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -94,13 +94,13 @@ async fn channel_full_cycle_tier_store() { assert!(primary_channel_manager.is_ok(), "Primary should have channel manager data"); // Verify Primary store contains payment info - let primary_payments = KVStoreSync::list(primary_a.as_ref(), "payments", ""); + let primary_payments = KVStoreSync::list(&(*primary_a.clone()), "payments", ""); assert!(primary_payments.is_ok(), "Primary should have payment data"); assert!(!primary_payments.unwrap().is_empty(), "Primary should have payment entries"); // Verify Backup store synced critical data let backup_channel_manager = KVStoreSync::read( - backup_a.as_ref(), + &(*backup_a.clone()), CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -108,12 +108,12 @@ async fn channel_full_cycle_tier_store() { assert!(backup_channel_manager.is_ok(), "Backup should have synced channel manager"); // Verify backup is not empty - let backup_all_keys = KVStoreSync::list(backup_a.as_ref(), "", "").unwrap(); + let backup_all_keys = KVStoreSync::list(&(*backup_a.clone()), "", "").unwrap(); assert!(!backup_all_keys.is_empty(), "Backup store should not be empty"); // Verify Ephemeral does NOT have channel manager let ephemeral_channel_manager = KVStoreSync::read( - ephemeral_a.as_ref(), + &(*ephemeral_a.clone()), CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -121,7 +121,7 @@ 
async fn channel_full_cycle_tier_store() { assert!(ephemeral_channel_manager.is_err(), "Ephemeral should NOT have channel manager"); // Verify Ephemeral does NOT have payment info - let ephemeral_payments = KVStoreSync::list(ephemeral_a.as_ref(), "payments", ""); + let ephemeral_payments = KVStoreSync::list(&(*ephemeral_a.clone()), "payments", ""); assert!( ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), "Ephemeral should NOT have payment data" @@ -129,7 +129,7 @@ async fn channel_full_cycle_tier_store() { //Verify Ephemeral does have network graph let ephemeral_network_graph = KVStoreSync::read( - ephemeral_a.as_ref(), + &(*ephemeral_a.clone()), NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, @@ -322,7 +322,18 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); + let node; + #[cfg(feature = "uniffi")] + { + use ldk_node::FfiDynStore; + + let test_sync_store = Arc::new(FfiDynStore::from_kv_store(test_sync_store.clone())); + node = build_node_with_store(&builder, config.node_entropy, test_sync_store); + } + #[cfg(not(feature = "uniffi"))] + { + node = build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); + } node.start().unwrap(); let expected_node_id = node.node_id(); @@ -360,8 +371,18 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let reinitialized_node = - build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); + let reinitialized_node; + #[cfg(feature = "uniffi")] + { + use ldk_node::FfiDynStore; + + let test_sync_store = Arc::new(FfiDynStore::from_kv_store(test_sync_store)); + reinitialized_node = 
build_node_with_store(&builder, config.node_entropy, test_sync_store); + } + #[cfg(not(feature = "uniffi"))] + { + reinitialized_node = build_node_with_store(&builder, config.node_entropy, test_sync_store); + } reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); From 8763af09068fe86f91630a677668c2a090452c99 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 17:32:30 +0100 Subject: [PATCH 06/10] Add documentation for FfiDynStoreTrait --- src/ffi/types.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 61b4e244c..fbcbd1250 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -253,6 +253,9 @@ impl std::fmt::Display for IOError { } } +/// FFI-safe version of [`DynStoreTrait`]. +/// +/// [`DynStoreTrait`]: crate::types::DynStoreTrait #[uniffi::export(with_foreign)] #[async_trait] pub trait FfiDynStoreTrait: Send + Sync { From 23815f36e64e9b53f29c85f8336d030a1656fd64 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 18:36:52 +0100 Subject: [PATCH 07/10] Remove unnecessary uniffi gating in tier store --- src/io/tier_store.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index fd0da6927..c14530c5f 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -707,7 +707,6 @@ mod tests { use crate::io::tier_store::TierStore; use crate::logger::Logger; use crate::runtime::Runtime; - #[cfg(not(feature = "uniffi"))] use crate::types::DynStore; use crate::types::DynStoreWrapper; From bd4e79da7c77c84651f117cdbdcbdaa9ddee1400 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 19:55:33 +0100 Subject: [PATCH 08/10] Drop DelayedStore and associated backup test These were created to test that our backup store does not impact the primary store writes, but the boilerplate appears too much for the functionality being tested.
--- src/io/test_utils.rs | 170 +------------------------------------------ src/io/tier_store.rs | 49 +------------ 2 files changed, 4 insertions(+), 215 deletions(-) diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index 5dd36cd1a..88078b316 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -9,8 +9,7 @@ use std::collections::{hash_map, HashMap}; use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::Mutex; use lightning::events::ClosureReason; use lightning::io; @@ -27,8 +26,6 @@ use lightning::util::test_utils; use rand::distr::Alphanumeric; use rand::{rng, Rng}; -use crate::runtime::Runtime; - type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister< &'a K, &'a test_utils::TestLogger, @@ -356,168 +353,3 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Make sure everything is persisted as expected after close. check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1); } - -struct DelayedStoreInner { - storage: Mutex>>, - delay: Duration, -} - -impl DelayedStoreInner { - fn new(delay: Duration) -> Self { - Self { storage: Mutex::new(HashMap::new()), delay } - } - - fn make_key(pn: &str, sn: &str, key: &str) -> String { - format!("{}/{}/{}", pn, sn, key) - } - - async fn read_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, - ) -> Result, io::Error> { - tokio::time::sleep(self.delay).await; - - let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); - let storage = self.storage.lock().unwrap(); - storage - .get(&full_key) - .cloned() - .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) - } - - async fn write_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, - ) -> Result<(), io::Error> { - tokio::time::sleep(self.delay).await; - - let full_key = 
Self::make_key(&primary_namespace, &secondary_namespace, &key); - let mut storage = self.storage.lock().unwrap(); - storage.insert(full_key, buf); - Ok(()) - } - - async fn remove_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, - ) -> Result<(), io::Error> { - tokio::time::sleep(self.delay).await; - - let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); - let mut storage = self.storage.lock().unwrap(); - storage.remove(&full_key); - Ok(()) - } - - async fn list_internal( - &self, primary_namespace: String, secondary_namespace: String, - ) -> Result, io::Error> { - tokio::time::sleep(self.delay).await; - - let prefix = format!("{}/{}/", primary_namespace, secondary_namespace); - let storage = self.storage.lock().unwrap(); - Ok(storage - .keys() - .filter(|k| k.starts_with(&prefix)) - .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) - .collect()) - } -} - -pub struct DelayedStore { - inner: Arc, - runtime: Arc, -} - -impl DelayedStore { - pub fn new(delay_ms: u64, runtime: Arc) -> Self { - Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime } - } -} - -impl KVStore for DelayedStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> impl Future, io::Error>> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - async move { inner.read_internal(pn, sn, key).await } - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> impl Future> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - async move { inner.write_internal(pn, sn, key, buf).await } - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: 
&str, _lazy: bool, - ) -> impl Future> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - async move { inner.remove_internal(pn, sn, key).await } - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> impl Future, io::Error>> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - - async move { inner.list_internal(pn, sn).await } - } -} - -impl KVStoreSync for DelayedStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Result, io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await }) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Result<(), io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await }) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, - ) -> Result<(), io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await }) - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Result, io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - - self.runtime.block_on(async move { 
inner.list_internal(pn, sn).await }) - } -} diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index c14530c5f..a34297f74 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -701,9 +701,7 @@ mod tests { use lightning_persister::fs_store::v1::FilesystemStore; use super::*; - use crate::io::test_utils::{ - do_read_write_remove_list_persist, random_storage_path, DelayedStore, - }; + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; use crate::io::tier_store::TierStore; use crate::logger::Logger; use crate::runtime::Runtime; @@ -871,47 +869,6 @@ mod tests { assert_eq!(backup_read_cm.unwrap(), data); } - #[test] - fn backup_overflow_doesnt_fail_writes() { - let base_dir = random_storage_path(); - let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); - let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); - let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); - - let _cleanup = CleanupDir(base_dir.clone()); - - let primary_store: Arc = - Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); - let mut tier = - setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); - - let backup_store: Arc = - Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); - tier.set_backup_store(Arc::clone(&backup_store)); - - let data = vec![42u8; 32]; - - let key = CHANNEL_MANAGER_PERSISTENCE_KEY; - for i in 0..=10 { - let result = KVStoreSync::write( - &tier, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - &format!("{}_{}", key, i), - data.clone(), - ); - - assert!(result.is_ok(), "Write {} should succeed", i); - } - - // Check logs for backup queue overflow message - let log_contents = std::fs::read_to_string(&log_path).unwrap(); - assert!( - log_contents.contains("Backup queue is full"), - "Logs should contain backup queue overflow message" - ); - } - 
#[test] fn lazy_removal() { let base_dir = random_storage_path(); @@ -927,7 +884,7 @@ mod tests { setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); let backup_store: Arc = - Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); tier.set_backup_store(Arc::clone(&backup_store)); let data = vec![42u8; 32]; @@ -942,7 +899,7 @@ mod tests { ); assert!(write_result.is_ok(), "Write should succeed"); - thread::sleep(Duration::from_millis(10)); + thread::sleep(Duration::from_millis(100)); assert_eq!( KVStoreSync::read( From 6fce4385b1548afdc98cb61e0c0a5904cf1fe096 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 20:27:57 +0100 Subject: [PATCH 09/10] Remove spammy logs --- src/io/tier_store.rs | 46 +++++++------------------------------------- 1 file changed, 7 insertions(+), 39 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index a34297f74..c8bc508ff 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -17,9 +17,9 @@ use lightning::util::persist::{ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, }; -use lightning::{log_debug, log_error, log_info, log_trace, log_warn}; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; +use lightning::{log_error, log_warn}; + +use tokio::sync::mpsc::{self, error::TrySendError}; use std::future::Future; use std::sync::Arc; @@ -86,15 +86,7 @@ impl TierStore { ) { while let Some(op) = receiver.recv().await { match Self::apply_backup_operation(&op, &backup_store).await { - Ok(_) => { - log_trace!( - logger, - "Backup succeeded for key {}/{}/{}", - op.primary_namespace(), - op.secondary_namespace(), - op.key() - ); - }, + Ok(_) => {}, Err(e) => { log_error!( logger, @@ -381,16 +373,7 @@ impl TierStoreInner { ) .await { - Ok(data) => { - log_info!( - self.logger, 
- "Read succeeded for key: {}/{}/{}", - primary_namespace, - secondary_namespace, - key - ); - Ok(data) - }, + Ok(data) => Ok(data), Err(e) => { log_error!( self.logger, @@ -413,12 +396,6 @@ impl TierStoreInner { .await { Ok(keys) => { - log_info!( - self.logger, - "List succeeded for namespace: {}/{}", - primary_namespace, - secondary_namespace - ); return Ok(keys); }, Err(e) => { @@ -464,7 +441,7 @@ impl TierStoreInner { Ok(()) }, Err(e) => { - log_debug!( + log_error!( self.logger, "Skipping backup write due to primary write failure for key: {}/{}/{}.", primary_namespace, @@ -506,7 +483,7 @@ impl TierStoreInner { Ok(()) }, Err(e) => { - log_debug!( + log_error!( self.logger, "Skipping backup removal due to primary removal failure for key: {}/{}/{}.", primary_namespace, @@ -548,7 +525,6 @@ impl TierStoreInner { ) .await } else { - log_debug!(self.logger, "Ephemeral store not configured. Reading non-critical data from primary or backup stores."); self.read_primary(&primary_namespace, &secondary_namespace, &key).await } }, @@ -572,8 +548,6 @@ impl TierStoreInner { ) .await } else { - log_debug!(self.logger, "Ephemeral store not configured. Writing non-critical data to primary and backup stores."); - self.primary_write_then_schedule_backup( primary_namespace.as_str(), secondary_namespace.as_str(), @@ -611,8 +585,6 @@ impl TierStoreInner { ) .await } else { - log_debug!(self.logger, "Ephemeral store not configured. Removing non-critical data from primary and backup stores."); - self.primary_remove_then_schedule_backup( primary_namespace.as_str(), secondary_namespace.as_str(), @@ -646,10 +618,6 @@ impl TierStoreInner { if let Some(eph_store) = self.ephemeral_store.as_ref() { KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) } else { - log_debug!( - self.logger, - "Ephemeral store not configured. Listing from primary and backup stores." 
- ); self.list_primary(&primary_namespace, &secondary_namespace).await } }, From 35f9ec27c4deab7c5a6534d7c63fc7874093bef0 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 21:44:10 +0100 Subject: [PATCH 10/10] DRY ephemeral key matching, fix visibility and appropriate kvstore usage - Restrict `TierStoreInner` visibility from `pub` to `pub(crate)` - Primary store can be either local or remote - Extract repeated ephemeral key matching into a standalone `is_ephemerally_cached_key` helper to DRY up `read_internal`, `write_internal`, and `remove_internal` - Replace `KVStoreSync::list` with async `KVStore::list` in `list_internal` to avoid blocking the async runtime --- src/io/tier_store.rs | 163 +++++++++++++++++++------------------------ 1 file changed, 71 insertions(+), 92 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index c8bc508ff..41997b049 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -230,8 +230,8 @@ impl KVStoreSync for TierStore { } } -pub struct TierStoreInner { - /// For remote data. +struct TierStoreInner { + /// For local or remote data. primary_store: Arc, /// For local non-critical/ephemeral data. ephemeral_store: Option>, @@ -395,9 +395,7 @@ impl TierStoreInner { match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) .await { - Ok(keys) => { - return Ok(keys); - }, + Ok(keys) => Ok(keys), Err(e) => { log_error!( self.logger, @@ -505,104 +503,76 @@ impl TierStoreInner { "read", )?; - match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { - (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { - if let Some(eph_store) = self.ephemeral_store.as_ref() { - // We only try once here (without retry logic) because local failure might be indicative - // of a more serious issue (e.g. 
full memory, memory corruption, permissions change) that - // do not self-resolve such that retrying would negate the latency benefits. - - // The following questions remain: - // 1. Are there situations where local transient errors may warrant a retry? - // 2. Can we reliably identify/detect these transient errors? - // 3. Should we fall back to the primary or backup stores in the event of any error? - KVStore::read( - eph_store.as_ref(), - &primary_namespace, - &secondary_namespace, - &key, - ) - .await - } else { - self.read_primary(&primary_namespace, &secondary_namespace, &key).await - } - }, - _ => self.read_primary(&primary_namespace, &secondary_namespace, &key).await, + if let Some(eph_store) = self + .ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key)) + { + // We only try once here (without retry logic) because local failure might be indicative + // of a more serious issue (e.g. full memory, memory corruption, permissions change) that + // do not self-resolve such that retrying would negate the latency benefits. + + // The following questions remain: + // 1. Are there situations where local transient errors may warrant a retry? + // 2. Can we reliably identify/detect these transient errors? + // 3. Should we fall back to the primary or backup stores in the event of any error? 
+ KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await } } async fn write_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, ) -> io::Result<()> { - match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { - (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { - if let Some(eph_store) = &self.ephemeral_store { - KVStore::write( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - } else { - self.primary_write_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - } - }, - _ => { - self.primary_write_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - }, + if let Some(eph_store) = self + .ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key)) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await } } async fn remove_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, ) -> io::Result<()> { - match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { - (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { - if let Some(eph_store) = &self.ephemeral_store { - KVStore::remove( - eph_store.as_ref(), - 
primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - } else { - self.primary_remove_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - } - }, - _ => { - self.primary_remove_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - }, + if let Some(eph_store) = self + .ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key)) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await } } @@ -616,7 +586,8 @@ impl TierStoreInner { ) | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { if let Some(eph_store) = self.ephemeral_store.as_ref() { - KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await } else { self.list_primary(&primary_namespace, &secondary_namespace).await } @@ -653,6 +624,14 @@ impl BackupOp { } } +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + #[cfg(test)] mod tests { use std::panic::RefUnwindSafe;