diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d5fbfc50a21..28d8833192837 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,7 +22,7 @@ use arrow_ipc::CompressionType; #[cfg(feature = "parquet_encryption")] use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; -use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; +use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType}; use crate::parquet_config::DFParquetWriterVersion; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; @@ -1211,7 +1211,13 @@ config_namespace! { /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev" /// "summary" shows common metrics for high-level insights. /// "dev" provides deep operator-level introspection for developers. - pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev + pub analyze_level: MetricType, default = MetricType::Dev + + /// Which metric categories to include in "EXPLAIN ANALYZE" output. + /// Comma-separated list of: "rows", "bytes", "timing", "uncategorized". + /// Use "none" to show plan structure only, or "all" (default) to show everything. + /// Metrics without a declared category are treated as "uncategorized". + pub analyze_categories: ExplainAnalyzeCategories, default = ExplainAnalyzeCategories::All } } diff --git a/datafusion/common/src/format.rs b/datafusion/common/src/format.rs index a505bd0e1c74e..a6bd42be691a9 100644 --- a/datafusion/common/src/format.rs +++ b/datafusion/common/src/format.rs @@ -206,23 +206,50 @@ impl ConfigField for ExplainFormat { } } -/// Verbosity levels controlling how `EXPLAIN ANALYZE` renders metrics +/// Categorizes metrics so the display layer can choose the desired verbosity. +/// +/// The `datafusion.explain.analyze_level` configuration controls which +/// type is shown: +/// - `"dev"` (the default): all metrics are shown. 
+/// - `"summary"`: only metrics tagged as `Summary` are shown. +/// +/// This is orthogonal to [`MetricCategory`], which filters by *what kind* +/// of value a metric represents (rows / bytes / timing). +/// +/// # Difference from `EXPLAIN ANALYZE VERBOSE` +/// +/// The `VERBOSE` keyword controls whether per-partition metrics are shown +/// (when specified) or aggregated metrics are displayed (when omitted). +/// In contrast, `MetricType` determines which *levels* of metrics are +/// displayed. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum ExplainAnalyzeLevel { - /// Show a compact view containing high-level metrics +pub enum MetricType { + /// Common metrics for high-level insights (answering which operator is slow) Summary, - /// Show a developer-focused view with per-operator details + /// For deep operator-level introspection for developers Dev, - // When adding new enum, update the error message in `from_str()` accordingly. } -impl FromStr for ExplainAnalyzeLevel { +impl MetricType { + /// Returns the set of metric types that should be shown for this level. + /// + /// `Dev` is a superset of `Summary`: when the user selects + /// `analyze_level = 'dev'`, both `Summary` and `Dev` metrics are shown. + pub fn included_types(self) -> Vec<MetricType> { + match self { + MetricType::Summary => vec![MetricType::Summary], + MetricType::Dev => vec![MetricType::Summary, MetricType::Dev], + } + } +} + +impl FromStr for MetricType { type Err = DataFusionError; - fn from_str(level: &str) -> Result<Self, Self::Err> { - match level.to_lowercase().as_str() { - "summary" => Ok(ExplainAnalyzeLevel::Summary), - "dev" => Ok(ExplainAnalyzeLevel::Dev), + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.trim().to_lowercase().as_str() { + "summary" => Ok(Self::Summary), + "dev" => Ok(Self::Dev), other => Err(DataFusionError::Configuration(format!( "Invalid explain analyze level. Expected 'summary' or 'dev'. 
Got '{other}'" ))), @@ -230,23 +257,176 @@ impl FromStr for ExplainAnalyzeLevel { } } -impl Display for ExplainAnalyzeLevel { +impl Display for MetricType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match self { - ExplainAnalyzeLevel::Summary => "summary", - ExplainAnalyzeLevel::Dev => "dev", - }; - write!(f, "{s}") + match self { + Self::Summary => write!(f, "summary"), + Self::Dev => write!(f, "dev"), + } + } +} + +impl ConfigField for MetricType { + fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = MetricType::from_str(value)?; + Ok(()) + } +} + +/// Classifies a metric by what it measures. +/// +/// This is orthogonal to [`MetricType`] (Summary / Dev), which controls +/// *verbosity*. `MetricCategory` controls *what kind of value* is shown, +/// so that `EXPLAIN ANALYZE` output can be narrowed to only the categories +/// that are useful in a given context. +/// +/// In particular this is useful for testing since metrics differ in their stability across runs: +/// - [`Rows`](Self::Rows) and [`Bytes`](Self::Bytes) depend only on the plan +/// and the data, so they are mostly deterministic across runs (given the same +/// input). Variations can exist e.g. because of non-deterministic ordering +/// of evaluation between threads. +/// Running with a single target partition often makes these metrics stable enough to assert on in tests. +/// - [`Timing`](Self::Timing) depends on hardware, system load, scheduling, +/// etc., so it varies from run to run even on the same machine. +/// +/// [`MetricCategory`] is especially useful in sqllogictest (`.slt`) files: +/// setting `datafusion.explain.analyze_categories = 'rows'` lets a test +/// assert on row-count metrics without sprinkling substitutions over every +/// timing value. 
+/// +/// Metrics that do not declare a category (the default for custom +/// `Count` / `Gauge` metrics) are treated as +/// [`Uncategorized`](Self::Uncategorized) for filtering purposes. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MetricCategory { + /// Row counts and related dimensionless counters: `output_rows`, + /// `spilled_rows`, `output_batches`, pruning metrics, ratios, etc. + /// + /// Mostly deterministic given the same plan and data. + Rows, + /// Byte measurements: `output_bytes`, `spilled_bytes`, + /// `current_memory_usage`, `bytes_scanned`, etc. + /// + /// Mostly deterministic given the same plan and data. + Bytes, + /// Wall-clock durations and timestamps: `elapsed_compute`, + /// operator-defined `Time` metrics, `start_timestamp` / + /// `end_timestamp`, etc. + /// + /// **Non-deterministic** — varies across runs even on the same hardware. + Timing, + /// Catch-all for metrics that do not fit into [`Rows`](Self::Rows), + /// [`Bytes`](Self::Bytes), or [`Timing`](Self::Timing). + /// + /// Custom `Count` / `Gauge` metrics that are not explicitly assigned + /// a category are treated as `Uncategorized` for filtering purposes. + /// + /// This variant lets users explicitly include or exclude these + /// metrics, e.g.: + /// ```sql + /// SET datafusion.explain.analyze_categories = 'rows, bytes, uncategorized'; + /// ``` + Uncategorized, +} + +impl FromStr for MetricCategory { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s.trim().to_lowercase().as_str() { + "rows" => Ok(Self::Rows), + "bytes" => Ok(Self::Bytes), + "timing" => Ok(Self::Timing), + "uncategorized" => Ok(Self::Uncategorized), + other => Err(DataFusionError::Configuration(format!( + "Invalid metric category '{other}'. \ + Expected 'rows', 'bytes', 'timing', or 'uncategorized'." 
+ ))), + } + } +} + +impl Display for MetricCategory { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Rows => write!(f, "rows"), + Self::Bytes => write!(f, "bytes"), + Self::Timing => write!(f, "timing"), + Self::Uncategorized => write!(f, "uncategorized"), + } + } +} + +/// Controls which [`MetricCategory`] values are shown in `EXPLAIN ANALYZE`. +/// +/// Set via `SET datafusion.explain.analyze_categories = '...'`. +/// +/// See [`MetricCategory`] for the determinism properties that motivate +/// this filter. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub enum ExplainAnalyzeCategories { + /// Show all metrics regardless of category (the default). + #[default] + All, + /// Show only metrics whose category is in the list. + /// Metrics with no declared category are treated as + /// [`Uncategorized`](MetricCategory::Uncategorized) for filtering. + /// + /// An **empty** vec means "plan only" — suppress all metrics. + Only(Vec<MetricCategory>), +} + +impl FromStr for ExplainAnalyzeCategories { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + let s = s.trim().to_lowercase(); + match s.as_str() { + "all" => Ok(Self::All), + "none" => Ok(Self::Only(vec![])), + other => { + let mut cats = Vec::new(); + for part in other.split(',') { + cats.push(part.trim().parse::<MetricCategory>()?); + } + cats.dedup(); + Ok(Self::Only(cats)) + } + } + } +} + +impl Display for ExplainAnalyzeCategories { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::All => write!(f, "all"), + Self::Only(cats) if cats.is_empty() => write!(f, "none"), + Self::Only(cats) => { + let mut first = true; + for cat in cats { + if !first { + write!(f, ",")?; + } + first = false; + write!(f, "{cat}")?; + } + Ok(()) + } + } } } -impl ConfigField for ExplainAnalyzeLevel { +impl ConfigField for ExplainAnalyzeCategories { fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) { v.some(key, self, description) } fn set(&mut self, _: 
&str, value: &str) -> Result<()> { - *self = ExplainAnalyzeLevel::from_str(value)?; + *self = ExplainAnalyzeCategories::from_str(value)?; Ok(()) } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 4c6d915d5bcaa..0d1b0be906a82 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -234,7 +234,8 @@ mod tests { let analyze_exec = Arc::new(AnalyzeExec::new( false, false, - vec![MetricType::SUMMARY, MetricType::DEV], + vec![MetricType::Summary, MetricType::Dev], + None, // use a new ParquetSource to avoid sharing execution metrics self.build_parquet_exec( file_group.clone(), diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b4fb44f670e8d..e25969903521c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -64,7 +64,7 @@ use arrow_schema::Field; use datafusion_catalog::ScanArgs; use datafusion_common::Column; use datafusion_common::display::ToStringifiedPlan; -use datafusion_common::format::ExplainAnalyzeLevel; +use datafusion_common::format::ExplainAnalyzeCategories; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor, }; @@ -99,7 +99,6 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::execution_plan::InvariantLevel; use datafusion_physical_plan::joins::PiecewiseMergeJoinExec; -use datafusion_physical_plan::metrics::MetricType; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::recursive_query::RecursiveQueryExec; use datafusion_physical_plan::unnest::ListUnnest; @@ -2716,14 +2715,21 @@ impl DefaultPhysicalPlanner { let schema = Arc::clone(a.schema.inner()); let show_statistics = session_state.config_options().explain.show_statistics; let 
analyze_level = session_state.config_options().explain.analyze_level; - let metric_types = match analyze_level { - ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY], - ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV], + let metric_types = analyze_level.included_types(); + let analyze_categories = session_state + .config_options() + .explain + .analyze_categories + .clone(); + let metric_categories = match analyze_categories { + ExplainAnalyzeCategories::All => None, + ExplainAnalyzeCategories::Only(cats) => Some(cats), }; Ok(Arc::new(AnalyzeExec::new( a.verbose, show_statistics, metric_types, + metric_categories, input, schema, ))) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 5f62f7204eff1..570a63c4117f5 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -22,7 +22,7 @@ use rstest::rstest; use datafusion::config::ConfigOptions; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::metrics::Timestamp; -use datafusion_common::format::ExplainAnalyzeLevel; +use datafusion_common::format::{ExplainAnalyzeCategories, MetricCategory, MetricType}; use object_store::path::Path; #[tokio::test] @@ -205,7 +205,7 @@ fn nanos_from_timestamp(ts: &Timestamp) -> i64 { async fn collect_plan_with_context( sql_str: &str, ctx: &SessionContext, - level: ExplainAnalyzeLevel, + level: MetricType, ) -> String { { let state = ctx.state_ref(); @@ -219,7 +219,24 @@ async fn collect_plan_with_context( .to_string() } -async fn collect_plan(sql_str: &str, level: ExplainAnalyzeLevel) -> String { +async fn collect_plan_with_categories( + sql_str: &str, + categories: ExplainAnalyzeCategories, +) -> String { + let ctx = SessionContext::new(); + { + let state = ctx.state_ref(); + let mut state = state.write(); + state.config_mut().options_mut().explain.analyze_categories = categories; + } + let dataframe = 
ctx.sql(sql_str).await.unwrap(); + let batches = dataframe.collect().await.unwrap(); + arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string() +} + +async fn collect_plan(sql_str: &str, level: MetricType) -> String { let ctx = SessionContext::new(); collect_plan_with_context(sql_str, &ctx, level).await } @@ -232,14 +249,14 @@ async fn explain_analyze_level() { ORDER BY v1 DESC"; for (level, needle, should_contain) in [ - (ExplainAnalyzeLevel::Summary, "spill_count", false), - (ExplainAnalyzeLevel::Summary, "output_batches", false), - (ExplainAnalyzeLevel::Summary, "output_rows", true), - (ExplainAnalyzeLevel::Summary, "output_bytes", true), - (ExplainAnalyzeLevel::Dev, "spill_count", true), - (ExplainAnalyzeLevel::Dev, "output_rows", true), - (ExplainAnalyzeLevel::Dev, "output_bytes", true), - (ExplainAnalyzeLevel::Dev, "output_batches", true), + (MetricType::Summary, "spill_count", false), + (MetricType::Summary, "output_batches", false), + (MetricType::Summary, "output_rows", true), + (MetricType::Summary, "output_bytes", true), + (MetricType::Dev, "spill_count", true), + (MetricType::Dev, "output_rows", true), + (MetricType::Dev, "output_bytes", true), + (MetricType::Dev, "output_batches", true), ] { let plan = collect_plan(sql, level).await; assert_eq!( @@ -263,10 +280,10 @@ async fn explain_analyze_level_datasource_parquet() { .expect("register parquet table for explain analyze test"); for (level, needle, should_contain) in [ - (ExplainAnalyzeLevel::Summary, "metadata_load_time", true), - (ExplainAnalyzeLevel::Summary, "page_index_eval_time", false), - (ExplainAnalyzeLevel::Dev, "metadata_load_time", true), - (ExplainAnalyzeLevel::Dev, "page_index_eval_time", true), + (MetricType::Summary, "metadata_load_time", true), + (MetricType::Summary, "page_index_eval_time", false), + (MetricType::Dev, "metadata_load_time", true), + (MetricType::Dev, "page_index_eval_time", true), ] { let plan = collect_plan_with_context(&sql, &ctx, level).await; 
@@ -299,8 +316,7 @@ async fn explain_analyze_parquet_pruning_metrics() { "explain analyze select * from {table_name} where l_orderkey = {l_orderkey};" ); - let plan = - collect_plan_with_context(&sql, &ctx, ExplainAnalyzeLevel::Summary).await; + let plan = collect_plan_with_context(&sql, &ctx, MetricType::Summary).await; let expected_metrics = format!("files_ranges_pruned_statistics={expected_pruning_metrics}"); @@ -1149,8 +1165,8 @@ async fn explain_analyze_hash_join() { ON t1.a=t2.b"; for (level, needle, should_contain) in [ - (ExplainAnalyzeLevel::Summary, "probe_hit_rate", true), - (ExplainAnalyzeLevel::Summary, "avg_fanout", true), + (MetricType::Summary, "probe_hit_rate", true), + (MetricType::Summary, "avg_fanout", true), ] { let plan = collect_plan(sql, level).await; assert_eq!( @@ -1160,3 +1176,93 @@ async fn explain_analyze_hash_join() { ); } } + +#[tokio::test] +async fn explain_analyze_categories() { + let sql = "EXPLAIN ANALYZE \ + SELECT * \ + FROM generate_series(10) as t1(v1) \ + ORDER BY v1 DESC"; + + for (categories, needle, should_contain) in [ + // "rows" category: output_rows yes, elapsed_compute no, output_bytes no + ( + ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]), + "output_rows", + true, + ), + ( + ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]), + "elapsed_compute", + false, + ), + ( + ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]), + "output_bytes", + false, + ), + // "none" — plan only, no metrics at all + (ExplainAnalyzeCategories::Only(vec![]), "output_rows", false), + ( + ExplainAnalyzeCategories::Only(vec![]), + "elapsed_compute", + false, + ), + // "all" — everything shown + (ExplainAnalyzeCategories::All, "output_rows", true), + (ExplainAnalyzeCategories::All, "elapsed_compute", true), + (ExplainAnalyzeCategories::All, "output_bytes", true), + // "rows,bytes" — row + byte metrics, no timing + ( + ExplainAnalyzeCategories::Only(vec![ + MetricCategory::Rows, + MetricCategory::Bytes, + ]), + 
"output_rows", + true, + ), + ( + ExplainAnalyzeCategories::Only(vec![ + MetricCategory::Rows, + MetricCategory::Bytes, + ]), + "output_bytes", + true, + ), + ( + ExplainAnalyzeCategories::Only(vec![ + MetricCategory::Rows, + MetricCategory::Bytes, + ]), + "elapsed_compute", + false, + ), + // "rows,bytes,uncategorized" — everything except timing + ( + ExplainAnalyzeCategories::Only(vec![ + MetricCategory::Rows, + MetricCategory::Bytes, + MetricCategory::Uncategorized, + ]), + "output_rows", + true, + ), + ( + ExplainAnalyzeCategories::Only(vec![ + MetricCategory::Rows, + MetricCategory::Bytes, + MetricCategory::Uncategorized, + ]), + "elapsed_compute", + false, + ), + ] { + let plan = collect_plan_with_categories(sql, categories.clone()).await; + assert_eq!( + plan.contains(needle), + should_contain, + "plan for categories {categories:?} should{} contain '{needle}':\n{plan}", + if should_contain { "" } else { " NOT" } + ); + } +} diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c391b299d4a29..da35a1a34d441 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -56,7 +56,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::metrics::{ - ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, MetricsSet, }; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; @@ -1339,13 +1339,15 @@ impl FileSink for ParquetSink { mut file_stream_rx: DemuxedStreamReceiver, object_store: Arc, ) -> Result { - let rows_written_counter = - MetricBuilder::new(&self.metrics).global_counter("rows_written"); + let rows_written_counter = MetricBuilder::new(&self.metrics) + 
.with_category(MetricCategory::Rows) + .global_counter("rows_written"); // Note: bytes_written is the sum of compressed row group sizes, which // may differ slightly from the actual on-disk file size (excludes footer, // page indexes, and other Parquet metadata overhead). - let bytes_written_counter = - MetricBuilder::new(&self.metrics).global_counter("bytes_written"); + let bytes_written_counter = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Bytes) + .global_counter("bytes_written"); let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0); let write_start = datafusion_common::instant::Instant::now(); diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 2d6fb69270bf3..8eb5912b919da 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -16,8 +16,8 @@ // under the License. use datafusion_physical_plan::metrics::{ - Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricType, PruningMetrics, - RatioMergeStrategy, RatioMetrics, Time, + Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType, + PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, }; /// Stores metrics about the parquet execution for a particular parquet file. 
@@ -105,41 +105,42 @@ impl ParquetFileMetrics { // ----------------------- let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .pruning_metrics("row_groups_pruned_bloom_filter", partition); let limit_pruned_row_groups = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .pruning_metrics("limit_pruned_row_groups", partition); let row_groups_pruned_statistics = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .pruning_metrics("row_groups_pruned_statistics", partition); let page_index_pages_pruned = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .pruning_metrics("page_index_pages_pruned", partition); let bytes_scanned = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) + .with_category(MetricCategory::Bytes) .counter("bytes_scanned", partition); let metadata_load_time = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .subset_time("metadata_load_time", partition); let files_ranges_pruned_statistics = MetricBuilder::new(metrics) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .pruning_metrics("files_ranges_pruned_statistics", partition); let scan_efficiency_ratio = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .ratio_metrics_with_strategy( "scan_efficiency_ratio", partition, @@ -151,13 +152,16 @@ impl ParquetFileMetrics { // ----------------------- 
let predicate_evaluation_errors = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) + .with_category(MetricCategory::Rows) .counter("predicate_evaluation_errors", partition); let pushdown_rows_pruned = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) + .with_category(MetricCategory::Rows) .counter("pushdown_rows_pruned", partition); let pushdown_rows_matched = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) + .with_category(MetricCategory::Rows) .counter("pushdown_rows_matched", partition); let row_pushdown_eval_time = MetricBuilder::new(metrics) @@ -180,10 +184,12 @@ impl ParquetFileMetrics { let predicate_cache_inner_records = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) + .with_category(MetricCategory::Rows) .gauge("predicate_cache_inner_records", partition); let predicate_cache_records = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) + .with_category(MetricCategory::Rows) .gauge("predicate_cache_records", partition); Self { diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 2522ae3050000..ae3da60cf0ee2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -48,7 +48,8 @@ use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; use datafusion_physical_plan::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, + BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, + MetricCategory, PruningMetrics, }; use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; @@ -217,6 +218,7 @@ impl FileOpener for ParquetOpener { let metrics = self.metrics.clone(); let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) 
.global_counter("num_predicate_creation_errors"); let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory); diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 76279ab9ffa19..8a4ec4a7f1d1a 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -32,7 +32,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::error::Result; use datafusion_execution::RecordBatchStream; use datafusion_physical_plan::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time, }; use arrow::record_batch::RecordBatch; @@ -369,16 +369,21 @@ impl FileStreamMetrics { start: None, }; - let file_open_errors = - MetricBuilder::new(metrics).counter("file_open_errors", partition); + let file_open_errors = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("file_open_errors", partition); - let file_scan_errors = - MetricBuilder::new(metrics).counter("file_scan_errors", partition); + let file_scan_errors = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("file_scan_errors", partition); - let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); + let files_opened = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("files_opened", partition); - let files_processed = - MetricBuilder::new(metrics).counter("files_processed", partition); + let files_processed = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("files_processed", partition); Self { time_opening, diff --git a/datafusion/physical-expr-common/src/metrics/baseline.rs b/datafusion/physical-expr-common/src/metrics/baseline.rs index 0de8e26494931..74e30f0dc104d 100644 --- a/datafusion/physical-expr-common/src/metrics/baseline.rs +++ 
b/datafusion/physical-expr-common/src/metrics/baseline.rs @@ -78,19 +78,19 @@ impl BaselineMetrics { Self { end_time: MetricBuilder::new(metrics) - .with_type(super::MetricType::SUMMARY) + .with_type(super::MetricType::Summary) .end_timestamp(partition), elapsed_compute: MetricBuilder::new(metrics) - .with_type(super::MetricType::SUMMARY) + .with_type(super::MetricType::Summary) .elapsed_compute(partition), output_rows: MetricBuilder::new(metrics) - .with_type(super::MetricType::SUMMARY) + .with_type(super::MetricType::Summary) .output_rows(partition), output_bytes: MetricBuilder::new(metrics) - .with_type(super::MetricType::SUMMARY) + .with_type(super::MetricType::Summary) .output_bytes(partition), output_batches: MetricBuilder::new(metrics) - .with_type(super::MetricType::DEV) + .with_type(super::MetricType::Dev) .output_batches(partition), } } @@ -215,6 +215,7 @@ impl SplitMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { batches_split: MetricBuilder::new(metrics) + .with_category(super::MetricCategory::Rows) .counter("batches_split", partition), } } diff --git a/datafusion/physical-expr-common/src/metrics/builder.rs b/datafusion/physical-expr-common/src/metrics/builder.rs index 4fa938f69ed36..e9c0b76af2582 100644 --- a/datafusion/physical-expr-common/src/metrics/builder.rs +++ b/datafusion/physical-expr-common/src/metrics/builder.rs @@ -20,7 +20,7 @@ use std::{borrow::Cow, sync::Arc}; use crate::metrics::{ - MetricType, + MetricCategory, MetricType, value::{PruningMetrics, RatioMergeStrategy, RatioMetrics}, }; @@ -60,6 +60,10 @@ pub struct MetricBuilder<'a> { /// The type controlling the verbosity/category for this builder /// See comments in [`MetricType`] for details metric_type: MetricType, + + /// Semantic category (rows / bytes / timing). + /// `None` means "always include" (the default for custom metrics). 
+ metric_category: Option, } impl<'a> MetricBuilder<'a> { @@ -72,7 +76,8 @@ impl<'a> MetricBuilder<'a> { metrics, partition: None, labels: vec![], - metric_type: MetricType::DEV, + metric_type: MetricType::Dev, + metric_category: None, } } @@ -88,6 +93,15 @@ impl<'a> MetricBuilder<'a> { self } + /// Set the semantic category for the metric being constructed. + /// + /// See [`MetricCategory`] for details on the determinism properties + /// of each category. + pub fn with_category(mut self, category: MetricCategory) -> Self { + self.metric_category = Some(category); + self + } + /// Add a label to the metric being constructed pub fn with_new_label( self, @@ -111,17 +125,21 @@ impl<'a> MetricBuilder<'a> { partition, metrics, metric_type, + metric_category, } = self; - let metric = Arc::new( - Metric::new_with_labels(value, partition, labels).with_type(metric_type), - ); - metrics.register(metric); + let mut metric = + Metric::new_with_labels(value, partition, labels).with_type(metric_type); + if let Some(category) = metric_category { + metric = metric.with_category(category); + } + metrics.register(Arc::new(metric)); } /// Consume self and create a new counter for recording output rows pub fn output_rows(self, partition: usize) -> Count { let count = Count::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Rows) + .with_partition(partition) .build(MetricValue::OutputRows(count.clone())); count } @@ -130,7 +148,8 @@ impl<'a> MetricBuilder<'a> { /// triggered by an operator pub fn spill_count(self, partition: usize) -> Count { let count = Count::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Rows) + .with_partition(partition) .build(MetricValue::SpillCount(count.clone())); count } @@ -139,7 +158,8 @@ impl<'a> MetricBuilder<'a> { /// triggered by an operator pub fn spilled_bytes(self, partition: usize) -> Count { let count = Count::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Bytes) 
+ .with_partition(partition) .build(MetricValue::SpilledBytes(count.clone())); count } @@ -148,7 +168,8 @@ impl<'a> MetricBuilder<'a> { /// triggered by an operator pub fn spilled_rows(self, partition: usize) -> Count { let count = Count::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Rows) + .with_partition(partition) .build(MetricValue::SpilledRows(count.clone())); count } @@ -156,7 +177,8 @@ impl<'a> MetricBuilder<'a> { /// Consume self and create a new counter for recording total output bytes pub fn output_bytes(self, partition: usize) -> Count { let count = Count::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Bytes) + .with_partition(partition) .build(MetricValue::OutputBytes(count.clone())); count } @@ -164,7 +186,8 @@ impl<'a> MetricBuilder<'a> { /// Consume self and create a new counter for recording total output batches pub fn output_batches(self, partition: usize) -> Count { let count = Count::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Rows) + .with_partition(partition) .build(MetricValue::OutputBatches(count.clone())); count } @@ -172,7 +195,8 @@ impl<'a> MetricBuilder<'a> { /// Consume self and create a new gauge for reporting current memory usage pub fn mem_used(self, partition: usize) -> Gauge { let gauge = Gauge::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Bytes) + .with_partition(partition) .build(MetricValue::CurrentMemoryUsage(gauge.clone())); gauge } @@ -223,7 +247,8 @@ impl<'a> MetricBuilder<'a> { /// CPU time spent by an operator pub fn elapsed_compute(self, partition: usize) -> Time { let time = Time::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Timing) + .with_partition(partition) .build(MetricValue::ElapsedCompute(time.clone())); time } @@ -236,10 +261,12 @@ impl<'a> MetricBuilder<'a> { partition: usize, ) -> Time { let time = Time::new(); - 
self.with_partition(partition).build(MetricValue::Time { - name: subset_name.into(), - time: time.clone(), - }); + self.with_category(MetricCategory::Timing) + .with_partition(partition) + .build(MetricValue::Time { + name: subset_name.into(), + time: time.clone(), + }); time } @@ -247,7 +274,8 @@ impl<'a> MetricBuilder<'a> { /// starting time of execution for a partition pub fn start_timestamp(self, partition: usize) -> Timestamp { let timestamp = Timestamp::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Timing) + .with_partition(partition) .build(MetricValue::StartTimestamp(timestamp.clone())); timestamp } @@ -256,7 +284,8 @@ impl<'a> MetricBuilder<'a> { /// ending time of execution for a partition pub fn end_timestamp(self, partition: usize) -> Timestamp { let timestamp = Timestamp::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Timing) + .with_partition(partition) .build(MetricValue::EndTimestamp(timestamp.clone())); timestamp } @@ -268,7 +297,8 @@ impl<'a> MetricBuilder<'a> { partition: usize, ) -> PruningMetrics { let pruning_metrics = PruningMetrics::new(); - self.with_partition(partition) + self.with_category(MetricCategory::Rows) + .with_partition(partition) .build(MetricValue::PruningMetrics { name: name.into(), // inner values will be `Arc::clone()` @@ -294,10 +324,12 @@ impl<'a> MetricBuilder<'a> { merge_strategy: RatioMergeStrategy, ) -> RatioMetrics { let ratio_metrics = RatioMetrics::new().with_merge_strategy(merge_strategy); - self.with_partition(partition).build(MetricValue::Ratio { - name: name.into(), - ratio_metrics: ratio_metrics.clone(), - }); + self.with_category(MetricCategory::Rows) + .with_partition(partition) + .build(MetricValue::Ratio { + name: name.into(), + ratio_metrics: ratio_metrics.clone(), + }); ratio_metrics } } diff --git a/datafusion/physical-expr-common/src/metrics/expression.rs b/datafusion/physical-expr-common/src/metrics/expression.rs index 
4a092b0d1b522..8dd4705d486d4 100644 --- a/datafusion/physical-expr-common/src/metrics/expression.rs +++ b/datafusion/physical-expr-common/src/metrics/expression.rs @@ -61,7 +61,7 @@ impl ExpressionEvaluatorMetrics { .map(|(idx, label)| { MetricBuilder::new(metrics) .with_new_label("expr", label.into()) - .with_type(MetricType::DEV) + .with_type(MetricType::Dev) // Existing PhysicalExpr formatter is a bit verbose, so use simple name .subset_time(format!("expr_{idx}_eval_time"), partition) }) diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs b/datafusion/physical-expr-common/src/metrics/mod.rs index 18dafa41276d9..8d877713cae82 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -24,10 +24,11 @@ mod expression; mod value; use datafusion_common::HashMap; +pub use datafusion_common::format::{MetricCategory, MetricType}; use parking_lot::Mutex; use std::{ borrow::Cow, - fmt::{Debug, Display}, + fmt::{self, Debug, Display}, sync::Arc, }; @@ -81,31 +82,17 @@ pub struct Metric { partition: Option, metric_type: MetricType, -} -/// Categorizes metrics so the display layer can choose the desired verbosity. -/// -/// # How is it used: -/// The `datafusion.explain.analyze_level` configuration controls which category is shown. -/// - When set to `dev`, all metrics with type `MetricType::Summary` or `MetricType::DEV` -/// will be shown. -/// - When set to `summary`, only metrics with type `MetricType::Summary` are shown. -/// -/// # Difference from `EXPLAIN ANALYZE VERBOSE`: -/// The `VERBOSE` keyword controls whether per-partition metrics are shown (when specified), -/// or aggregated metrics are displayed (when omitted). -/// In contrast, the `analyze_level` configuration determines which categories or -/// levels of metrics are displayed. 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum MetricType { - /// Common metrics for high-level insights (answering which operator is slow) - SUMMARY, - /// For deep operator-level introspection for developers - DEV, + /// Optional semantic category (rows / bytes / timing). + /// + /// When `None` (the default for custom metrics), the metric is + /// **always included** unless the user sets + /// `analyze_categories = 'none'`. + metric_category: Option<MetricCategory>, } impl Display for Metric { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.value.name())?; let mut iter = self @@ -146,7 +133,8 @@ impl Metric { value, labels: vec![], partition, - metric_type: MetricType::DEV, + metric_type: MetricType::Dev, + metric_category: None, } } @@ -161,16 +149,26 @@ impl Metric { value, labels, partition, - metric_type: MetricType::DEV, + metric_type: MetricType::Dev, + metric_category: None, } } - /// Set the type for this metric. Defaults to [`MetricType::DEV`] + /// Set the type for this metric. Defaults to [`MetricType::Dev`] pub fn with_type(mut self, metric_type: MetricType) -> Self { self.metric_type = metric_type; self } + /// Set the semantic category for this metric. + /// + /// See [`MetricCategory`] for details on the determinism properties + /// of each category. + pub fn with_category(mut self, category: MetricCategory) -> Self { + self.metric_category = Some(category); + self + } + /// Add a new label to this metric pub fn with_label(mut self, label: Label) -> Self { self.labels.push(label); @@ -201,6 +199,13 @@ impl Metric { pub fn metric_type(&self) -> MetricType { self.metric_type } + + /// Return the metric category, if one was declared. + /// + /// `None` means the metric is always included (except in `none` mode). + pub fn metric_category(&self) -> Option<MetricCategory> { + self.metric_category + } } /// A snapshot of the metrics for a particular execution plan. 
@@ -327,6 +332,9 @@ impl MetricsSet { let partition = None; let mut accum = Metric::new(metric.value().new_empty(), partition) .with_type(metric.metric_type()); + if let Some(cat) = metric.metric_category() { + accum = accum.with_category(cat); + } accum.value_mut().aggregate(metric.value()); accum }); @@ -379,11 +387,37 @@ impl MetricsSet { .collect::<Vec<_>>(); Self { metrics } } + + /// Returns a new `MetricsSet` filtered by [`MetricCategory`]. + /// + /// - Metrics that declared a category are kept only when that + /// category appears in `allowed`. + /// - Metrics with **no** declared category are treated as + /// [`Uncategorized`](MetricCategory::Uncategorized) for filtering. + /// - An **empty** `allowed` slice means "plan only": all metrics are + /// removed. + pub fn filter_by_categories(self, allowed: &[MetricCategory]) -> Self { + if allowed.is_empty() { + return Self { metrics: vec![] }; + } + + let metrics = self + .metrics + .into_iter() + .filter(|metric| { + let cat = metric + .metric_category() + .unwrap_or(MetricCategory::Uncategorized); + allowed.contains(&cat) + }) + .collect::<Vec<_>>(); + Self { metrics } + } } impl Display for MetricsSet { /// Format the [`MetricsSet`] as a single string - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut is_first = true; for i in self.metrics.iter() { if !is_first { @@ -473,7 +507,7 @@ impl Label { } impl Display for Label { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}={}", self.name, self.value) } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 5b41a47406797..056a7f171a516 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -29,7 +29,7 @@ use crate::aggregates::{ AggregateInputMode, AggregateMode, 
AggregateOutputMode, PhysicalGroupBy, create_schema, evaluate_group_by, evaluate_many, evaluate_optional, }; -use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; +use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; use crate::{PhysicalExpr, aggregates, metrics}; @@ -613,6 +613,7 @@ impl GroupedHashAggregateStream { merging_aggregate_arguments, merging_group_by: PhysicalGroupBy::new_single(merging_group_by_expr), peak_mem_used: MetricBuilder::new(&agg.metrics) + .with_category(MetricCategory::Bytes) .gauge("peak_mem_used", partition), spill_manager, }; @@ -637,6 +638,7 @@ impl GroupedHashAggregateStream { let probe_ratio_threshold = options.skip_partial_aggregation_probe_ratio_threshold; let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) + .with_category(MetricCategory::Rows) .counter("skipped_aggregation_rows", partition); Some(SkipAggregationProbe::new( probe_rows_threshold, @@ -650,7 +652,7 @@ impl GroupedHashAggregateStream { let reduction_factor = if agg.mode == AggregateMode::Partial { Some( MetricBuilder::new(&agg.metrics) - .with_type(metrics::MetricType::SUMMARY) + .with_type(metrics::MetricType::Summary) .ratio_metrics("reduction_factor", partition), ) } else { diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 4aa78055daee3..845908fdd2187 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -26,7 +26,7 @@ use super::{ SendableRecordBatchStream, }; use crate::display::DisplayableExecutionPlan; -use crate::metrics::MetricType; +use crate::metrics::{MetricCategory, MetricType}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; @@ -49,6 +49,8 @@ pub struct AnalyzeExec { 
show_statistics: bool, /// Which metric categories should be displayed metric_types: Vec, + /// Optional filter by semantic category (rows / bytes / timing). + metric_categories: Option>, /// The input plan (the plan being analyzed) pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node @@ -62,6 +64,7 @@ impl AnalyzeExec { verbose: bool, show_statistics: bool, metric_types: Vec, + metric_categories: Option>, input: Arc, schema: SchemaRef, ) -> Self { @@ -70,6 +73,7 @@ impl AnalyzeExec { verbose, show_statistics, metric_types, + metric_categories, input, schema, cache: Arc::new(cache), @@ -86,6 +90,11 @@ impl AnalyzeExec { self.show_statistics } + /// Access to metric_categories + pub fn metric_categories(&self) -> Option<&[MetricCategory]> { + self.metric_categories.as_deref() + } + /// The input plan pub fn input(&self) -> &Arc { &self.input @@ -160,6 +169,7 @@ impl ExecutionPlan for AnalyzeExec { self.verbose, self.show_statistics, self.metric_types.clone(), + self.metric_categories.clone(), children.pop().unwrap(), Arc::clone(&self.schema), ))) @@ -198,6 +208,7 @@ impl ExecutionPlan for AnalyzeExec { let verbose = self.verbose; let show_statistics = self.show_statistics; let metric_types = self.metric_types.clone(); + let metric_categories = self.metric_categories.clone(); // future that gathers the results from all the tasks in the // JoinSet that computes the overall row count and final @@ -218,6 +229,7 @@ impl ExecutionPlan for AnalyzeExec { &captured_input, &captured_schema, &metric_types, + metric_categories.as_deref(), ) }; @@ -229,6 +241,7 @@ impl ExecutionPlan for AnalyzeExec { } /// Creates the output of AnalyzeExec as a RecordBatch +#[expect(clippy::too_many_arguments)] fn create_output_batch( verbose: bool, show_statistics: bool, @@ -237,6 +250,7 @@ fn create_output_batch( input: &Arc, schema: &SchemaRef, metric_types: &[MetricType], + metric_categories: Option<&[MetricCategory]>, ) -> Result { let mut type_builder = 
StringBuilder::with_capacity(1, 1024); let mut plan_builder = StringBuilder::with_capacity(1, 1024); @@ -246,6 +260,7 @@ fn create_output_batch( let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref()) .set_metric_types(metric_types.to_vec()) + .set_metric_categories(metric_categories.map(|c| c.to_vec())) .set_show_statistics(show_statistics) .indent(verbose) .to_string(); @@ -258,6 +273,7 @@ fn create_output_batch( let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref()) .set_metric_types(metric_types.to_vec()) + .set_metric_categories(metric_categories.map(|c| c.to_vec())) .set_show_statistics(show_statistics) .indent(verbose) .to_string(); @@ -305,7 +321,8 @@ mod tests { let analyze_exec = Arc::new(AnalyzeExec::new( true, false, - vec![MetricType::SUMMARY, MetricType::DEV], + vec![MetricType::Summary, MetricType::Dev], + None, blocking_exec, schema, )); diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 890d9f63b3929..31e38419b32fa 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -37,7 +37,7 @@ use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr_common::metrics::{ - ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, MetricsSet, }; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; @@ -204,12 +204,16 @@ impl ExecutionPlan for BufferExec { let curr_mem_in = Arc::new(AtomicUsize::new(0)); let curr_mem_out = Arc::clone(&curr_mem_in); let mut max_mem_in = 0; - let max_mem = MetricBuilder::new(&self.metrics).gauge("max_mem_used", partition); + let max_mem = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Bytes) 
+ .gauge("max_mem_used", partition); let curr_queued_in = Arc::new(AtomicUsize::new(0)); let curr_queued_out = Arc::clone(&curr_queued_in); let mut max_queued_in = 0; - let max_queued = MetricBuilder::new(&self.metrics).gauge("max_queued", partition); + let max_queued = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) + .gauge("max_queued", partition); // Capture metrics when an element is queued on the stream. let in_stream = in_stream.inspect_ok(move |v| { diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index aaf83345d99b8..964eb152e8096 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -28,7 +28,7 @@ use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan}; use datafusion_expr::display_schema; use datafusion_physical_expr::LexOrdering; -use crate::metrics::MetricType; +use crate::metrics::{MetricCategory, MetricType}; use crate::render_tree::RenderTree; use super::{ExecutionPlan, ExecutionPlanVisitor, accept}; @@ -123,13 +123,16 @@ pub struct DisplayableExecutionPlan<'a> { show_schema: bool, /// Which metric categories should be included when rendering metric_types: Vec, + /// Optional filter by semantic category (rows / bytes / timing). + /// `None` means show all categories; `Some(vec![])` means plan-only. 
+ metric_categories: Option<Vec<MetricCategory>>, // (TreeRender) Maximum total width of the rendered tree tree_maximum_render_width: usize, } impl<'a> DisplayableExecutionPlan<'a> { fn default_metric_types() -> Vec<MetricType> { - vec![MetricType::SUMMARY, MetricType::DEV] + vec![MetricType::Summary, MetricType::Dev] } /// Create a wrapper around an [`ExecutionPlan`] which can be @@ -141,6 +144,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: false, show_schema: false, metric_types: Self::default_metric_types(), + metric_categories: None, tree_maximum_render_width: 240, } } @@ -155,6 +159,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: false, show_schema: false, metric_types: Self::default_metric_types(), + metric_categories: None, tree_maximum_render_width: 240, } } @@ -169,6 +174,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: false, show_schema: false, metric_types: Self::default_metric_types(), + metric_categories: None, tree_maximum_render_width: 240, } } @@ -194,6 +200,23 @@ impl<'a> DisplayableExecutionPlan<'a> { self } + /// Specify which metric categories to include. + /// + /// - `None` means show all categories (default). + /// - `Some(vec![])` means plan-only — suppress all metrics. + /// - `Some(vec![Rows])` means show only row-count metrics (plus + /// uncategorized metrics). + /// + /// See [`MetricCategory`] for the determinism properties of each + /// category. 
+ pub fn set_metric_categories( + mut self, + metric_categories: Option>, + ) -> Self { + self.metric_categories = metric_categories; + self + } + /// Set the maximum render width for the tree format pub fn set_tree_maximum_render_width(mut self, width: usize) -> Self { self.tree_maximum_render_width = width; @@ -223,6 +246,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: bool, show_schema: bool, metric_types: Vec, + metric_categories: Option>, } impl fmt::Display for Wrapper<'_> { fn fmt(&self, f: &mut Formatter) -> fmt::Result { @@ -234,6 +258,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: self.show_statistics, show_schema: self.show_schema, metric_types: &self.metric_types, + metric_categories: self.metric_categories.as_deref(), }; accept(self.plan, &mut visitor) } @@ -245,6 +270,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: self.show_statistics, show_schema: self.show_schema, metric_types: self.metric_types.clone(), + metric_categories: self.metric_categories.clone(), } } @@ -265,6 +291,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: ShowMetrics, show_statistics: bool, metric_types: Vec, + metric_categories: Option>, } impl fmt::Display for Wrapper<'_> { fn fmt(&self, f: &mut Formatter) -> fmt::Result { @@ -276,6 +303,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, show_statistics: self.show_statistics, metric_types: &self.metric_types, + metric_categories: self.metric_categories.as_deref(), graphviz_builder: GraphvizBuilder::default(), parents: Vec::new(), }; @@ -294,6 +322,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, show_statistics: self.show_statistics, metric_types: self.metric_types.clone(), + metric_categories: self.metric_categories.clone(), } } @@ -329,6 +358,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: bool, show_schema: bool, metric_types: Vec, + metric_categories: Option>, } impl fmt::Display for Wrapper<'_> { @@ 
-341,6 +371,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: self.show_statistics, show_schema: self.show_schema, metric_types: &self.metric_types, + metric_categories: self.metric_categories.as_deref(), }; visitor.pre_visit(self.plan)?; Ok(()) @@ -353,6 +384,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_statistics: self.show_statistics, show_schema: self.show_schema, metric_types: self.metric_types.clone(), + metric_categories: self.metric_categories.clone(), } } @@ -409,6 +441,8 @@ struct IndentVisitor<'a, 'b> { show_schema: bool, /// Which metric types should be rendered metric_types: &'a [MetricType], + /// Optional filter by semantic category (rows / bytes / timing). + metric_categories: Option<&'a [MetricCategory]>, } impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { @@ -420,12 +454,14 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { ShowMetrics::None => {} ShowMetrics::Aggregated => { if let Some(metrics) = plan.metrics() { - let metrics = metrics + let mut metrics = metrics .filter_by_metric_types(self.metric_types) .aggregate_by_name() .sorted_for_display() .timestamps_removed(); - + if let Some(cats) = self.metric_categories { + metrics = metrics.filter_by_categories(cats); + } write!(self.f, ", metrics=[{metrics}]")?; } else { write!(self.f, ", metrics=[]")?; @@ -433,7 +469,10 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { } ShowMetrics::Full => { if let Some(metrics) = plan.metrics() { - let metrics = metrics.filter_by_metric_types(self.metric_types); + let mut metrics = metrics.filter_by_metric_types(self.metric_types); + if let Some(cats) = self.metric_categories { + metrics = metrics.filter_by_categories(cats); + } write!(self.f, ", metrics=[{metrics}]")?; } else { write!(self.f, ", metrics=[]")?; @@ -472,6 +511,8 @@ struct GraphvizVisitor<'a, 'b> { show_statistics: bool, /// Which metric types should be rendered metric_types: &'a [MetricType], + /// Optional filter by semantic category + metric_categories: 
Option<&'a [MetricCategory]>, graphviz_builder: GraphvizBuilder, /// Used to record parent node ids when visiting a plan. @@ -508,12 +549,14 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { ShowMetrics::None => "".to_string(), ShowMetrics::Aggregated => { if let Some(metrics) = plan.metrics() { - let metrics = metrics + let mut metrics = metrics .filter_by_metric_types(self.metric_types) .aggregate_by_name() .sorted_for_display() .timestamps_removed(); - + if let Some(cats) = self.metric_categories { + metrics = metrics.filter_by_categories(cats); + } format!("metrics=[{metrics}]") } else { "metrics=[]".to_string() @@ -521,7 +564,10 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { } ShowMetrics::Full => { if let Some(metrics) = plan.metrics() { - let metrics = metrics.filter_by_metric_types(self.metric_types); + let mut metrics = metrics.filter_by_metric_types(self.metric_types); + if let Some(cats) = self.metric_categories { + metrics = metrics.filter_by_categories(cats); + } format!("metrics=[{metrics}]") } else { "metrics=[]".to_string() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 802261430c47a..c3de2d5b57cc8 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -903,7 +903,7 @@ impl FilterExecMetrics { Self { baseline_metrics: BaselineMetrics::new(metrics, partition), selectivity: MetricBuilder::new(metrics) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .ratio_metrics("selectivity", partition), } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 2d6e94769b54a..8f0b5181e0618 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -46,7 +46,7 @@ use crate::joins::utils::{ swap_join_projection, update_hash, }; use crate::joins::{JoinOn, JoinOnRef, PartitionMode, 
SharedBitmapBuilder}; -use crate::metrics::{Count, MetricBuilder}; +use crate::metrics::{Count, MetricBuilder, MetricCategory}; use crate::projection::{ EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection, try_pushdown_through_join, @@ -1317,6 +1317,7 @@ impl ExecutionPlan for HashJoinExec { let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let array_map_created_count = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); let left_fut = match self.mode { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index a0e214a38acfc..1582556b01e11 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -923,7 +923,7 @@ impl NestedLoopJoinMetrics { Self { join_metrics: BuildProbeJoinMetrics::new(partition, metrics), selectivity: MetricBuilder::new(metrics) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .ratio_metrics("selectivity", partition), } } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs index 8457408919e63..cf76b2ba8c456 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs @@ -18,8 +18,8 @@ //! 
Module for tracking Sort Merge Join metrics use crate::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, SpillMetrics, - Time, + BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, + MetricCategory, SpillMetrics, Time, }; /// Metrics for SortMergeJoinExec @@ -42,10 +42,15 @@ pub(super) struct SortMergeJoinMetrics { impl SortMergeJoinMetrics { pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition); - let input_batches = - MetricBuilder::new(metrics).counter("input_batches", partition); - let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); - let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition); + let input_batches = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("input_batches", partition); + let input_rows = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("input_rows", partition); + let peak_mem_used = MetricBuilder::new(metrics) + .with_category(MetricCategory::Bytes) + .gauge("peak_mem_used", partition); let spill_metrics = SpillMetrics::new(metrics, partition); let baseline_metrics = BaselineMetrics::new(metrics, partition); diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index beed07f562db3..d00b74df6b1d2 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -28,7 +28,9 @@ use crate::joins::join_hash_map::{ update_from_iter, }; use crate::joins::utils::{JoinFilter, JoinHashMapType}; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}; +use crate::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, +}; use crate::{ExecutionPlan, metrics}; use arrow::array::{ @@ -700,26 +702,31 @@ pub 
struct StreamJoinMetrics { impl StreamJoinMetrics { pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { - let input_batches = - MetricBuilder::new(metrics).counter("left_input_batches", partition); - let input_rows = - MetricBuilder::new(metrics).counter("left_input_rows", partition); + let input_batches = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("left_input_batches", partition); + let input_rows = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("left_input_rows", partition); let left = StreamJoinSideMetrics { input_batches, input_rows, }; - let input_batches = - MetricBuilder::new(metrics).counter("right_input_batches", partition); - let input_rows = - MetricBuilder::new(metrics).counter("right_input_rows", partition); + let input_batches = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("right_input_batches", partition); + let input_rows = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("right_input_rows", partition); let right = StreamJoinSideMetrics { input_batches, input_rows, }; - let stream_memory_usage = - MetricBuilder::new(metrics).gauge("stream_memory_usage", partition); + let stream_memory_usage = MetricBuilder::new(metrics) + .with_category(MetricCategory::Bytes) + .gauge("stream_memory_usage", partition); Self { left, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 47cb118aee2b0..ffe8cf4a9efd8 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -28,7 +28,8 @@ use std::task::{Context, Poll}; use crate::joins::SharedBitmapBuilder; use crate::metrics::{ - self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType, + self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, + MetricType, }; use crate::projection::{ProjectionExec, ProjectionExpr}; use 
crate::{ @@ -1420,26 +1421,32 @@ impl BuildProbeJoinMetrics { let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition); - let build_input_batches = - MetricBuilder::new(metrics).counter("build_input_batches", partition); + let build_input_batches = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("build_input_batches", partition); - let build_input_rows = - MetricBuilder::new(metrics).counter("build_input_rows", partition); + let build_input_rows = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("build_input_rows", partition); - let build_mem_used = - MetricBuilder::new(metrics).gauge("build_mem_used", partition); + let build_mem_used = MetricBuilder::new(metrics) + .with_category(MetricCategory::Bytes) + .gauge("build_mem_used", partition); - let input_batches = - MetricBuilder::new(metrics).counter("input_batches", partition); + let input_batches = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("input_batches", partition); - let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + let input_rows = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("input_rows", partition); let probe_hit_rate = MetricBuilder::new(metrics) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .ratio_metrics("probe_hit_rate", partition); let avg_fanout = MetricBuilder::new(metrics) - .with_type(MetricType::SUMMARY) + .with_type(MetricType::Summary) .ratio_metrics("avg_fanout", partition); Self { diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index a5d4669e7288d..ab9249985b863 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -27,7 +27,8 @@ use std::mem::size_of; use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; use super::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, 
MetricBuilder, RecordOutput, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, + RecordOutput, }; use crate::spill::get_record_batch_memory_size; use crate::{SendableRecordBatchStream, stream::RecordBatchStreamAdapter}; @@ -647,6 +648,7 @@ impl TopKMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), row_replacements: MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) .counter("row_replacements", partition), } } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 85799250181b6..a70b292120334 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -22,8 +22,8 @@ use std::task::{Poll, ready}; use std::{any::Any, sync::Arc}; use super::metrics::{ - self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, - RecordOutput, + self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, + MetricsSet, RecordOutput, }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; use crate::{ @@ -302,10 +302,13 @@ struct UnnestMetrics { impl UnnestMetrics { fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { - let input_batches = - MetricBuilder::new(metrics).counter("input_batches", partition); + let input_batches = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("input_batches", partition); - let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + let input_rows = MetricBuilder::new(metrics) + .with_category(MetricCategory::Rows) + .counter("input_rows", partition); Self { baseline_metrics: BaselineMetrics::new(metrics, partition), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e422ce7bed4f3..9464e85727d4f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1199,6 +1199,10 @@ message AnalyzeExecNode { bool 
show_statistics = 2; PhysicalPlanNode input = 3; datafusion_common.Schema schema = 4; + // Optional metric category filter. + // Empty means "plan only". Absent (has_metric_categories=false) means "all". + bool has_metric_categories = 5; + repeated string metric_categories = 6; } message CrossJoinExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index eb86afe3d6e00..c81e2fabe1185 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -975,6 +975,12 @@ impl serde::Serialize for AnalyzeExecNode { if self.schema.is_some() { len += 1; } + if self.has_metric_categories { + len += 1; + } + if !self.metric_categories.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.AnalyzeExecNode", len)?; if self.verbose { struct_ser.serialize_field("verbose", &self.verbose)?; @@ -988,6 +994,12 @@ impl serde::Serialize for AnalyzeExecNode { if let Some(v) = self.schema.as_ref() { struct_ser.serialize_field("schema", v)?; } + if self.has_metric_categories { + struct_ser.serialize_field("hasMetricCategories", &self.has_metric_categories)?; + } + if !self.metric_categories.is_empty() { + struct_ser.serialize_field("metricCategories", &self.metric_categories)?; + } struct_ser.end() } } @@ -1003,6 +1015,10 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode { "showStatistics", "input", "schema", + "has_metric_categories", + "hasMetricCategories", + "metric_categories", + "metricCategories", ]; #[allow(clippy::enum_variant_names)] @@ -1011,6 +1027,8 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode { ShowStatistics, Input, Schema, + HasMetricCategories, + MetricCategories, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1036,6 +1054,8 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode { "showStatistics" | "show_statistics" => Ok(GeneratedField::ShowStatistics), 
"input" => Ok(GeneratedField::Input), "schema" => Ok(GeneratedField::Schema), + "hasMetricCategories" | "has_metric_categories" => Ok(GeneratedField::HasMetricCategories), + "metricCategories" | "metric_categories" => Ok(GeneratedField::MetricCategories), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1059,6 +1079,8 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode { let mut show_statistics__ = None; let mut input__ = None; let mut schema__ = None; + let mut has_metric_categories__ = None; + let mut metric_categories__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Verbose => { @@ -1085,6 +1107,18 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode { } schema__ = map_.next_value()?; } + GeneratedField::HasMetricCategories => { + if has_metric_categories__.is_some() { + return Err(serde::de::Error::duplicate_field("hasMetricCategories")); + } + has_metric_categories__ = Some(map_.next_value()?); + } + GeneratedField::MetricCategories => { + if metric_categories__.is_some() { + return Err(serde::de::Error::duplicate_field("metricCategories")); + } + metric_categories__ = Some(map_.next_value()?); + } } } Ok(AnalyzeExecNode { @@ -1092,6 +1126,8 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode { show_statistics: show_statistics__.unwrap_or_default(), input: input__, schema: schema__, + has_metric_categories: has_metric_categories__.unwrap_or_default(), + metric_categories: metric_categories__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index e0a0c636fbb32..ff9133b1ced5b 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1785,6 +1785,12 @@ pub struct AnalyzeExecNode { pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, tag = "4")] pub schema: ::core::option::Option, + /// Optional metric category filter. + /// Empty means "plan only". 
Absent (has_metric_categories=false) means "all". + #[prost(bool, tag = "5")] + pub has_metric_categories: bool, + #[prost(string, repeated, tag = "6")] + pub metric_categories: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CrossJoinExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index cf06b60d1cd05..0f37e9ad3f942 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -79,7 +79,7 @@ use datafusion_physical_plan::joins::{ }; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::memory::LazyMemoryExec; -use datafusion_physical_plan::metrics::MetricType; +use datafusion_physical_plan::metrics::{MetricCategory, MetricType}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -1831,10 +1831,21 @@ impl protobuf::PhysicalPlanNode { ) -> Result> { let input: Arc = into_physical_plan(&analyze.input, ctx, codec, proto_converter)?; + let metric_categories = if analyze.has_metric_categories { + let cats: Result> = analyze + .metric_categories + .iter() + .map(|s| s.parse::()) + .collect(); + Some(cats?) 
+ } else { + None + }; Ok(Arc::new(AnalyzeExec::new( analyze.verbose, analyze.show_statistics, - vec![MetricType::SUMMARY, MetricType::DEV], + vec![MetricType::Summary, MetricType::Dev], + metric_categories, input, Arc::new(convert_required!(analyze.schema)?), ))) @@ -2302,6 +2313,10 @@ impl protobuf::PhysicalPlanNode { codec, proto_converter, )?; + let (has_metric_categories, metric_categories) = match exec.metric_categories() { + Some(cats) => (true, cats.iter().map(|c| c.to_string()).collect()), + None => (false, vec![]), + }; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new( protobuf::AnalyzeExecNode { @@ -2309,6 +2324,8 @@ impl protobuf::PhysicalPlanNode { show_statistics: exec.show_statistics(), input: Some(Box::new(input)), schema: Some(exec.schema().as_ref().try_into()?), + has_metric_categories, + metric_categories, }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 2e77139ecd2ff..2b744a8fbdd19 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1558,7 +1558,8 @@ fn roundtrip_analyze() -> Result<()> { roundtrip_test(Arc::new(AnalyzeExec::new( false, false, - vec![MetricType::SUMMARY, MetricType::DEV], + vec![MetricType::Summary, MetricType::Dev], + None, input, Arc::new(schema), ))) diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index e109b32a95ed1..b35e9c8588b22 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -66,3 +66,134 @@ Plan with Metrics statement ok reset datafusion.explain.analyze_level; + +# ------------------------------------------------ +# Test analyze_categories: filter metrics by kind +# ------------------------------------------------ +# Categories classify metrics 
by determinism: +# rows, bytes — depend on plan + data, deterministic across runs +# timing — varies run-to-run even on same hardware + +# --- Setup: create a small parquet table with multiple row groups --- + +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE TABLE _cat_data AS VALUES + ('Anow Vole', 7), + ('Brown Bear', 133), + ('Gray Wolf', 82), + ('Lynx', 71), + ('Red Fox', 40), + ('Alpine Bat', 6), + ('Nlpine Ibex', 101), + ('Nlpine Goat', 76), + ('Nlpine Sheep', 83), + ('Europ. Mole', 4), + ('Polecat', 16), + ('Alpine Ibex', 97); + +statement ok +COPY (SELECT column1 as species, column2 as s FROM _cat_data) +TO 'test_files/scratch/explain_analyze/data.parquet' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' '3'); + +statement ok +drop table _cat_data; + +statement ok +CREATE EXTERNAL TABLE cat_tracking +STORED AS PARQUET +LOCATION 'test_files/scratch/explain_analyze/data.parquet'; + +# ---- categories = 'none': plan only, no metrics at all ---- + +statement ok +set datafusion.explain.analyze_level = summary; + +statement ok +set datafusion.explain.analyze_categories = 'none'; + +query TT +explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[] + +statement ok +reset datafusion.explain.analyze_categories; + 
+# ---- categories = 'rows': deterministic row-count metrics only ---- +# Note: no elapsed_compute, no output_bytes, no bytes_scanned, no metadata_load_time + +statement ok +set datafusion.explain.analyze_categories = 'rows'; + +query TT +explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22% (521/2.35 K)] + +statement ok +reset datafusion.explain.analyze_categories; + +# ---- categories = 'rows,bytes': add byte metrics, still no timing ---- + +statement ok +set datafusion.explain.analyze_categories = 'rows,bytes'; + +query TT +explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, 
projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] + +statement ok +reset datafusion.explain.analyze_categories; + +# ---- categories = 'rows,bytes,uncategorized': everything except timing ---- + +statement ok +set datafusion.explain.analyze_categories = 'rows,bytes,uncategorized'; + +query TT +explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, 
limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] + +statement ok +reset datafusion.explain.analyze_categories; + +# ---- categories = 'timing': only timing metrics (non-deterministic) ---- + +statement ok +set datafusion.explain.analyze_categories = 'timing'; + +query TT +explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[elapsed_compute=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[elapsed_compute=, metadata_load_time=] + +statement ok +reset datafusion.explain.analyze_categories; + +statement ok +reset datafusion.explain.analyze_level; + +# --- Teardown --- + +statement ok +drop table cat_tracking; + +statement ok +reset datafusion.execution.parquet.pushdown_filters; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 2da823421dd76..ba1f1403450a9 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -275,6 +275,7 @@ datafusion.execution.split_file_groups_by_statistics false datafusion.execution.target_partitions 7 datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false +datafusion.explain.analyze_categories all datafusion.explain.analyze_level dev 
datafusion.explain.format indent datafusion.explain.logical_plan_only false @@ -416,6 +417,7 @@ datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. +datafusion.explain.analyze_categories all Which metric categories to include in "EXPLAIN ANALYZE" output. Comma-separated list of: "rows", "bytes", "timing", "uncategorized". Use "none" to show plan structure only, or "all" (default) to show everything. Metrics without a declared category are treated as "uncategorized". datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. 
datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 56ab4d1539f92..7ddced931a349 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -177,6 +177,7 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | +| datafusion.explain.analyze_categories | all | Which metric categories to include in "EXPLAIN ANALYZE" output. Comma-separated list of: "rows", "bytes", "timing", "uncategorized". Use "none" to show plan structure only, or "all" (default) to show everything. Metrics without a declared category are treated as "uncategorized". | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. |