From aeaf994fa8e7d0d2d7860eb6e438028ba81e3b28 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 10 Mar 2026 18:47:01 +0100 Subject: [PATCH 1/4] rewrite storage variable size writes --- crates/storage/src/qmdb_impl.rs | 113 +++++++++++++++++++++++--------- crates/storage/src/types.rs | 54 ++------------- 2 files changed, 90 insertions(+), 77 deletions(-) diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index 48922eb..b58d97b 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -13,20 +13,15 @@ use crate::cache::{CachedValue, ShardedDbCache}; use crate::metrics::OptionalMetrics; -use crate::types::{ - create_storage_key, create_storage_value_chunk, extract_value_from_chunk, StorageKey, - StorageValueChunk, MAX_VALUE_DATA_SIZE, -}; +use crate::types::{create_storage_key, StorageKey, MAX_VALUE_DATA_SIZE}; use async_trait::async_trait; +use commonware_codec::RangeCfg; use commonware_cryptography::sha256::Sha256; use commonware_runtime::{ buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage, }; +use commonware_storage::qmdb::current::{unordered::variable::Db, VariableConfig}; use commonware_storage::translator::EightCap; -use commonware_storage::{ - qmdb::current::{unordered::fixed::Db, FixedConfig}, - Persistable, -}; use evolve_core::{ErrorCode, ReadonlyKV}; use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; use std::sync::Arc; @@ -37,11 +32,11 @@ use tokio::sync::RwLock; /// Type alias for QMDB in Current state. /// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`. -type QmdbCurrent = Db; +type QmdbCurrent = Db, Sha256, EightCap, 64>; #[derive(Debug)] struct PreparedBatch { - updates: Vec<(StorageKey, Option)>, + updates: Vec<(StorageKey, Option>)>, keys_to_invalidate: Vec>, ops_count: usize, sets: usize, @@ -57,7 +52,7 @@ pub enum StorageError { #[error("Key error")] Key(ErrorCode), - #[error("Value too large for single chunk: {size} bytes (max: {max})")] + #[error("Value too large: {size} bytes (max: {max})")] ValueTooLarge { size: usize, max: usize }, #[error("IO error: {0}")] @@ -172,10 +167,12 @@ where StorageError::InvalidConfig("write_buffer_size must be non-zero".to_string()) })?; - let qmdb_config = FixedConfig { - log_journal_partition: format!("{}_log-journal", config.partition_prefix), + let qmdb_config = VariableConfig { + log_partition: format!("{}_log-journal", config.partition_prefix), log_items_per_blob: NonZeroU64::new(1000).unwrap(), log_write_buffer: write_buffer_size, + log_compression: None, + log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_DATA_SIZE), ())), mmr_journal_partition: format!("{}_mmr-journal", config.partition_prefix), mmr_items_per_blob: NonZeroU64::new(1000).unwrap(), mmr_write_buffer: write_buffer_size, @@ -313,9 +310,8 @@ where sets += 1; let storage_key = create_storage_key(&key)?; - let storage_value = create_storage_value_chunk(&value)?; keys_to_invalidate.push(key); - updates.push((storage_key, Some(storage_value))); + updates.push((storage_key, Some(value))); } crate::types::Operation::Remove { key } => { deletes += 1; @@ -338,8 +334,10 @@ where /// Commit the current state and generate a commit hash. pub async fn commit_state(&self) -> Result { let start = Instant::now(); - let db = self.db.read().await; - db.commit().await.map_err(map_qmdb_error)?; + let mut db = self.db.write().await; + let prune_loc = db.inactivity_floor_loc(); + db.prune(prune_loc).await.map_err(map_qmdb_error)?; + db.sync().await.map_err(map_qmdb_error)?; let hash = root_to_commit_hash(db.root())?; self.metrics.record_commit(start.elapsed().as_secs_f64()); Ok(hash) @@ -429,17 +427,10 @@ where } fn decode_storage_value( - result: Result, impl std::fmt::Display>, + result: Result>, impl std::fmt::Display>, ) -> Result>, ErrorCode> { match result { - Ok(Some(value_chunk)) => match extract_value_from_chunk(&value_chunk) { - Some(data) if data.is_empty() => Ok(None), - Some(data) => Ok(Some(data)), - None => { - tracing::warn!("Invalid value chunk format, treating as absent"); - Ok(None) - } - }, + Ok(Some(value)) => Ok(Some(value)), Ok(None) => Ok(None), Err(e) => { let err_str = e.to_string(); @@ -1043,8 +1034,6 @@ mod tests { runner.start(|context| async move { let storage = QmdbStorage::new(context, config).await.unwrap(); - // Empty values are treated as removed (since we use all-zeros to signal removal) - // This is a known limitation storage .apply_batch(vec![crate::types::Operation::Set { key: b"empty".to_vec(), @@ -1053,9 +1042,8 @@ mod tests { .await .unwrap(); - // Empty value should be treated as None (due to removal semantics) let result = storage.get(b"empty").unwrap(); - assert_eq!(result, None); + assert_eq!(result, Some(Vec::new())); }) } @@ -1325,6 +1313,71 @@ mod tests { }) } + #[test] + fn test_commit_prunes_inactive_history() { + let temp_dir = TempDir::new().unwrap(); + let config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + + runner.start(|context| async move { + use commonware_storage::qmdb::store::LogStore; + + let storage = QmdbStorage::new(context, config).await.unwrap(); + const KEYS: usize = 1_100; + + let initial_ops = (0..KEYS) + .map(|i| crate::types::Operation::Set { + key: format!("key-{i}").into_bytes(), + value: format!("value-{i}-v1").into_bytes(), + }) + .collect(); + storage.apply_batch(initial_ops).await.unwrap(); + storage.commit_state().await.unwrap(); + + let start_before = { + let db = storage.db.read().await; + *db.bounds().await.start + }; + + let second_ops = (0..KEYS) + .map(|i| { + if i % 5 == 0 { + crate::types::Operation::Remove { + key: format!("key-{i}").into_bytes(), + } + } else { + crate::types::Operation::Set { + key: format!("key-{i}").into_bytes(), + value: format!("value-{i}-v2").into_bytes(), + } + } + }) + .collect(); + storage.apply_batch(second_ops).await.unwrap(); + storage.commit_state().await.unwrap(); + + let start_after = { + let db = storage.db.read().await; + *db.bounds().await.start + }; + + assert!( + start_after > start_before, + "prune boundary did not advance: before={start_before}, after={start_after}" + ); + assert_eq!(storage.get(b"key-1").unwrap(), Some(b"value-1-v2".to_vec())); + assert_eq!(storage.get(b"key-0").unwrap(), None); + }) + } + #[test] fn test_preview_batch_root_matches_eventual_commit_hash() { let temp_dir = TempDir::new().unwrap(); diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs index b9b7bcc..83a3c81 100644 --- a/crates/storage/src/types.rs +++ b/crates/storage/src/types.rs @@ -143,12 +143,9 @@ pub const MAX_KEY_SIZE: usize = evolve_core::MAX_STORAGE_KEY_SIZE; const _: () = assert!(MAX_KEY_SIZE + KEY_LENGTH_PREFIX_SIZE == STORAGE_KEY_SIZE); pub const MAX_BATCH_SIZE: usize = 10_000; // 10k operations -// For commonware integration, we use fixed-size types -// Keys are 256 bytes (padded with zeros if needed) +// For commonware integration, keys use a fixed-size envelope while values stay variable-sized. +// Keys are 256 bytes (padded with zeros if needed). pub type StorageKey = FixedBytes; -// Values are stored in 4KB chunks -pub const STORAGE_VALUE_SIZE: usize = 4096; -pub type StorageValueChunk = FixedBytes; /// Helper functions for creating storage keys pub fn create_storage_key(key: &[u8]) -> Result { @@ -164,47 +161,10 @@ pub fn create_storage_key(key: &[u8]) -> Result { Ok(StorageKey::new(data)) } -/// Length prefix size for value storage (4 bytes for u32 length) -pub const VALUE_LENGTH_PREFIX_SIZE: usize = 4; - -/// Maximum actual value size (chunk size minus length prefix) -pub const MAX_VALUE_DATA_SIZE: usize = STORAGE_VALUE_SIZE - VALUE_LENGTH_PREFIX_SIZE; /// Maximum value size accepted by the storage layer. -pub const MAX_VALUE_SIZE: usize = MAX_VALUE_DATA_SIZE; - -/// Helper function for creating storage value chunks /// -/// Stores value with a 4-byte length prefix to preserve exact data semantics. -/// Format: [len_u32_le][data][padding] -pub fn create_storage_value_chunk(value: &[u8]) -> Result { - if value.len() > MAX_VALUE_DATA_SIZE { - return Err(ERR_VALUE_TOO_LARGE); - } - - let mut data = [0u8; STORAGE_VALUE_SIZE]; - // Store length as 4-byte little-endian prefix - let len_bytes = (value.len() as u32).to_le_bytes(); - data[..VALUE_LENGTH_PREFIX_SIZE].copy_from_slice(&len_bytes); - // Store actual value after length prefix - data[VALUE_LENGTH_PREFIX_SIZE..VALUE_LENGTH_PREFIX_SIZE + value.len()].copy_from_slice(value); - - Ok(StorageValueChunk::new(data)) -} - -/// Extract value from storage chunk by reading length prefix -/// -/// Returns the exact bytes that were stored, preserving trailing zeros. -pub fn extract_value_from_chunk(chunk: &StorageValueChunk) -> Option> { - let data = chunk.as_ref(); - // Read length from 4-byte little-endian prefix - let len_bytes: [u8; 4] = data[..VALUE_LENGTH_PREFIX_SIZE].try_into().ok()?; - let len = u32::from_le_bytes(len_bytes) as usize; - - // Validate length - if len > MAX_VALUE_DATA_SIZE { - return None; - } - - // Extract exactly 'len' bytes of actual data - Some(data[VALUE_LENGTH_PREFIX_SIZE..VALUE_LENGTH_PREFIX_SIZE + len].to_vec()) -} +/// We keep the existing 4092-byte limit to preserve bounded memory usage even though the +/// underlying QMDB encoding is now variable-sized. +pub const MAX_VALUE_SIZE: usize = 4092; +/// Backwards-compatible alias used by existing tests and callers. +pub const MAX_VALUE_DATA_SIZE: usize = MAX_VALUE_SIZE; From 79fe31240972757c0c02b345ef235548c5d4c796 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 10 Mar 2026 19:12:39 +0100 Subject: [PATCH 2/4] async pruning --- crates/storage/src/qmdb_impl.rs | 81 +++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 13 deletions(-) diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index b58d97b..54be34a 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -25,15 +25,21 @@ use commonware_storage::translator::EightCap; use evolve_core::{ErrorCode, ReadonlyKV}; use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use thiserror::Error; use tokio::runtime::RuntimeFlavor; -use tokio::sync::RwLock; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + RwLock, +}; /// Type alias for QMDB in Current state. /// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`. type QmdbCurrent = Db, Sha256, EightCap, 64>; +const PRUNE_SCHEDULE_DELAY: Duration = Duration::from_millis(50); +const PRUNE_RETRY_DELAY: Duration = Duration::from_millis(25); + #[derive(Debug)] struct PreparedBatch { updates: Vec<(StorageKey, Option>)>, @@ -109,6 +115,7 @@ where C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { db: Arc>>, + prune_tx: UnboundedSender<()>, /// Read cache for fast synchronous lookups cache: Arc, /// Optional metrics for monitoring storage performance @@ -122,6 +129,7 @@ where fn clone(&self) -> Self { Self { db: self.db.clone(), + prune_tx: self.prune_tx.clone(), cache: self.cache.clone(), metrics: self.metrics.clone(), } @@ -132,6 +140,42 @@ impl QmdbStorage where C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { + fn spawn_prune_worker(db: Arc>>) -> UnboundedSender<()> { + let (prune_tx, mut prune_rx) = unbounded_channel::<()>(); + + tokio::spawn(async move { + while prune_rx.recv().await.is_some() { + tokio::time::sleep(PRUNE_SCHEDULE_DELAY).await; + while prune_rx.try_recv().is_ok() {} + + loop { + let mut db = match db.try_write() { + Ok(db) => db, + Err(_) => { + while prune_rx.try_recv().is_ok() {} + tokio::time::sleep(PRUNE_RETRY_DELAY).await; + continue; + } + }; + + let prune_loc = db.inactivity_floor_loc(); + if let Err(err) = db.prune(prune_loc).await { + tracing::error!("background prune failed: {err}"); + break; + } + + if let Err(err) = db.sync().await { + tracing::error!("background prune sync failed: {err}"); + } + + break; + } + } + }); + + prune_tx + } + /// Create a new QmdbStorage instance pub async fn new( context: C, @@ -186,12 +230,16 @@ where page_cache: CacheRef::from_pooler(&context, page_size, capacity), }; - let db = Db::init(context, qmdb_config) - .await - .map_err(map_qmdb_error)?; + let db = Arc::new(RwLock::new( + Db::init(context, qmdb_config) + .await + .map_err(map_qmdb_error)?, + )); + let prune_tx = Self::spawn_prune_worker(db.clone()); Ok(Self { - db: Arc::new(RwLock::new(db)), + db, + prune_tx, cache: Arc::new(ShardedDbCache::with_defaults()), metrics, }) @@ -334,11 +382,11 @@ where /// Commit the current state and generate a commit hash. pub async fn commit_state(&self) -> Result { let start = Instant::now(); - let mut db = self.db.write().await; - let prune_loc = db.inactivity_floor_loc(); - db.prune(prune_loc).await.map_err(map_qmdb_error)?; + let db = self.db.write().await; db.sync().await.map_err(map_qmdb_error)?; let hash = root_to_commit_hash(db.root())?; + drop(db); + let _ = self.prune_tx.send(()); self.metrics.record_commit(start.elapsed().as_secs_f64()); Ok(hash) } @@ -1364,10 +1412,17 @@ mod tests { storage.apply_batch(second_ops).await.unwrap(); storage.commit_state().await.unwrap(); - let start_after = { - let db = storage.db.read().await; - *db.bounds().await.start - }; + let mut start_after = start_before; + for _ in 0..50 { + start_after = { + let db = storage.db.read().await; + *db.bounds().await.start + }; + if start_after > start_before { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } assert!( start_after > start_before, From 997b1b82581adcd94b9886ab7c85219110791868 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 10 Mar 2026 19:22:14 +0100 Subject: [PATCH 3/4] simplify --- crates/storage/src/qmdb_impl.rs | 61 ++++++++++++++++++--------------- crates/storage/src/types.rs | 2 -- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index 54be34a..8736ae1 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -13,7 +13,7 @@ use crate::cache::{CachedValue, ShardedDbCache}; use crate::metrics::OptionalMetrics; -use crate::types::{create_storage_key, StorageKey, MAX_VALUE_DATA_SIZE}; +use crate::types::{create_storage_key, StorageKey, MAX_VALUE_SIZE}; use async_trait::async_trait; use commonware_codec::RangeCfg; use commonware_cryptography::sha256::Sha256; @@ -39,6 +39,7 @@ type QmdbCurrent = Db, Sha256, EightCap, 64>; const PRUNE_SCHEDULE_DELAY: Duration = Duration::from_millis(50); const PRUNE_RETRY_DELAY: Duration = Duration::from_millis(25); +const PRUNE_MAX_LOCK_RETRIES: usize = 20; #[derive(Debug)] struct PreparedBatch { @@ -146,29 +147,33 @@ where tokio::spawn(async move { while prune_rx.recv().await.is_some() { tokio::time::sleep(PRUNE_SCHEDULE_DELAY).await; + // Drain any signals that arrived during the delay. while prune_rx.try_recv().is_ok() {} - loop { - let mut db = match db.try_write() { - Ok(db) => db, - Err(_) => { - while prune_rx.try_recv().is_ok() {} - tokio::time::sleep(PRUNE_RETRY_DELAY).await; - continue; + // Acquire write lock with bounded retries. + let mut db_guard = None; + for _ in 0..PRUNE_MAX_LOCK_RETRIES { + match db.try_write() { + Ok(guard) => { + db_guard = Some(guard); + break; } - }; - - let prune_loc = db.inactivity_floor_loc(); - if let Err(err) = db.prune(prune_loc).await { - tracing::error!("background prune failed: {err}"); - break; + Err(_) => tokio::time::sleep(PRUNE_RETRY_DELAY).await, } + } - if let Err(err) = db.sync().await { - tracing::error!("background prune sync failed: {err}"); - } + let Some(mut db) = db_guard else { + tracing::warn!("prune worker: could not acquire write lock, skipping cycle"); + continue; + }; - break; + let prune_loc = db.inactivity_floor_loc(); + if let Err(err) = db.prune(prune_loc).await { + tracing::error!("background prune failed: {err}"); + continue; + } + if let Err(err) = db.sync().await { + tracing::error!("background prune sync failed: {err}"); } } }); @@ -216,7 +221,7 @@ where log_items_per_blob: NonZeroU64::new(1000).unwrap(), log_write_buffer: write_buffer_size, log_compression: None, - log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_DATA_SIZE), ())), + log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_SIZE), ())), mmr_journal_partition: format!("{}_mmr-journal", config.partition_prefix), mmr_items_per_blob: NonZeroU64::new(1000).unwrap(), mmr_write_buffer: write_buffer_size, @@ -349,10 +354,10 @@ where for op in operations { match op { crate::types::Operation::Set { key, value } => { - if value.len() > MAX_VALUE_DATA_SIZE { + if value.len() > MAX_VALUE_SIZE { return Err(StorageError::ValueTooLarge { size: value.len(), - max: MAX_VALUE_DATA_SIZE, + max: MAX_VALUE_SIZE, }); } @@ -382,7 +387,7 @@ where /// Commit the current state and generate a commit hash. pub async fn commit_state(&self) -> Result { let start = Instant::now(); - let db = self.db.write().await; + let db = self.db.read().await; db.sync().await.map_err(map_qmdb_error)?; let hash = root_to_commit_hash(db.root())?; drop(db); @@ -779,8 +784,8 @@ mod tests { runner.start(|context| async move { let storage = QmdbStorage::new(context, config).await.unwrap(); - // Value exactly at MAX_VALUE_DATA_SIZE limit should work - let max_value = vec![b'v'; crate::types::MAX_VALUE_DATA_SIZE]; + // Value exactly at MAX_VALUE_SIZE limit should work + let max_value = vec![b'v'; crate::types::MAX_VALUE_SIZE]; storage .apply_batch(vec![crate::types::Operation::Set { key: b"key".to_vec(), @@ -792,8 +797,8 @@ mod tests { let retrieved = storage.get(b"key").unwrap(); assert_eq!(retrieved, Some(max_value)); - // Value exceeding MAX_VALUE_DATA_SIZE should fail - let oversized_value = vec![b'v'; crate::types::MAX_VALUE_DATA_SIZE + 1]; + // Value exceeding MAX_VALUE_SIZE should fail + let oversized_value = vec![b'v'; crate::types::MAX_VALUE_SIZE + 1]; let result = storage .apply_batch(vec![crate::types::Operation::Set { key: b"key2".to_vec(), @@ -804,8 +809,8 @@ mod tests { assert!(result.is_err()); match result.unwrap_err() { StorageError::ValueTooLarge { size, max } => { - assert_eq!(size, crate::types::MAX_VALUE_DATA_SIZE + 1); - assert_eq!(max, crate::types::MAX_VALUE_DATA_SIZE); + assert_eq!(size, crate::types::MAX_VALUE_SIZE + 1); + assert_eq!(max, crate::types::MAX_VALUE_SIZE); } e => panic!("Expected ValueTooLarge, got {:?}", e), } diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs index 83a3c81..8ff405c 100644 --- a/crates/storage/src/types.rs +++ b/crates/storage/src/types.rs @@ -166,5 +166,3 @@ pub fn create_storage_key(key: &[u8]) -> Result { /// We keep the existing 4092-byte limit to preserve bounded memory usage even though the /// underlying QMDB encoding is now variable-sized. pub const MAX_VALUE_SIZE: usize = 4092; -/// Backwards-compatible alias used by existing tests and callers. -pub const MAX_VALUE_DATA_SIZE: usize = MAX_VALUE_SIZE; From 3700c04e6b4818a6014b5a0d9395195e30c61823 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 10 Mar 2026 19:26:29 +0100 Subject: [PATCH 4/4] add test case --- crates/storage/src/qmdb_impl.rs | 47 +++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index 8736ae1..b37ef6d 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -1438,6 +1438,53 @@ mod tests { }) } + #[test] + fn test_variable_codec_survives_reopen() { + let temp_dir = TempDir::new().unwrap(); + let config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + + runner.start(|context| async move { + let config2 = config.clone(); + let ctx2 = context.clone(); + + let storage = QmdbStorage::new(context, config).await.unwrap(); + storage + .apply_batch(vec![ + crate::types::Operation::Set { + key: b"k1".to_vec(), + value: b"hello".to_vec(), + }, + crate::types::Operation::Set { + key: b"k2".to_vec(), + value: vec![0u8; 100], + }, + crate::types::Operation::Set { + key: b"k3".to_vec(), + value: Vec::new(), + }, + ]) + .await + .unwrap(); + storage.commit_state().await.unwrap(); + drop(storage); + + // Reopen from the same directory and verify values survived. + let reopened = QmdbStorage::new(ctx2, config2).await.unwrap(); + assert_eq!(reopened.get(b"k1").unwrap(), Some(b"hello".to_vec())); + assert_eq!(reopened.get(b"k2").unwrap(), Some(vec![0u8; 100])); + assert_eq!(reopened.get(b"k3").unwrap(), Some(Vec::new())); + }) + } + #[test] fn test_preview_batch_root_matches_eventual_commit_hash() { let temp_dir = TempDir::new().unwrap();