diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9b6e6aa5dac37..77fb0af5bfd87 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -684,6 +684,14 @@ config_namespace! { /// /// Disabled by default, set to a number greater than 0 for enabling it. pub hash_join_buffering_capacity: usize, default = 0 + + /// When set to true, after executing a plan via `collect()` or + /// `collect_partitioned()`, DataFusion verifies that operators + /// declaring `CardinalityEffect::Equal` produced exactly the same + /// number of output rows as their input. This is a post-execution + /// sanity check useful for debugging correctness issues. + /// Disabled by default as it adds a small amount of overhead. + pub verify_cardinality_effect: bool, default = false } } diff --git a/datafusion/physical-plan/src/cardinality_check.rs b/datafusion/physical-plan/src/cardinality_check.rs new file mode 100644 index 0000000000000..3592ded2f6d0d --- /dev/null +++ b/datafusion/physical-plan/src/cardinality_check.rs @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Post-execution cardinality validation. +//! +//! Provides [`validate_cardinality_effect`] which walks an executed plan tree +//! and verifies that every operator declaring [`CardinalityEffect::Equal`] +//! produced exactly as many output rows as its input. + +use crate::ExecutionPlan; +use crate::execution_plan::CardinalityEffect; +use crate::visitor::{ExecutionPlanVisitor, visit_execution_plan}; +use datafusion_common::{DataFusionError, Result}; + +/// Visitor that checks cardinality invariants after execution. +struct CardinalityCheckVisitor; + +impl ExecutionPlanVisitor for CardinalityCheckVisitor { + type Error = DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + if !matches!(plan.cardinality_effect(), CardinalityEffect::Equal) { + return Ok(true); + } + + let children = plan.children(); + if children.len() != 1 { + return Ok(true); + } + + // Operators with a fetch limit may legitimately produce fewer rows + // even if they declare CardinalityEffect::Equal (e.g. CoalesceBatchesExec). + if plan.fetch().is_some() { + return Ok(true); + } + + let child = children[0]; + + let parent_metrics = match plan.metrics() { + Some(m) => m, + None => return Ok(true), + }; + let child_metrics = match child.metrics() { + Some(m) => m, + None => return Ok(true), + }; + + let parent_rows = match parent_metrics.output_rows() { + Some(r) => r, + None => return Ok(true), + }; + let child_rows = match child_metrics.output_rows() { + Some(r) => r, + None => return Ok(true), + }; + + if parent_rows != child_rows { + return Err(DataFusionError::Internal(format!( + "CardinalityEffect::Equal violation in {}: \ + operator produced {parent_rows} output rows \ + but its input produced {child_rows} rows", + plan.name(), + ))); + } + + Ok(true) + } +} + +/// Walk the execution plan tree and verify that every operator declaring +/// [`CardinalityEffect::Equal`] produced exactly as many output rows as its +/// single child input, based on post-execution metrics. +/// +/// Nodes are silently skipped when: +/// - They do not declare `CardinalityEffect::Equal` +/// - They are not unary (zero or multiple children) +/// - They have a `fetch` limit set +/// - Metrics are unavailable on either the node or its child +/// +/// Returns `Err(DataFusionError::Internal)` on the first violation found. +pub fn validate_cardinality_effect(plan: &dyn ExecutionPlan) -> Result<()> { + let mut visitor = CardinalityCheckVisitor; + visit_execution_plan(plan, &mut visitor) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::execution_plan::{Boundedness, EmissionType}; + use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; + use crate::{DisplayAs, DisplayFormatType, PlanProperties}; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::Result; + use datafusion_physical_expr::EquivalenceProperties; + use std::any::Any; + use std::fmt; + use std::sync::Arc; + + /// A mock execution plan for testing cardinality validation. + #[derive(Debug)] + struct MockExec { + name: &'static str, + children: Vec>, + effect: CardinalityEffect, + mock_output_rows: Option, + fetch: Option, + metrics: ExecutionPlanMetricsSet, + cache: Arc, + } + + impl MockExec { + fn new( + name: &'static str, + children: Vec>, + effect: CardinalityEffect, + mock_output_rows: Option, + fetch: Option, + ) -> Self { + let schema = test_schema(); + let cache = Arc::new(PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + crate::Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + )); + let metrics = ExecutionPlanMetricsSet::new(); + if let Some(rows) = mock_output_rows { + let counter = MetricBuilder::new(&metrics).output_rows(0); + counter.add(rows); + } + Self { + name, + children, + effect, + mock_output_rows, + fetch, + metrics, + cache, + } + } + } + + impl DisplayAs for MockExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MockExec: {}", self.name) + } + } + + impl ExecutionPlan for MockExec { + fn name(&self) -> &'static str { + self.name + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + self.children.iter().collect() + } + + fn apply_expressions( + &self, + _f: &mut dyn FnMut( + &dyn crate::PhysicalExpr, + ) + -> Result, + ) -> Result { + Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue) + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!("not needed for tests") + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!("not needed for tests") + } + + fn cardinality_effect(&self) -> CardinalityEffect { + self.effect + } + + fn fetch(&self) -> Option { + self.fetch + } + + fn metrics(&self) -> Option { + if self.mock_output_rows.is_some() { + Some(self.metrics.clone_inner()) + } else { + None + } + } + } + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) + } + + #[test] + fn test_equal_cardinality_passes() { + // Parent and child both produce 100 rows + let child: Arc = Arc::new(MockExec::new( + "child", + vec![], + CardinalityEffect::Unknown, + Some(100), + None, + )); + let parent: Arc = Arc::new(MockExec::new( + "parent", + vec![child], + CardinalityEffect::Equal, + Some(100), + None, + )); + + assert!(validate_cardinality_effect(parent.as_ref()).is_ok()); + } + + #[test] + fn test_equal_cardinality_violation() { + // Parent produces 90 rows but child produced 100 + let child: Arc = Arc::new(MockExec::new( + "child", + vec![], + CardinalityEffect::Unknown, + Some(100), + None, + )); + let parent: Arc = Arc::new(MockExec::new( + "TestRepartition", + vec![child], + CardinalityEffect::Equal, + Some(90), + None, + )); + + let err = validate_cardinality_effect(parent.as_ref()).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("CardinalityEffect::Equal violation"), + "unexpected error: {msg}" + ); + assert!(msg.contains("TestRepartition"), "unexpected error: {msg}"); + assert!(msg.contains("90"), "unexpected error: {msg}"); + assert!(msg.contains("100"), "unexpected error: {msg}"); + } + + #[test] + fn test_skips_operator_with_fetch() { + // Operator declares Equal but has fetch set — should be skipped + let child: Arc = Arc::new(MockExec::new( + "child", + vec![], + CardinalityEffect::Unknown, + Some(100), + None, + )); + let parent: Arc = Arc::new(MockExec::new( + "parent_with_fetch", + vec![child], + CardinalityEffect::Equal, + Some(50), + Some(50), + )); + + assert!(validate_cardinality_effect(parent.as_ref()).is_ok()); + } + + #[test] + fn test_skips_operator_without_metrics() { + // Parent declares Equal but has no metrics — should be skipped + let child: Arc = Arc::new(MockExec::new( + "child", + vec![], + CardinalityEffect::Unknown, + Some(100), + None, + )); + let parent: Arc = Arc::new(MockExec::new( + "no_metrics_parent", + vec![child], + CardinalityEffect::Equal, + None, // no metrics + None, + )); + + assert!(validate_cardinality_effect(parent.as_ref()).is_ok()); + } + + #[test] + fn test_skips_non_equal_operator() { + // LowerEqual operator with fewer output rows — should not trigger + let child: Arc = Arc::new(MockExec::new( + "child", + vec![], + CardinalityEffect::Unknown, + Some(100), + None, + )); + let parent: Arc = Arc::new(MockExec::new( + "filter", + vec![child], + CardinalityEffect::LowerEqual, + Some(50), + None, + )); + + assert!(validate_cardinality_effect(parent.as_ref()).is_ok()); + } + + #[test] + fn test_deep_tree_catches_nested_violation() { + // Build: grandparent(Equal, 100) -> parent(Equal, 90!) -> child(Unknown, 100) + // The violation is at the parent level (90 != 100) + let child: Arc = Arc::new(MockExec::new( + "child", + vec![], + CardinalityEffect::Unknown, + Some(100), + None, + )); + let parent: Arc = Arc::new(MockExec::new( + "BadRepartition", + vec![child], + CardinalityEffect::Equal, + Some(90), + None, + )); + let grandparent: Arc = Arc::new(MockExec::new( + "projection", + vec![parent], + CardinalityEffect::Equal, + Some(90), // matches its child (parent), so this is fine + None, + )); + + let err = validate_cardinality_effect(grandparent.as_ref()).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("BadRepartition"), + "should identify the violating operator: {msg}" + ); + } +} diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d1e0978cfe1c0..6bb5193e7cfeb 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -1278,8 +1278,22 @@ pub async fn collect( plan: Arc, context: Arc, ) -> Result> { + let verify = context + .session_config() + .options() + .execution + .verify_cardinality_effect; + let plan_clone = if verify { + Some(Arc::clone(&plan)) + } else { + None + }; let stream = execute_stream(plan, context)?; - crate::common::collect(stream).await + let result = crate::common::collect(stream).await?; + if let Some(plan) = plan_clone { + crate::cardinality_check::validate_cardinality_effect(plan.as_ref())?; + } + Ok(result) } /// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es. @@ -1316,6 +1330,16 @@ pub async fn collect_partitioned( plan: Arc, context: Arc, ) -> Result>> { + let verify = context + .session_config() + .options() + .execution + .verify_cardinality_effect; + let plan_clone = if verify { + Some(Arc::clone(&plan)) + } else { + None + }; let streams = execute_stream_partitioned(plan, context)?; let mut join_set = JoinSet::new(); @@ -1348,6 +1372,10 @@ pub async fn collect_partitioned( batches.sort_by_key(|(idx, _)| *idx); let batches = batches.into_iter().map(|(_, batch)| batch).collect(); + if let Some(plan) = plan_clone { + crate::cardinality_check::validate_cardinality_effect(plan.as_ref())?; + } + Ok(batches) } @@ -1543,6 +1571,7 @@ pub fn get_plan_string(plan: &Arc) -> Vec { /// Indicates the effect an execution plan operator will have on the cardinality /// of its input stream +#[derive(Debug, Clone, Copy)] pub enum CardinalityEffect { /// Unknown effect. This is the default Unknown, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6467d7a2e389d..638fe19038aae 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -64,6 +64,7 @@ pub mod aggregates; pub mod analyze; pub mod async_func; pub mod buffer; +pub mod cardinality_check; pub mod coalesce; pub mod coalesce_batches; pub mod coalesce_partitions; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index aeeb3481c76b9..4c2fdd456bcd4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -275,6 +275,7 @@ datafusion.execution.split_file_groups_by_statistics false datafusion.execution.target_partitions 7 datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false +datafusion.execution.verify_cardinality_effect false datafusion.explain.analyze_level dev datafusion.explain.format indent datafusion.explain.logical_plan_only false @@ -415,6 +416,7 @@ datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. +datafusion.execution.verify_cardinality_effect false When set to true, after executing a plan via `collect()` or `collect_partitioned()`, DataFusion verifies that operators declaring `CardinalityEffect::Equal` produced exactly the same number of output rows as their input. This is a post-execution sanity check useful for debugging correctness issues. Disabled by default as it adds a small amount of overhead. datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 484ac7a272d9b..3d0dccb09482b 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -134,6 +134,7 @@ The following configuration settings are available: | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. | | datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. | +| datafusion.execution.verify_cardinality_effect | false | When set to true, after executing a plan via `collect()` or `collect_partitioned()`, DataFusion verifies that operators declaring `CardinalityEffect::Equal` produced exactly the same number of output rows as their input. This is a post-execution sanity check useful for debugging correctness issues. Disabled by default as it adds a small amount of overhead. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |