diff --git a/engine/packages/depot/src/history_snapshot.rs b/engine/packages/depot/src/history_snapshot.rs
new file mode 100644
index 0000000000..15e0b725ce
--- /dev/null
+++ b/engine/packages/depot/src/history_snapshot.rs
@@ -0,0 +1,262 @@
+//! Structured, read-only snapshot of a database branch's stored history rows.
+//!
+//! Tests and tooling use this to assert exactly which DELTA, COMMITS, VTX,
+//! PIDX, SHARD, PITR_INTERVAL, and DB_PIN rows survive compaction and reclaim.
+//! Every set is decoded from raw UDB rows inside one transaction so partial
+//! deletes cannot hide behind in-process caches or derived state.
+
+use std::collections::{BTreeMap, BTreeSet};
+
+use anyhow::{Context, Result, ensure};
+use futures_util::TryStreamExt;
+use universaldb::{RangeOption, options::StreamingMode, utils::IsolationLevel::Snapshot};
+
+use crate::conveyer::{
+	keys,
+	types::{
+		CommitRow, CompactionRoot, DBHead, DatabaseBranchId, DbHistoryPin, PitrIntervalCoverage,
+		decode_commit_row, decode_compaction_root, decode_db_head, decode_db_history_pin,
+		decode_pitr_interval_coverage,
+	},
+};
+
+/// Exact stored-history state for one database branch.
+#[derive(Debug, Clone, Default)]
+pub struct BranchHistorySnapshot {
+	/// Decoded branch head row, if one is stored.
+	pub head: Option<DBHead>,
+	/// Decoded head-at-fork row, if one is stored.
+	pub head_at_fork: Option<DBHead>,
+	/// Decoded compaction root row, if one is stored.
+	pub compaction_root: Option<CompactionRoot>,
+	/// Distinct txids that still have at least one DELTA chunk row.
+	pub delta_txids: BTreeSet<u64>,
+	/// Total DELTA chunk rows, including multi-chunk blobs.
+	pub delta_chunk_rows: usize,
+	/// COMMITS rows as txid -> decoded commit row.
+	pub commits: BTreeMap<u64, CommitRow>,
+	/// VTX rows as versionstamp -> txid.
+	pub vtx: BTreeMap<[u8; 16], u64>,
+	/// PIDX rows as page number -> owning delta txid.
+	pub pidx: BTreeMap<u32, u64>,
+	/// Reader-visible shard versions as shard id -> ascending as_of txids.
+	pub shard_versions: BTreeMap<u32, Vec<u64>>,
+	/// PITR interval rows as bucket start ms -> coverage.
+	pub pitr_intervals: BTreeMap<i64, PitrIntervalCoverage>,
+	/// Decoded DB_PIN rows, in scan order.
+	pub pins: Vec<DbHistoryPin>,
+}
+
+impl BranchHistorySnapshot {
+	/// Hot watermark txid from the compaction root, or 0 when no root is stored.
+	pub fn hot_watermark_txid(&self) -> u64 {
+		self.compaction_root
+			.as_ref()
+			.map(|root| root.hot_watermark_txid)
+			.unwrap_or(0)
+	}
+
+	/// Txids that still have a COMMITS row.
+	pub fn commit_txids(&self) -> BTreeSet<u64> {
+		self.commits.keys().copied().collect()
+	}
+
+	/// Distinct txids referenced by VTX rows.
+	pub fn vtx_txids(&self) -> BTreeSet<u64> {
+		self.vtx.values().copied().collect()
+	}
+
+	/// Distinct delta txids referenced by PIDX rows.
+	pub fn pidx_txids(&self) -> BTreeSet<u64> {
+		self.pidx.values().copied().collect()
+	}
+
+	/// Distinct txids recorded in PITR interval coverage rows.
+	pub fn pitr_interval_txids(&self) -> BTreeSet<u64> {
+		self.pitr_intervals
+			.values()
+			.map(|coverage| coverage.txid)
+			.collect()
+	}
+
+	/// Distinct `at_txid` values across the DB_PIN rows.
+	pub fn pin_txids(&self) -> BTreeSet<u64> {
+		self.pins.iter().map(|pin| pin.at_txid).collect()
+	}
+}
+
+/// Reads a [`BranchHistorySnapshot`] for `branch_id` inside a `udb.txn` closure.
+pub async fn branch_history_snapshot(
+	udb: &universaldb::Database,
+	branch_id: DatabaseBranchId,
+) -> Result<BranchHistorySnapshot> {
+	udb.txn("depot_history_snapshot", move |tx| async move {
+		branch_history_snapshot_tx(&tx, branch_id).await
+	})
+	.await
+}
+
+/// Reads a [`BranchHistorySnapshot`] for `branch_id` inside the caller's transaction.
+///
+/// Every read uses snapshot isolation. Any row that fails to decode or whose key
+/// has an unexpected layout aborts the whole snapshot with an error.
+pub async fn branch_history_snapshot_tx(
+	tx: &universaldb::Transaction,
+	branch_id: DatabaseBranchId,
+) -> Result<BranchHistorySnapshot> {
+	let mut snapshot = BranchHistorySnapshot::default();
+
+	snapshot.head = read_value(tx, &keys::branch_meta_head_key(branch_id))
+		.await?
+		.as_deref()
+		.map(decode_db_head)
+		.transpose()
+		.context("decode sqlite head for history snapshot")?;
+	snapshot.head_at_fork = read_value(tx, &keys::branch_meta_head_at_fork_key(branch_id))
+		.await?
+		.as_deref()
+		.map(decode_db_head)
+		.transpose()
+		.context("decode sqlite fork head for history snapshot")?;
+	snapshot.compaction_root = read_value(tx, &keys::branch_compaction_root_key(branch_id))
+		.await?
+		.as_deref()
+		.map(decode_compaction_root)
+		.transpose()
+		.context("decode sqlite compaction root for history snapshot")?;
+
+	for (key, _) in scan_prefix(tx, &keys::branch_delta_prefix(branch_id)).await? {
+		let txid = keys::decode_branch_delta_chunk_txid(branch_id, &key)?;
+		snapshot.delta_txids.insert(txid);
+		snapshot.delta_chunk_rows += 1;
+	}
+
+	// Each prefix is built once and shared by the scan and the per-row key decode.
+	let commit_prefix = keys::branch_commit_prefix(branch_id);
+	for (key, value) in scan_prefix(tx, &commit_prefix).await? {
+		let txid = decode_be_u64_suffix(&commit_prefix, &key)
+			.context("decode sqlite commit txid for history snapshot")?;
+		let commit =
+			decode_commit_row(&value).context("decode sqlite commit row for history snapshot")?;
+		snapshot.commits.insert(txid, commit);
+	}
+
+	let vtx_prefix = keys::branch_vtx_prefix(branch_id);
+	for (key, value) in scan_prefix(tx, &vtx_prefix).await? {
+		let suffix = key
+			.strip_prefix(vtx_prefix.as_slice())
+			.context("branch VTX key did not start with expected prefix")?;
+		let versionstamp: [u8; 16] = suffix
+			.try_into()
+			.map_err(|_| anyhow::anyhow!("branch VTX suffix should be 16 bytes"))?;
+		let txid = decode_be_u64_value(&value)
+			.context("decode sqlite VTX txid value for history snapshot")?;
+		snapshot.vtx.insert(versionstamp, txid);
+	}
+
+	let pidx_prefix = keys::branch_pidx_prefix(branch_id);
+	for (key, value) in scan_prefix(tx, &pidx_prefix).await? {
+		let suffix = key
+			.strip_prefix(pidx_prefix.as_slice())
+			.context("branch PIDX key did not start with expected prefix")?;
+		let pgno_bytes: [u8; 4] = suffix
+			.try_into()
+			.map_err(|_| anyhow::anyhow!("branch PIDX suffix should be 4 bytes"))?;
+		let txid = decode_be_u64_value(&value)
+			.context("decode sqlite PIDX txid value for history snapshot")?;
+		snapshot.pidx.insert(u32::from_be_bytes(pgno_bytes), txid);
+	}
+
+	let shard_prefix = keys::branch_shard_prefix(branch_id);
+	for (key, _) in scan_prefix(tx, &shard_prefix).await? {
+		let suffix = key
+			.strip_prefix(shard_prefix.as_slice())
+			.context("branch SHARD key did not start with expected prefix")?;
+		// Layout checked here: 4-byte shard id, b'/', 8-byte as_of txid.
+		ensure!(
+			suffix.len() == 4 + 1 + 8 && suffix[4] == b'/',
+			"branch SHARD key suffix had unexpected layout"
+		);
+		let shard_id = u32::from_be_bytes(
+			suffix[..4]
+				.try_into()
+				.context("branch SHARD id should decode as u32")?,
+		);
+		let as_of_txid = u64::from_be_bytes(
+			suffix[5..]
+				.try_into()
+				.context("branch SHARD as_of txid should decode as u64")?,
+		);
+		snapshot
+			.shard_versions
+			.entry(shard_id)
+			.or_default()
+			.push(as_of_txid);
+	}
+
+	for (key, value) in scan_prefix(tx, &keys::branch_pitr_interval_prefix(branch_id)).await? {
+		let bucket_start_ms = keys::decode_branch_pitr_interval_bucket(branch_id, &key)?;
+		let coverage = decode_pitr_interval_coverage(&value)
+			.context("decode sqlite PITR interval coverage for history snapshot")?;
+		snapshot.pitr_intervals.insert(bucket_start_ms, coverage);
+	}
+
+	for (_, value) in scan_prefix(tx, &keys::db_pin_prefix(branch_id)).await? {
+		let pin =
+			decode_db_history_pin(&value).context("decode sqlite db pin for history snapshot")?;
+		snapshot.pins.push(pin);
+	}
+
+	Ok(snapshot)
+}
+
+/// Snapshot-isolation point read returning the raw value bytes, if any.
+async fn read_value(tx: &universaldb::Transaction, key: &[u8]) -> Result<Option<Vec<u8>>> {
+	Ok(tx.informal().get(key, Snapshot).await?.map(Vec::<u8>::from))
+}
+
+/// Snapshot-isolation range read of every row under `prefix`, buffered as owned
+/// `(key, value)` byte pairs.
+async fn scan_prefix(
+	tx: &universaldb::Transaction,
+	prefix: &[u8],
+) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
+	let prefix_subspace =
+		universaldb::Subspace::from(universaldb::tuple::Subspace::from_bytes(prefix.to_vec()));
+	let informal = tx.informal();
+	let mut stream = informal.get_ranges_keyvalues(
+		RangeOption {
+			mode: StreamingMode::WantAll,
+			..RangeOption::from(&prefix_subspace)
+		},
+		Snapshot,
+	);
+	let mut rows = Vec::new();
+
+	while let Some(entry) = stream.try_next().await? {
+		rows.push((entry.key().to_vec(), entry.value().to_vec()));
+	}
+
+	Ok(rows)
+}
+
+/// Decodes the big-endian `u64` that must be the entire remainder of `key` after `prefix`.
+fn decode_be_u64_suffix(prefix: &[u8], key: &[u8]) -> Result<u64> {
+	let suffix = key
+		.strip_prefix(prefix)
+		.context("key did not start with expected prefix")?;
+	let bytes: [u8; 8] = suffix
+		.try_into()
+		.map_err(|_| anyhow::anyhow!("key suffix should be 8 bytes"))?;
+
+	Ok(u64::from_be_bytes(bytes))
+}
+
+/// Decodes a value that must be exactly eight big-endian bytes into a `u64`.
+fn decode_be_u64_value(value: &[u8]) -> Result<u64> {
+	let bytes: [u8; 8] = value
+		.try_into()
+		.map_err(|_| anyhow::anyhow!("value should be 8 bytes"))?;
+
+	Ok(u64::from_be_bytes(bytes))
+}
diff --git a/engine/packages/depot/src/lib.rs b/engine/packages/depot/src/lib.rs
index f307e0408a..5c60b9b248 100644
--- a/engine/packages/depot/src/lib.rs
+++ b/engine/packages/depot/src/lib.rs
@@ -7,6 +7,7 @@ pub mod doctor;
 #[cfg(feature = "test-faults")]
 pub mod fault;
 pub mod gc;
