diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index b10270851cc06..19aaa0371ada3 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -302,6 +302,32 @@ impl TrackedConsumer { } } +/// A point-in-time snapshot of a tracked memory consumer's state. +/// +/// Returned by [`TrackConsumersPool::metrics()`]. +#[derive(Debug, Clone)] +pub struct MemoryConsumerMetrics { + /// The name of the memory consumer + pub name: String, + /// Whether this consumer can spill to disk + pub can_spill: bool, + /// The number of bytes currently reserved by this consumer + pub reserved: usize, + /// The peak number of bytes reserved by this consumer + pub peak: usize, +} + +impl From<&TrackedConsumer> for MemoryConsumerMetrics { + fn from(tracked: &TrackedConsumer) -> Self { + Self { + name: tracked.name.clone(), + can_spill: tracked.can_spill, + reserved: tracked.reserved(), + peak: tracked.peak(), + } + } +} + /// A [`MemoryPool`] that tracks the consumers that have /// reserved memory within the inner memory pool. /// @@ -381,6 +407,15 @@ impl TrackConsumersPool { } } + /// Returns a snapshot of all currently tracked consumers. + pub fn metrics(&self) -> Vec { + self.tracked_consumers + .lock() + .values() + .map(Into::into) + .collect() + } + /// Returns a formatted string with the top memory consumers. pub fn report_top(&self, top: usize) -> String { let mut consumers = self @@ -778,6 +813,54 @@ mod tests { test_per_pool_type(tracked_greedy_pool); } + #[test] + fn test_track_consumers_pool_metrics() { + let track_consumers_pool = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(1000), + NonZeroUsize::new(3).unwrap(), + )); + let memory_pool: Arc = Arc::clone(&track_consumers_pool) as _; + + // Empty pool has no metrics + assert!(track_consumers_pool.metrics().is_empty()); + + // Register consumers with different spill settings + let r1 = MemoryConsumer::new("spilling") + .with_can_spill(true) + .register(&memory_pool); + let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool); + + // Grow r1 in two steps to verify peak tracking + r1.grow(100); + r1.grow(50); + r1.shrink(50); // reserved=100, peak=150 + + r2.grow(200); // reserved=200, peak=200 + + let mut metrics = track_consumers_pool.metrics(); + metrics.sort_by_key(|m| m.name.clone()); + + assert_eq!(metrics.len(), 2); + + let m_non = &metrics[0]; + assert_eq!(m_non.name, "non-spilling"); + assert!(!m_non.can_spill); + assert_eq!(m_non.reserved, 200); + assert_eq!(m_non.peak, 200); + + let m_spill = &metrics[1]; + assert_eq!(m_spill.name, "spilling"); + assert!(m_spill.can_spill); + assert_eq!(m_spill.reserved, 100); + assert_eq!(m_spill.peak, 150); + + // Unregistered consumers are removed from metrics + drop(r2); + let metrics = track_consumers_pool.metrics(); + assert_eq!(metrics.len(), 1); + assert_eq!(metrics[0].name, "spilling"); + } + #[test] fn test_tracked_consumers_pool_use_beyond_errors() { let setting = make_settings();