Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 203 additions & 43 deletions crates/storage/src/qmdb_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,37 @@

use crate::cache::{CachedValue, ShardedDbCache};
use crate::metrics::OptionalMetrics;
use crate::types::{
create_storage_key, create_storage_value_chunk, extract_value_from_chunk, StorageKey,
StorageValueChunk, MAX_VALUE_DATA_SIZE,
};
use crate::types::{create_storage_key, StorageKey, MAX_VALUE_SIZE};
use async_trait::async_trait;
use commonware_codec::RangeCfg;
use commonware_cryptography::sha256::Sha256;
use commonware_runtime::{
buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage,
};
use commonware_storage::qmdb::current::{unordered::variable::Db, VariableConfig};
use commonware_storage::translator::EightCap;
use commonware_storage::{
qmdb::current::{unordered::fixed::Db, FixedConfig},
Persistable,
};
use evolve_core::{ErrorCode, ReadonlyKV};
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::runtime::RuntimeFlavor;
use tokio::sync::RwLock;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
RwLock,
};

/// Type alias for QMDB in Current state.
/// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`.
type QmdbCurrent<C> = Db<C, StorageKey, StorageValueChunk, Sha256, EightCap, 64>;
type QmdbCurrent<C> = Db<C, StorageKey, Vec<u8>, Sha256, EightCap, 64>;

const PRUNE_SCHEDULE_DELAY: Duration = Duration::from_millis(50);
const PRUNE_RETRY_DELAY: Duration = Duration::from_millis(25);
const PRUNE_MAX_LOCK_RETRIES: usize = 20;

#[derive(Debug)]
struct PreparedBatch {
updates: Vec<(StorageKey, Option<StorageValueChunk>)>,
updates: Vec<(StorageKey, Option<Vec<u8>>)>,
keys_to_invalidate: Vec<Vec<u8>>,
ops_count: usize,
sets: usize,
Expand All @@ -57,7 +59,7 @@ pub enum StorageError {
#[error("Key error")]
Key(ErrorCode),

#[error("Value too large for single chunk: {size} bytes (max: {max})")]
#[error("Value too large: {size} bytes (max: {max})")]
ValueTooLarge { size: usize, max: usize },

#[error("IO error: {0}")]
Expand Down Expand Up @@ -114,6 +116,7 @@ where
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
{
db: Arc<RwLock<QmdbCurrent<C>>>,
prune_tx: UnboundedSender<()>,
/// Read cache for fast synchronous lookups
cache: Arc<ShardedDbCache>,
/// Optional metrics for monitoring storage performance
Expand All @@ -127,6 +130,7 @@ where
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
prune_tx: self.prune_tx.clone(),
cache: self.cache.clone(),
metrics: self.metrics.clone(),
}
Expand All @@ -137,6 +141,46 @@ impl<C> QmdbStorage<C>
where
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
{
/// Spawn the background task that prunes inactive history after commits.
///
/// Returns the sender half of a signal channel: sending `()` schedules one
/// prune cycle. The worker loop exits once every sender has been dropped.
fn spawn_prune_worker(db: Arc<RwLock<QmdbCurrent<C>>>) -> UnboundedSender<()> {
    let (signal_tx, mut signal_rx) = unbounded_channel::<()>();

    tokio::spawn(async move {
        // `recv() == None` means all senders are gone: shut the worker down.
        while signal_rx.recv().await.is_some() {
            // Debounce: wait briefly, then coalesce any signals that arrived
            // in the meantime into this single cycle.
            tokio::time::sleep(PRUNE_SCHEDULE_DELAY).await;
            while signal_rx.try_recv().is_ok() {}

            // Take the write lock opportunistically so foreground work is not
            // blocked; give up after a bounded number of attempts.
            let mut acquired = None;
            for _ in 0..PRUNE_MAX_LOCK_RETRIES {
                if let Ok(guard) = db.try_write() {
                    acquired = Some(guard);
                    break;
                }
                tokio::time::sleep(PRUNE_RETRY_DELAY).await;
            }

            let Some(mut db) = acquired else {
                tracing::warn!("prune worker: could not acquire write lock, skipping cycle");
                continue;
            };

            // Prune everything below the inactivity floor, then persist.
            let prune_loc = db.inactivity_floor_loc();
            if let Err(err) = db.prune(prune_loc).await {
                tracing::error!("background prune failed: {err}");
                continue;
            }
            if let Err(err) = db.sync().await {
                tracing::error!("background prune sync failed: {err}");
            }
        }
    });

    signal_tx
}

/// Create a new QmdbStorage instance
pub async fn new(
context: C,
Expand Down Expand Up @@ -172,10 +216,12 @@ where
StorageError::InvalidConfig("write_buffer_size must be non-zero".to_string())
})?;

let qmdb_config = FixedConfig {
log_journal_partition: format!("{}_log-journal", config.partition_prefix),
let qmdb_config = VariableConfig {
log_partition: format!("{}_log-journal", config.partition_prefix),
log_items_per_blob: NonZeroU64::new(1000).unwrap(),
log_write_buffer: write_buffer_size,
log_compression: None,
log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_SIZE), ())),
mmr_journal_partition: format!("{}_mmr-journal", config.partition_prefix),
mmr_items_per_blob: NonZeroU64::new(1000).unwrap(),
mmr_write_buffer: write_buffer_size,
Expand All @@ -189,12 +235,16 @@ where
page_cache: CacheRef::from_pooler(&context, page_size, capacity),
};