+pub mod history_snapshot;
 pub mod inspect;
 pub mod metrics;
 #[cfg(debug_assertions)]
diff --git a/engine/packages/depot/tests/common/mod.rs b/engine/packages/depot/tests/common/mod.rs
index 2580fe22ec..2be0bd5ef1 100644
--- a/engine/packages/depot/tests/common/mod.rs
+++ b/engine/packages/depot/tests/common/mod.rs
@@ -100,6 +100,111 @@ where
 	Ok(())
 }
 
+pub use depot::history_snapshot::{BranchHistorySnapshot, branch_history_snapshot};
+
+pub async fn database_branch_id(
+	udb: &universaldb::Database,
+	bucket_id: Id,
+	database_id: &str,
+) -> Result<depot::types::DatabaseBranchId> {
+	let database_id = database_id.to_string();
+	udb.txn("test_depotcommon_branch_id", move |tx| {
+		let database_id = database_id.clone();
+		async move {
+			depot::conveyer::branch::resolve_database_branch(
+				&tx,
+				depot::types::BucketId::from_gas_id(bucket_id),
+				&database_id,
+				universaldb::utils::IsolationLevel::Serializable,
+			)
+			.await?
+			.context("database branch should exist")
+		}
+	})
+	.await
+}
+
+pub async fn history(
+	udb: &universaldb::Database,
+	branch_id: depot::types::DatabaseBranchId,
+) -> Result<BranchHistorySnapshot> {
+	branch_history_snapshot(udb, branch_id).await
+}
+
+/// Asserts the exact set of txids that still own DELTA chunk rows. Always
+/// exact-set equality so partial deletes and over-deletes both fail loudly.
+pub fn assert_delta_txids(
+	snapshot: &BranchHistorySnapshot,
+	expected: impl IntoIterator<Item = u64>,
+	context: &str,
+) {
+	let expected = expected
+		.into_iter()
+		.collect::<std::collections::BTreeSet<_>>();
+	assert_eq!(
+		snapshot.delta_txids, expected,
+		"[{context}] surviving DELTA txids did not match"
+	);
+}
+
+pub fn assert_commit_txids(
+	snapshot: &BranchHistorySnapshot,
+	expected: impl IntoIterator<Item = u64>,
+	context: &str,
+) {
+	let expected = expected
+		.into_iter()
+		.collect::<std::collections::BTreeSet<_>>();
+	assert_eq!(
+		snapshot.commit_txids(),
+		expected,
+		"[{context}] surviving COMMITS txids did not match"
+	);
+}
+
+pub fn assert_vtx_txids(
+	snapshot: &BranchHistorySnapshot,
+	expected: impl IntoIterator<Item = u64>,
+	context: &str,
+) {
+	let expected = expected
+		.into_iter()
+		.collect::<std::collections::BTreeSet<_>>();
+	assert_eq!(
+		snapshot.vtx_txids(),
+		expected,
+		"[{context}] surviving VTX txids did not match"
+	);
+}
+
+pub fn assert_pidx(
+	snapshot: &BranchHistorySnapshot,
+	expected: impl IntoIterator<Item = (u32, u64)>,
+	context: &str,
+) {
+	let expected = expected
+		.into_iter()
+		.collect::<std::collections::BTreeMap<_, _>>();
+	assert_eq!(
+		snapshot.pidx, expected,
+		"[{context}] surviving PIDX rows did not match"
+	);
+}
+
+pub fn assert_shard_versions(
+	snapshot: &BranchHistorySnapshot,
+	expected: impl IntoIterator<Item = (u32, Vec<u64>)>,
+	context: &str,
+) {
+	let expected = expected
+		.into_iter()
+		.collect::<std::collections::BTreeMap<_, _>>();
+	assert_eq!(
+		snapshot.shard_versions, expected,
+		"[{context}] surviving SHARD versions did not match"
+	);
+}
+
 pub async fn read_value(db: &universaldb::Database, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
 	db.txn("test_depotcommon_mod", move |tx| {
 		let key = key.clone();
diff --git a/engine/packages/depot/tests/history_snapshot.rs b/engine/packages/depot/tests/history_snapshot.rs
new file mode 100644
index 0000000000..6f5068b746
--- /dev/null
+++ b/engine/packages/depot/tests/history_snapshot.rs
@@ -0,0 +1,57 @@
+mod common;
+
+use anyhow::Result;
+use depot::types::DirtyPage;
+
+fn page(pgno: u32, fill: u8) -> DirtyPage {
+	DirtyPage {
+		pgno,
+		bytes: vec![fill; depot::keys::PAGE_SIZE as usize],
+	}
+}
+
+#[tokio::test]
+async fn history_snapshot_reports_exact_row_sets() -> Result<()> {
+	common::test_matrix("depot-history-snapshot", |_tier, ctx| {
+		Box::pin(async move {
+			// Three commits: txid 1 writes pages 1-2, txid 2 overwrites page 2 and
+			// writes page 3, txid 3 writes a page in the second shard.
+			ctx.db
+				.commit(vec![page(1, 0x11), page(2, 0x12)], 3, 1_000)
+				.await?;
+			ctx.db
+				.commit(vec![page(2, 0x22), page(3, 0x23)], 3, 2_000)
+				.await?;
+			let far_pgno = depot::keys::SHARD_SIZE + 1;
+			ctx.db
+				.commit(vec![page(far_pgno, 0x33)], far_pgno, 3_000)
+				.await?;
+
+			let branch_id =
+				common::database_branch_id(&ctx.udb, ctx.bucket_id, &ctx.database_id).await?;
+
+			let snapshot = common::history(&ctx.udb, branch_id).await?;
+
+			common::assert_delta_txids(&snapshot, [1, 2, 3], "after three commits");
+			common::assert_commit_txids(&snapshot, [1, 2, 3], "after three commits");
+			common::assert_vtx_txids(&snapshot, [1, 2, 3], "after three commits");
+			// PIDX maps each page to the latest txid that wrote it.
+			common::assert_pidx(
+				&snapshot,
+				[(1, 1), (2, 2), (3, 2), (far_pgno, 3)],
+				"after three commits",
+			);
+			// No compaction has run: no shards, no PITR intervals, no pins.
+			common::assert_shard_versions(&snapshot, [], "after three commits");
+			assert!(snapshot.pitr_intervals.is_empty());
+			assert!(snapshot.pins.is_empty());
+			assert_eq!(snapshot.hot_watermark_txid(), 0);
+			assert_eq!(snapshot.head.as_ref().map(|head| head.head_txid), Some(3));
+			// Two single-chunk deltas plus one for the far page commit.
+			assert_eq!(snapshot.delta_chunk_rows, 3);
+
+			Ok(())
+		})
+	})
+	.await
+}