let db = Db::init(context, qmdb_config)
.await
.map_err(map_qmdb_error)?;
let db = Arc::new(RwLock::new(
Db::init(context, qmdb_config)
.await
.map_err(map_qmdb_error)?,
));
let prune_tx = Self::spawn_prune_worker(db.clone());

Ok(Self {
db: Arc::new(RwLock::new(db)),
db,
prune_tx,
cache: Arc::new(ShardedDbCache::with_defaults()),
metrics,
})
Expand Down Expand Up @@ -304,18 +354,17 @@ where
for op in operations {
match op {
crate::types::Operation::Set { key, value } => {
if value.len() > MAX_VALUE_DATA_SIZE {
if value.len() > MAX_VALUE_SIZE {
return Err(StorageError::ValueTooLarge {
size: value.len(),
max: MAX_VALUE_DATA_SIZE,
max: MAX_VALUE_SIZE,
});
}

sets += 1;
let storage_key = create_storage_key(&key)?;
let storage_value = create_storage_value_chunk(&value)?;
keys_to_invalidate.push(key);
updates.push((storage_key, Some(storage_value)));
updates.push((storage_key, Some(value)));
}
crate::types::Operation::Remove { key } => {
deletes += 1;
Expand All @@ -339,8 +388,10 @@ where
/// Commit pending state, sync it to storage, and return the root hash.
///
/// On success, the background prune worker is signalled so inactive history
/// can be reclaimed without blocking the caller.
pub async fn commit_state(&self) -> Result<crate::types::CommitHash, StorageError> {
    let started = Instant::now();
    let hash = {
        // Guard is dropped at the end of this scope, before signalling the
        // prune worker, so the worker's try_write can succeed.
        let db = self.db.read().await;
        db.commit().await.map_err(map_qmdb_error)?;
        db.sync().await.map_err(map_qmdb_error)?;
        root_to_commit_hash(db.root())?
    };
    // Best-effort signal: an error here only means the worker has exited.
    let _ = self.prune_tx.send(());
    self.metrics.record_commit(started.elapsed().as_secs_f64());
    Ok(hash)
}
Expand Down Expand Up @@ -429,17 +480,10 @@ where
}

fn decode_storage_value(
result: Result<Option<StorageValueChunk>, impl std::fmt::Display>,
result: Result<Option<Vec<u8>>, impl std::fmt::Display>,
) -> Result<Option<Vec<u8>>, ErrorCode> {
match result {
Ok(Some(value_chunk)) => match extract_value_from_chunk(&value_chunk) {
Some(data) if data.is_empty() => Ok(None),
Some(data) => Ok(Some(data)),
None => {
tracing::warn!("Invalid value chunk format, treating as absent");
Ok(None)
}
},
Ok(Some(value)) => Ok(Some(value)),
Ok(None) => Ok(None),
Err(e) => {
let err_str = e.to_string();
Expand Down Expand Up @@ -740,8 +784,8 @@ mod tests {
runner.start(|context| async move {
let storage = QmdbStorage::new(context, config).await.unwrap();

// Value exactly at MAX_VALUE_DATA_SIZE limit should work
let max_value = vec![b'v'; crate::types::MAX_VALUE_DATA_SIZE];
// Value exactly at MAX_VALUE_SIZE limit should work
let max_value = vec![b'v'; crate::types::MAX_VALUE_SIZE];
storage
.apply_batch(vec![crate::types::Operation::Set {
key: b"key".to_vec(),
Expand All @@ -753,8 +797,8 @@ mod tests {
let retrieved = storage.get(b"key").unwrap();
assert_eq!(retrieved, Some(max_value));

// Value exceeding MAX_VALUE_DATA_SIZE should fail
let oversized_value = vec![b'v'; crate::types::MAX_VALUE_DATA_SIZE + 1];
// Value exceeding MAX_VALUE_SIZE should fail
let oversized_value = vec![b'v'; crate::types::MAX_VALUE_SIZE + 1];
let result = storage
.apply_batch(vec![crate::types::Operation::Set {
key: b"key2".to_vec(),
Expand All @@ -765,8 +809,8 @@ mod tests {
assert!(result.is_err());
match result.unwrap_err() {
StorageError::ValueTooLarge { size, max } => {
assert_eq!(size, crate::types::MAX_VALUE_DATA_SIZE + 1);
assert_eq!(max, crate::types::MAX_VALUE_DATA_SIZE);
assert_eq!(size, crate::types::MAX_VALUE_SIZE + 1);
assert_eq!(max, crate::types::MAX_VALUE_SIZE);
}
e => panic!("Expected ValueTooLarge, got {:?}", e),
}
Expand Down Expand Up @@ -1043,8 +1087,6 @@ mod tests {
runner.start(|context| async move {
let storage = QmdbStorage::new(context, config).await.unwrap();

// Empty values are treated as removed (since we use all-zeros to signal removal)
// This is a known limitation
storage
.apply_batch(vec![crate::types::Operation::Set {
key: b"empty".to_vec(),
Expand All @@ -1053,9 +1095,8 @@ mod tests {
.await
.unwrap();

// Empty value should be treated as None (due to removal semantics)
let result = storage.get(b"empty").unwrap();
assert_eq!(result, None);
assert_eq!(result, Some(Vec::new()));
})
}

Expand Down Expand Up @@ -1325,6 +1366,125 @@ mod tests {
})
}

#[test]
fn test_commit_prunes_inactive_history() {
    // Verifies that committing state eventually advances the log's prune
    // boundary via the background prune worker (see commit_state, which
    // signals the worker after each successful commit).
    let temp_dir = TempDir::new().unwrap();
    let config = crate::types::StorageConfig {
        path: temp_dir.path().to_path_buf(),
        ..Default::default()
    };

    let runtime_config = TokioConfig::default()
        .with_storage_directory(temp_dir.path())
        .with_worker_threads(2);

    let runner = Runner::new(runtime_config);

    runner.start(|context| async move {
        use commonware_storage::qmdb::store::LogStore;

        let storage = QmdbStorage::new(context, config).await.unwrap();
        // More keys than one blob holds (blobs are configured for 1000 items),
        // so pruning has whole blobs it can drop — TODO confirm this sizing
        // assumption against the VariableConfig used in `new`.
        const KEYS: usize = 1_100;

        // First commit: write an initial version of every key.
        let initial_ops = (0..KEYS)
            .map(|i| crate::types::Operation::Set {
                key: format!("key-{i}").into_bytes(),
                value: format!("value-{i}-v1").into_bytes(),
            })
            .collect();
        storage.apply_batch(initial_ops).await.unwrap();
        storage.commit_state().await.unwrap();

        // Record the log's lower bound before the second commit.
        let start_before = {
            let db = storage.db.read().await;
            *db.bounds().await.start
        };

        // Second commit: overwrite most keys and remove every fifth one,
        // making all first-commit operations inactive history.
        let second_ops = (0..KEYS)
            .map(|i| {
                if i % 5 == 0 {
                    crate::types::Operation::Remove {
                        key: format!("key-{i}").into_bytes(),
                    }
                } else {
                    crate::types::Operation::Set {
                        key: format!("key-{i}").into_bytes(),
                        value: format!("value-{i}-v2").into_bytes(),
                    }
                }
            })
            .collect();
        storage.apply_batch(second_ops).await.unwrap();
        storage.commit_state().await.unwrap();

        // The prune worker runs asynchronously (debounce delay plus lock
        // retries), so poll the boundary with a bounded retry loop rather
        // than asserting immediately.
        let mut start_after = start_before;
        for _ in 0..50 {
            start_after = {
                let db = storage.db.read().await;
                *db.bounds().await.start
            };
            if start_after > start_before {
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        assert!(
            start_after > start_before,
            "prune boundary did not advance: before={start_before}, after={start_after}"
        );
        // Pruning must not affect live state: updated keys keep their latest
        // value and removed keys stay absent.
        assert_eq!(storage.get(b"key-1").unwrap(), Some(b"value-1-v2".to_vec()));
        assert_eq!(storage.get(b"key-0").unwrap(), None);
    })
}

#[test]
fn test_variable_codec_survives_reopen() {
    // Persist values of several shapes (short, zero-filled, empty), then
    // reopen the database from the same directory and read them back.
    let temp_dir = TempDir::new().unwrap();
    let config = crate::types::StorageConfig {
        path: temp_dir.path().to_path_buf(),
        ..Default::default()
    };

    let runtime_config = TokioConfig::default()
        .with_storage_directory(temp_dir.path())
        .with_worker_threads(2);

    Runner::new(runtime_config).start(|context| async move {
        // Keep clones around for the second (reopen) session.
        let reopen_config = config.clone();
        let reopen_ctx = context.clone();

        // First session: write, commit, and drop the handle.
        let storage = QmdbStorage::new(context, config).await.unwrap();
        let ops = vec![
            crate::types::Operation::Set {
                key: b"k1".to_vec(),
                value: b"hello".to_vec(),
            },
            crate::types::Operation::Set {
                key: b"k2".to_vec(),
                value: vec![0u8; 100],
            },
            crate::types::Operation::Set {
                key: b"k3".to_vec(),
                value: Vec::new(),
            },
        ];
        storage.apply_batch(ops).await.unwrap();
        storage.commit_state().await.unwrap();
        drop(storage);

        // Second session: reopen from the same directory and verify every
        // value — including the empty one — survived intact.
        let reopened = QmdbStorage::new(reopen_ctx, reopen_config).await.unwrap();
        assert_eq!(reopened.get(b"k1").unwrap(), Some(b"hello".to_vec()));
        assert_eq!(reopened.get(b"k2").unwrap(), Some(vec![0u8; 100]));
        assert_eq!(reopened.get(b"k3").unwrap(), Some(Vec::new()));
    })
}

#[test]
fn test_preview_batch_root_matches_eventual_commit_hash() {
let temp_dir = TempDir::new().unwrap();
Expand Down
Loading
Loading