From d95d5dafa0de44ab0ca4b4c851e5c124ad14a81e Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 22 Jan 2026 20:20:13 +0100 Subject: [PATCH 01/12] feat: Extract NDV (distinct_count) statistics from Parquet metadata This change adds support for reading Number of Distinct Values (NDV) statistics from Parquet file metadata when available. Previously, `distinct_count` in `ColumnStatistics` was always set to `Precision::Absent`. Now it is populated from parquet row group column statistics when present: - Single row group with NDV: `Precision::Exact(ndv)` - Multiple row groups with NDV: `Precision::Inexact(max)` as a lower bound (we can't accurately merge NDV since duplicates may exist across row groups; max is more conservative than sum for join cardinality estimation) - No NDV available: `Precision::Absent` This provides a foundation for improved join cardinality estimation and other statistics-based optimizations. Relates to #15265 --- datafusion/datasource-parquet/src/metadata.rs | 393 +++++++++++++++++- .../src/test_data/ndv_test.parquet | Bin 0 -> 1141 bytes 2 files changed, 390 insertions(+), 3 deletions(-) create mode 100644 datafusion/datasource-parquet/src/test_data/ndv_test.parquet diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 5a4c0bcdd514d..fca48e7798b60 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -297,6 +297,8 @@ impl<'a> DFParquetMetadata<'a> { vec![Some(true); logical_file_schema.fields().len()]; let mut is_min_value_exact = vec![Some(true); logical_file_schema.fields().len()]; + let mut distinct_counts_array = + vec![Precision::Absent; logical_file_schema.fields().len()]; logical_file_schema.fields().iter().enumerate().for_each( |(idx, field)| match StatisticsConverter::try_new( field.name(), @@ -311,8 +313,9 @@ impl<'a> DFParquetMetadata<'a> { is_min_value_exact: &mut is_min_value_exact, is_max_value_exact: &mut
is_max_value_exact, column_byte_sizes: &mut column_byte_sizes, + distinct_counts_array: &mut distinct_counts_array, }; - summarize_min_max_null_counts( + summarize_column_statistics( file_metadata.schema_descr(), logical_file_schema, &physical_file_schema, @@ -338,6 +341,7 @@ impl<'a> DFParquetMetadata<'a> { &mut is_max_value_exact, &mut is_min_value_exact, &column_byte_sizes, + &distinct_counts_array, ) } else { // Record column sizes @@ -411,6 +415,7 @@ fn create_max_min_accs( (max_values, min_values) } +#[expect(clippy::too_many_arguments)] fn get_col_stats( schema: &Schema, null_counts: &[Precision], @@ -419,6 +424,7 @@ fn get_col_stats( is_max_value_exact: &mut [Option], is_min_value_exact: &mut [Option], column_byte_sizes: &[Precision], + distinct_counts: &[Precision], ) -> Vec { (0..schema.fields().len()) .map(|i| { @@ -451,7 +457,7 @@ fn get_col_stats( max_value: max_value.unwrap_or(Precision::Absent), min_value: min_value.unwrap_or(Precision::Absent), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: distinct_counts[i], byte_size: column_byte_sizes[i], } }) @@ -466,9 +472,10 @@ struct StatisticsAccumulators<'a> { is_min_value_exact: &'a mut [Option], is_max_value_exact: &'a mut [Option], column_byte_sizes: &'a mut [Precision], + distinct_counts_array: &'a mut [Precision], } -fn summarize_min_max_null_counts( +fn summarize_column_statistics( parquet_schema: &SchemaDescriptor, logical_file_schema: &Schema, physical_file_schema: &Schema, @@ -541,6 +548,36 @@ fn summarize_min_max_null_counts( ) .map(|(idx, _)| idx); + // Extract distinct counts from row group column statistics + accumulators.distinct_counts_array[logical_schema_index] = + if let Some(parquet_idx) = parquet_index { + let distinct_counts: Vec = row_groups_metadata + .iter() + .filter_map(|rg| { + rg.columns() + .get(parquet_idx) + .and_then(|col| col.statistics()) + .and_then(|stats| stats.distinct_count_opt()) + }) + .collect(); + + if 
distinct_counts.is_empty() { + Precision::Absent + } else if distinct_counts.len() == 1 { + // Single row group with distinct count - use exact value + Precision::Exact(distinct_counts[0] as usize) + } else { + // Multiple row groups - use max as a lower bound estimate + // (can't accurately merge NDV since duplicates may exist across row groups) + match distinct_counts.iter().max() { + Some(&max_ndv) => Precision::Inexact(max_ndv as usize), + None => Precision::Absent, + } + } + } else { + Precision::Absent + }; + let arrow_field = logical_file_schema.field(logical_schema_index); accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size( arrow_field.data_type(), @@ -805,4 +842,354 @@ mod tests { assert_eq!(result, Some(false)); } } + + mod ndv_tests { + use super::*; + use arrow::datatypes::Field; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; + use parquet::file::statistics::Statistics as ParquetStatistics; + use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; + + fn create_schema_descr(num_columns: usize) -> Arc { + let fields: Vec> = (0..num_columns) + .map(|i| { + Arc::new( + SchemaType::primitive_type_builder( + &format!("col_{i}"), + PhysicalType::INT32, + ) + .build() + .unwrap(), + ) + }) + .collect(); + + let schema = SchemaType::group_type_builder("schema") + .with_fields(fields) + .build() + .unwrap(); + + Arc::new(SchemaDescriptor::new(Arc::new(schema))) + } + + fn create_arrow_schema(num_columns: usize) -> SchemaRef { + let fields: Vec = (0..num_columns) + .map(|i| Field::new(format!("col_{i}"), DataType::Int32, true)) + .collect(); + Arc::new(Schema::new(fields)) + } + + fn create_row_group_with_stats( + schema_descr: &Arc, + column_stats: Vec>, + num_rows: i64, + ) -> RowGroupMetaData { + let columns: Vec = column_stats + .into_iter() + .enumerate() + .map(|(i, stats)| { + let mut builder = + 
ColumnChunkMetaData::builder(schema_descr.column(i)); + if let Some(s) = stats { + builder = builder.set_statistics(s); + } + builder.set_num_values(num_rows).build().unwrap() + }) + .collect(); + + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(num_rows) + .set_total_byte_size(1000) + .set_column_metadata(columns) + .build() + .unwrap() + } + + fn create_parquet_metadata( + schema_descr: Arc, + row_groups: Vec, + ) -> ParquetMetaData { + use parquet::file::metadata::FileMetaData; + + let num_rows: i64 = row_groups.iter().map(|rg| rg.num_rows()).sum(); + let file_meta = FileMetaData::new( + 1, // version + num_rows, // num_rows + None, // created_by + None, // key_value_metadata + schema_descr, // schema_descr + None, // column_orders + ); + + ParquetMetaData::new(file_meta, row_groups) + } + + #[test] + fn test_distinct_count_single_row_group_with_ndv() { + // Single row group with distinct count should return Exact + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Create statistics with distinct_count = 42 + let stats = ParquetStatistics::int32( + Some(1), // min + Some(100), // max + Some(42), // distinct_count + Some(0), // null_count + false, // is_deprecated + ); + + let row_group = + create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Exact(42) + ); + } + + #[test] + fn test_distinct_count_multiple_row_groups_with_ndv() { + // Multiple row groups with distinct counts should return Inexact (max) + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Row group 1: distinct_count = 10 + let stats1 = ParquetStatistics::int32( + Some(1), + Some(50), + Some(10), // distinct_count +
Some(0), + false, + ); + + // Row group 2: distinct_count = 20 + let stats2 = ParquetStatistics::int32( + Some(51), + Some(100), + Some(20), // distinct_count + Some(0), + false, + ); + + let row_group1 = + create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500); + let row_group2 = + create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500); + let metadata = + create_parquet_metadata(schema_descr, vec![row_group1, row_group2]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + // Max of distinct counts (lower bound since we can't accurately merge NDV) + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(20) + ); + } + + #[test] + fn test_distinct_count_no_ndv_available() { + // No distinct count in statistics should return Absent + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Create statistics without distinct_count (None) + let stats = ParquetStatistics::int32( + Some(1), + Some(100), + None, // no distinct_count + Some(0), + false, + ); + + let row_group = + create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Absent + ); + } + + #[test] + fn test_distinct_count_partial_ndv_in_row_groups() { + // Some row groups have NDV, some don't - should use only those that have it + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Row group 1: has distinct_count = 15 + let stats1 = + ParquetStatistics::int32(Some(1), Some(50), Some(15), Some(0), false); + + // Row group 2: no distinct_count + let stats2 = ParquetStatistics::int32( + Some(51), + Some(100), + None, // no distinct_count + 
Some(0), + false, + ); + + let row_group1 = + create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500); + let row_group2 = + create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500); + let metadata = + create_parquet_metadata(schema_descr, vec![row_group1, row_group2]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + // Only one row group has NDV, so it's Exact(15) + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Exact(15) + ); + } + + #[test] + fn test_distinct_count_multiple_columns() { + // Test with multiple columns, each with different NDV + let schema_descr = create_schema_descr(3); + let arrow_schema = create_arrow_schema(3); + + // col_0: distinct_count = 5 + let stats0 = + ParquetStatistics::int32(Some(1), Some(10), Some(5), Some(0), false); + // col_1: no distinct_count + let stats1 = + ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false); + // col_2: distinct_count = 100 + let stats2 = + ParquetStatistics::int32(Some(1), Some(1000), Some(100), Some(0), false); + + let row_group = create_row_group_with_stats( + &schema_descr, + vec![Some(stats0), Some(stats1), Some(stats2)], + 1000, + ); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Exact(5) + ); + assert_eq!( + result.column_statistics[1].distinct_count, + Precision::Absent + ); + assert_eq!( + result.column_statistics[2].distinct_count, + Precision::Exact(100) + ); + } + + #[test] + fn test_distinct_count_no_statistics_at_all() { + // No statistics in row group should return Absent for all stats + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Create row group without any statistics + let row_group = 
create_row_group_with_stats(&schema_descr, vec![None], 1000); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Absent + ); + } + + /// Integration test that reads a real Parquet file with distinct_count statistics. + /// The test file was created with DuckDB and has known NDV values: + /// - id: NULL (high cardinality, not tracked) + /// - category: 10 distinct values + /// - name: 5 distinct values + #[test] + fn test_distinct_count_from_real_parquet_file() { + use parquet::arrow::parquet_to_arrow_schema; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use std::fs::File; + use std::path::PathBuf; + + // Path to test file created by DuckDB with distinct_count statistics + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("src/test_data/ndv_test.parquet"); + + let file = File::open(&path).expect("Failed to open test parquet file"); + let reader = SerializedFileReader::new(file).expect("Failed to create reader"); + let parquet_metadata = reader.metadata(); + + // Derive Arrow schema from parquet file metadata + let arrow_schema = Arc::new( + parquet_to_arrow_schema( + parquet_metadata.file_metadata().schema_descr(), + None, + ) + .expect("Failed to convert schema"), + ); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + parquet_metadata, + &arrow_schema, + ) + .expect("Failed to extract statistics"); + + // id: no distinct_count (high cardinality) + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Absent, + "id column should have Absent distinct_count" + ); + + // category: 10 distinct values + assert_eq!( + result.column_statistics[1].distinct_count, + Precision::Exact(10), + "category column should have Exact(10) distinct_count" + ); + + // name: 5 distinct values + 
assert_eq!( + result.column_statistics[2].distinct_count, + Precision::Exact(5), + "name column should have Exact(5) distinct_count" + ); + } + } } diff --git a/datafusion/datasource-parquet/src/test_data/ndv_test.parquet b/datafusion/datasource-parquet/src/test_data/ndv_test.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3ecbe320f506efd450c6c2ebd31fd626571db80f GIT binary patch literal 1141 zcmZwHOHUI~6bJBgr_J)2zRYXL5Ay_(98U+#%5_W`z zDGL*HVOSbBZjCFK#uYKfL_dHbegId-rSU&ZYY1_YU+(nGz30A8I+Pnua^hbCd{gS? z8w~Ffmx$g-hKGnY7^5Hy`$-6fSUY}*%o{-5>o$_x_?S0@piF*7e!ov7B`8x91Rw~z zpcz6ChTYHtt-wKrJ)nUOZLk*%*a!Qe9U^c5I-nDx5CanqLL9o_5FCbXI08Lz6cUhx z6!bzG`rsHGhYa)wW$}cwlatO)vWT35({KhX$iV=dg*=>t0-T2na1k!SWhlZG7=)`Z z1lM2~uEPl2fKj*!x8OG1fx9pU_h1|*;69Xsa&{8u?L1|-AujS)jN3)t$`KhN+!`_T z6~F@`ctHXm#>@3_{>FIXF9e?1Dgn_gfmN2Smw=cHrA8Ll-+9sb52F-wrn7mz$Q5U{ zR1^h6Go|UuM1m`ngcMiez5k+VRMj`e3){0-$Lh&FxmqFL`8xcyHkD6zw1uCIHj^mR z4@_bi22lfPE1LXN&PzgNS+N9jaE@6f`|xqyDo`}DWbN$k6_;6rmg5wVoXT7wS{2E9 zF4P1@5jkOTK`-{9;_QX;BYSdUzC2Z#E}`_f7!x$1YR97Pt6VNUsXUyWTXF&cd=s6W z#)#AnrW<ni=Pc$6H3Dz@-I>y!Kbo2f4 zsNb2nCYx(MLz5di&7ZQNNn7`quD1!5 zRApz(N%ykJNvCeMV4tR}zEvn5&*KF11Chnb+ Date: Thu, 22 Jan 2026 20:20:18 +0100 Subject: [PATCH 02/12] Improve NDV propagation through statistics merge and projection - Statistics merge: use max as conservative lower bound instead of discarding NDV (duplicates may exist across partitions) - Projection: preserve NDV for single-column expressions as upper bound --- datafusion/common/src/stats.rs | 23 +++++++++++++++--- datafusion/physical-expr/src/projection.rs | 27 ++++++++++++++++++---- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 759ebfe67a812..3539c380f8c76 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -660,7 +660,24 @@ impl Statistics { col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); col_stats.min_value 
= col_stats.min_value.min(&item_col_stats.min_value); col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); - col_stats.distinct_count = Precision::Absent; + // Use max as a conservative lower bound for distinct count + // (can't accurately merge NDV since duplicates may exist across partitions) + col_stats.distinct_count = + match (&col_stats.distinct_count, &item_col_stats.distinct_count) { + (Precision::Exact(a), Precision::Exact(b)) + | (Precision::Inexact(a), Precision::Exact(b)) + | (Precision::Exact(a), Precision::Inexact(b)) + | (Precision::Inexact(a), Precision::Inexact(b)) => { + Precision::Inexact(if a >= b { *a } else { *b }) + } + (Precision::Exact(v), Precision::Absent) + | (Precision::Inexact(v), Precision::Absent) + | (Precision::Absent, Precision::Exact(v)) + | (Precision::Absent, Precision::Inexact(v)) => { + Precision::Inexact(*v) + } + (Precision::Absent, Precision::Absent) => Precision::Absent, + }; col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size); } @@ -1380,8 +1397,8 @@ mod tests { col_stats.max_value, Precision::Exact(ScalarValue::Int32(Some(20))) ); - // Distinct count should be Absent after merge - assert_eq!(col_stats.distinct_count, Precision::Absent); + // Distinct count should be Inexact(max) after merge as a conservative lower bound + assert_eq!(col_stats.distinct_count, Precision::Inexact(7)); } #[test] diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index dbbd289415277..ffeaac4bc27a3 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -711,9 +711,25 @@ impl ProjectionExprs { } } } else { - // TODO stats: estimate more statistics from expressions - // (expressions should compute their statistics themselves) - ColumnStatistics::new_unknown() + // TODO: expressions should compute their own statistics + // + // For now, try to preserve NDV if the expression references a + // single 
column (as a conservative upper bound). + // More accurate NDV propagation would require tracking injectivity + // of functions (e.g., `a + 1` preserves NDV exactly, `ABS(a)` may + // reduce it, `a % 10` bounds it to 10) + let columns = collect_columns(expr); + if columns.len() == 1 { + let col_idx = columns.iter().next().unwrap().index(); + ColumnStatistics { + distinct_count: stats.column_statistics[col_idx] + .distinct_count + .to_inexact(), + ..ColumnStatistics::new_unknown() + } + } else { + ColumnStatistics::new_unknown() + } }; column_statistics.push(col_stats); } @@ -2718,10 +2734,11 @@ pub(crate) mod tests { // Should have 2 column statistics assert_eq!(output_stats.column_statistics.len(), 2); - // First column (expression) should have unknown statistics + // First column (expression `col0 + 1`) preserves NDV from the single + // referenced column as a conservative upper bound (marked Inexact) assert_eq!( output_stats.column_statistics[0].distinct_count, - Precision::Absent + Precision::Inexact(5) ); assert_eq!( output_stats.column_statistics[0].max_value, From 519d11cd1d14b6506463c37e4b7cadff35a74c1f Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 23 Jan 2026 19:01:26 +0100 Subject: [PATCH 03/12] fix: cargo fmt --- datafusion/datasource-parquet/src/metadata.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index fca48e7798b60..dd6a3fb426ce0 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -1152,7 +1152,8 @@ mod tests { path.push("src/test_data/ndv_test.parquet"); let file = File::open(&path).expect("Failed to open test parquet file"); - let reader = SerializedFileReader::new(file).expect("Failed to create reader"); + let reader = + SerializedFileReader::new(file).expect("Failed to create reader"); let parquet_metadata = reader.metadata(); // Derive Arrow schema 
from parquet file metadata From 67aa530c42352c032249970cb5dd008b255d2c0f Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 23 Jan 2026 20:31:53 +0100 Subject: [PATCH 04/12] fix: update partition_statistics tests for NDV preservation Partition columns now preserve distinct_count as Inexact(1) when merging statistics, reflecting that each partition file has a single distinct partition value. --- .../tests/physical_optimizer/partition_statistics.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 12ce141b4759a..11e33c8fb6a63 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -154,13 +154,15 @@ mod test { // - null_count = 0 (partition values from paths are never null) // - min/max are the merged partition values across files in the group // - byte_size = num_rows * 4 (Date32 is 4 bytes per row) + // - distinct_count = Inexact(1) per partition file (single partition value per file), + // preserved via max() when merging stats across partitions let date32_byte_size = num_rows * 4; column_stats.push(ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))), min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(date32_byte_size), }); } @@ -581,7 +583,7 @@ mod test { max_value: Precision::Exact(ScalarValue::Date32(Some(20151))), min_value: Precision::Exact(ScalarValue::Date32(Some(20148))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Absent, }, // column 2: right.id (Int32, file column from t2) - right 
partition 0: ids [3,4] @@ -615,7 +617,7 @@ mod test { max_value: Precision::Exact(ScalarValue::Date32(Some(20151))), min_value: Precision::Exact(ScalarValue::Date32(Some(20148))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Absent, }, // column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2] @@ -1251,7 +1253,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ColumnStatistics::new_unknown(), // window column @@ -1279,7 +1281,7 @@ mod test { DATE_2025_03_03, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ColumnStatistics::new_unknown(), // window column From 8e4c002c6757db10c7c45d0bf596517f5ee6c56a Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 23 Jan 2026 21:23:27 +0100 Subject: [PATCH 05/12] refactor: simplify distinct_count merge logic Use get_value().max() chain instead of verbose match statement for merging NDV in Statistics::try_merge() --- datafusion/common/src/stats.rs | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 3539c380f8c76..47cca0491f4a4 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -662,22 +662,12 @@ impl Statistics { col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); // Use max as a conservative lower bound for distinct count // (can't accurately merge NDV since duplicates may exist across partitions) - col_stats.distinct_count = - match (&col_stats.distinct_count, &item_col_stats.distinct_count) { - (Precision::Exact(a), Precision::Exact(b)) - | (Precision::Inexact(a), Precision::Exact(b)) - | (Precision::Exact(a), 
Precision::Inexact(b)) - | (Precision::Inexact(a), Precision::Inexact(b)) => { - Precision::Inexact(if a >= b { *a } else { *b }) - } - (Precision::Exact(v), Precision::Absent) - | (Precision::Inexact(v), Precision::Absent) - | (Precision::Absent, Precision::Exact(v)) - | (Precision::Absent, Precision::Inexact(v)) => { - Precision::Inexact(*v) - } - (Precision::Absent, Precision::Absent) => Precision::Absent, - }; + col_stats.distinct_count = col_stats + .distinct_count + .get_value() + .max(item_col_stats.distinct_count.get_value()) + .map(|&v| Precision::Inexact(v)) + .unwrap_or(Precision::Absent); col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size); } From 7ce32d285493e325fa6c58a5f91df71261709428 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 23 Jan 2026 21:24:06 +0100 Subject: [PATCH 06/12] refactor: add build_column_statistics method to StatisticsAccumulators Encapsulate get_col_stats parameters by adding build_column_statistics() method to StatisticsAccumulators, removing the standalone function. 
--- datafusion/datasource-parquet/src/metadata.rs | 111 ++++++++---------- 1 file changed, 52 insertions(+), 59 deletions(-) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index dd6a3fb426ce0..7c884bfdbbced 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -333,16 +333,16 @@ impl<'a> DFParquetMetadata<'a> { }, ); - get_col_stats( - logical_file_schema, - &null_counts_array, - &mut max_accs, - &mut min_accs, - &mut is_max_value_exact, - &mut is_min_value_exact, - &column_byte_sizes, - &distinct_counts_array, - ) + let mut accumulators = StatisticsAccumulators { + min_accs: &mut min_accs, + max_accs: &mut max_accs, + null_counts_array: &mut null_counts_array, + is_min_value_exact: &mut is_min_value_exact, + is_max_value_exact: &mut is_max_value_exact, + column_byte_sizes: &mut column_byte_sizes, + distinct_counts_array: &mut distinct_counts_array, + }; + accumulators.build_column_statistics(logical_file_schema) } else { // Record column sizes logical_file_schema @@ -415,55 +415,6 @@ fn create_max_min_accs( (max_values, min_values) } -#[expect(clippy::too_many_arguments)] -fn get_col_stats( - schema: &Schema, - null_counts: &[Precision], - max_values: &mut [Option], - min_values: &mut [Option], - is_max_value_exact: &mut [Option], - is_min_value_exact: &mut [Option], - column_byte_sizes: &[Precision], - distinct_counts: &[Precision], -) -> Vec { - (0..schema.fields().len()) - .map(|i| { - let max_value = match ( - max_values.get_mut(i).unwrap(), - is_max_value_exact.get(i).unwrap(), - ) { - (Some(max_value), Some(true)) => { - max_value.evaluate().ok().map(Precision::Exact) - } - (Some(max_value), Some(false)) | (Some(max_value), None) => { - max_value.evaluate().ok().map(Precision::Inexact) - } - (None, _) => None, - }; - let min_value = match ( - min_values.get_mut(i).unwrap(), - is_min_value_exact.get(i).unwrap(), - ) { - (Some(min_value), 
Some(true)) => { - min_value.evaluate().ok().map(Precision::Exact) - } - (Some(min_value), Some(false)) | (Some(min_value), None) => { - min_value.evaluate().ok().map(Precision::Inexact) - } - (None, _) => None, - }; - ColumnStatistics { - null_count: null_counts[i], - max_value: max_value.unwrap_or(Precision::Absent), - min_value: min_value.unwrap_or(Precision::Absent), - sum_value: Precision::Absent, - distinct_count: distinct_counts[i], - byte_size: column_byte_sizes[i], - } - }) - .collect() -} - /// Holds the accumulator state for collecting statistics from row groups struct StatisticsAccumulators<'a> { min_accs: &'a mut [Option], @@ -475,6 +426,48 @@ struct StatisticsAccumulators<'a> { distinct_counts_array: &'a mut [Precision], } +impl StatisticsAccumulators<'_> { + /// Converts the accumulated statistics into a vector of `ColumnStatistics` + fn build_column_statistics(&mut self, schema: &Schema) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match ( + self.max_accs.get_mut(i).unwrap(), + self.is_max_value_exact.get(i).unwrap(), + ) { + (Some(max_value), Some(true)) => { + max_value.evaluate().ok().map(Precision::Exact) + } + (Some(max_value), Some(false)) | (Some(max_value), None) => { + max_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, + }; + let min_value = match ( + self.min_accs.get_mut(i).unwrap(), + self.is_min_value_exact.get(i).unwrap(), + ) { + (Some(min_value), Some(true)) => { + min_value.evaluate().ok().map(Precision::Exact) + } + (Some(min_value), Some(false)) | (Some(min_value), None) => { + min_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, + }; + ColumnStatistics { + null_count: self.null_counts_array[i], + max_value: max_value.unwrap_or(Precision::Absent), + min_value: min_value.unwrap_or(Precision::Absent), + sum_value: Precision::Absent, + distinct_count: self.distinct_counts_array[i], + byte_size: self.column_byte_sizes[i], + } + }) + .collect() + } +} + fn 
summarize_column_statistics( parquet_schema: &SchemaDescriptor, logical_file_schema: &Schema, From 1eae06db607797d475f3c98fb2f75e7f0b491192 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 23 Jan 2026 21:24:31 +0100 Subject: [PATCH 07/12] refactor: move ndv_tests imports to module level Move imports to module level in ndv_tests since they're in their own module anyway. --- datafusion/datasource-parquet/src/metadata.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 7c884bfdbbced..82546f2a8a949 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -839,10 +839,14 @@ mod tests { mod ndv_tests { use super::*; use arrow::datatypes::Field; + use parquet::arrow::parquet_to_arrow_schema; use parquet::basic::Type as PhysicalType; use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; + use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; + use std::fs::File; + use std::path::PathBuf; fn create_schema_descr(num_columns: usize) -> Arc { let fields: Vec> = (0..num_columns) @@ -1135,11 +1139,6 @@ mod tests { /// - name: 5 distinct values #[test] fn test_distinct_count_from_real_parquet_file() { - use parquet::arrow::parquet_to_arrow_schema; - use parquet::file::reader::{FileReader, SerializedFileReader}; - use std::fs::File; - use std::path::PathBuf; - // Path to test file created by DuckDB with distinct_count statistics let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push("src/test_data/ndv_test.parquet"); From 7e6e23069750d41370e99be6663e8d2cee160e92 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Tue, 10 Mar 2026 11:55:41 +0100 Subject: [PATCH 08/12] fix: update hash join partition_statistics test 
for NDV preservation through merges --- .../physical_optimizer/partition_statistics.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 11e33c8fb6a63..42c1e84534b6d 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -1418,6 +1418,8 @@ mod test { byte_size: Precision::Exact(16), }, // Left date column: all partitions (2025-03-01..2025-03-04) + // NDV is Inexact(1) because each Hive partition has exactly 1 distinct date value, + // and merging takes max as a conservative lower bound ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some( @@ -1427,7 +1429,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(16), }, // Right id column: partition 0 only (id 3..4) @@ -1440,6 +1442,7 @@ mod test { byte_size: Precision::Exact(8), }, // Right date column: partition 0 only (2025-03-01..2025-03-02) + // NDV is Inexact(1) from the single Hive partition's date value ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some( @@ -1449,7 +1452,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ], @@ -1501,7 +1504,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, // Right id column: partition 0 only (id 3..4) @@ -1523,7 +1526,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + 
distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ], @@ -1575,7 +1578,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(16), }, // Right id column: all partitions (id 1..4) @@ -1597,7 +1600,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(16), }, ], From 72d3be3259b163f43d5c3dd71f3da65915e419a2 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 12 Mar 2026 11:55:17 +0100 Subject: [PATCH 09/12] Extract overlap-based NDV merge into shared utility Replace the max(left, right) heuristic in try_merge with overlap-based NDV estimation, which accounts for range overlap between partitions instead of blindly picking the larger value. Move estimate_ndv_with_overlap from union.rs (#20846) to stats.rs as a pub function and reuse it in try_merge for row group/partition merging. The merge loop is reordered so distinct_count is computed before min/max updates (needs pre-merge ranges for the overlap formula). Fall back to max(left, right) when min/max are absent or distance is unsupported. 
--- datafusion/common/src/stats.rs | 310 +++++++++++++++++++++++++- datafusion/physical-plan/src/union.rs | 96 +------- 2 files changed, 301 insertions(+), 105 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 47cca0491f4a4..12b9b10a52207 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -657,17 +657,22 @@ impl Statistics { .zip(column_statistics.iter_mut()) { col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count); + + // NDV must be computed before min/max update (needs pre-merge ranges) + col_stats.distinct_count = match ( + col_stats.distinct_count.get_value(), + item_col_stats.distinct_count.get_value(), + ) { + (Some(&l), Some(&r)) => Precision::Inexact( + estimate_ndv_with_overlap(col_stats, item_col_stats, l, r) + .unwrap_or_else(|| usize::max(l, r)), + ), + _ => Precision::Absent, + }; + col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value); col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); - // Use max as a conservative lower bound for distinct count - // (can't accurately merge NDV since duplicates may exist across partitions) - col_stats.distinct_count = col_stats - .distinct_count - .get_value() - .max(item_col_stats.distinct_count.get_value()) - .map(|&v| Precision::Inexact(v)) - .unwrap_or(Precision::Absent); col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size); } @@ -679,6 +684,96 @@ impl Statistics { } } +/// Estimates the combined number of distinct values (NDV) when merging two +/// column statistics, using range overlap to avoid double-counting shared values. +/// +/// Assumes values are distributed uniformly within each input's +/// `[min, max]` range (the standard assumption when only summary +/// statistics are available). 
Under uniformity the fraction of an input's +/// distinct values that land in a sub-range equals the fraction of +/// the range that sub-range covers. +/// +/// The combined value space is split into three disjoint regions: +/// +/// ```text +/// |-- only A --|-- overlap --|-- only B --| +/// ``` +/// +/// * **Only in A/B** - values outside the other input's range +/// contribute `(1 - overlap_a) * NDV_a` and `(1 - overlap_b) * NDV_b`. +/// * **Overlap** - both inputs may produce values here. We take +/// `max(overlap_a * NDV_a, overlap_b * NDV_b)` rather than the +/// sum because values in the same sub-range are likely shared +/// (the smaller set is assumed to be a subset of the larger). +/// +/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`, +/// from full overlap to no overlap. +/// +/// ```text +/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection] +/// + (1 - overlap_a) * NDV_a [only in A] +/// + (1 - overlap_b) * NDV_b [only in B] +/// ``` +/// +/// Returns `None` when min/max are absent or distance is unsupported +/// (e.g. strings), in which case the caller should fall back to a simpler +/// estimate. +pub fn estimate_ndv_with_overlap( + left: &ColumnStatistics, + right: &ColumnStatistics, + ndv_left: usize, + ndv_right: usize, +) -> Option { + let left_min = left.min_value.get_value()?; + let left_max = left.max_value.get_value()?; + let right_min = right.min_value.get_value()?; + let right_max = right.max_value.get_value()?; + + let range_left = left_max.distance(left_min)?; + let range_right = right_max.distance(right_min)?; + + // Constant columns (range == 0) can't use the proportional overlap + // formula below, so check interval overlap directly instead. 
+ if range_left == 0 || range_right == 0 { + let overlaps = left_min <= right_max && right_min <= left_max; + return Some(if overlaps { + usize::max(ndv_left, ndv_right) + } else { + ndv_left + ndv_right + }); + } + + let overlap_min = if left_min >= right_min { + left_min + } else { + right_min + }; + let overlap_max = if left_max <= right_max { + left_max + } else { + right_max + }; + + // Disjoint ranges: no overlap, NDVs are additive + if overlap_min > overlap_max { + return Some(ndv_left + ndv_right); + } + + let overlap_range = overlap_max.distance(overlap_min)? as f64; + + let overlap_left = overlap_range / range_left as f64; + let overlap_right = overlap_range / range_right as f64; + + let intersection = f64::max( + overlap_left * ndv_left as f64, + overlap_right * ndv_right as f64, + ); + let only_left = (1.0 - overlap_left) * ndv_left as f64; + let only_right = (1.0 - overlap_right) * ndv_right as f64; + + Some((intersection + only_left + only_right).round() as usize) +} + /// Creates an estimate of the number of rows in the output using the given /// optional value and exactness flag. 
fn check_num_rows(value: Option, is_exact: bool) -> Precision { @@ -1387,8 +1482,203 @@ mod tests { col_stats.max_value, Precision::Exact(ScalarValue::Int32(Some(20))) ); - // Distinct count should be Inexact(max) after merge as a conservative lower bound - assert_eq!(col_stats.distinct_count, Precision::Inexact(7)); + // Overlap-based NDV: ranges [1,10] and [5,20], overlap [5,10] + // range_left=9, range_right=15, overlap=5 + // overlap_left=5*(5/9)=2.78, overlap_right=7*(5/15)=2.33 + // result = max(2.78, 2.33) + (5-2.78) + (7-2.33) = 9.67 -> 10 + assert_eq!(col_stats.distinct_count, Precision::Inexact(10)); + } + + #[test] + fn test_try_merge_ndv_disjoint_ranges() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) + .with_distinct_count(Precision::Exact(5)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(20)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(30)))) + .with_distinct_count(Precision::Exact(8)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // No overlap -> sum of NDVs + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(13) + ); + } + + #[test] + fn test_try_merge_ndv_identical_ranges() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(50)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + 
ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(30)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // Full overlap -> max(50, 30) = 50 + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(50) + ); + } + + #[test] + fn test_try_merge_ndv_partial_overlap() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(80)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) + .with_distinct_count(Precision::Exact(60)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // overlap=[50,100], range_left=100, range_right=100, overlap_range=50 + // overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30 + // result = max(40,30) + (80-40) + (60-30) = 40 + 40 + 30 = 110 + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(110) + ); + } + + #[test] + fn test_try_merge_ndv_missing_min_max() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_distinct_count(Precision::Exact(5)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_distinct_count(Precision::Exact(8)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // No min/max -> fallback to max(5, 8) + assert_eq!( + 
merged.column_statistics[0].distinct_count, + Precision::Inexact(8) + ); + } + + #[test] + fn test_try_merge_ndv_non_numeric_types() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Utf8(Some( + "aaa".to_string(), + )))) + .with_max_value(Precision::Exact(ScalarValue::Utf8(Some( + "zzz".to_string(), + )))) + .with_distinct_count(Precision::Exact(5)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Utf8(Some( + "bbb".to_string(), + )))) + .with_max_value(Precision::Exact(ScalarValue::Utf8(Some( + "yyy".to_string(), + )))) + .with_distinct_count(Precision::Exact(8)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // distance() unsupported for strings -> fallback to max + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(8) + ); + } + + #[test] + fn test_try_merge_ndv_constant_columns() { + // Same constant: [5,5]+[5,5] -> max + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_distinct_count(Precision::Exact(1)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_distinct_count(Precision::Exact(1)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(1) + ); + + // Different constants: [5,5]+[10,10] -> sum + let stats3 = 
Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_distinct_count(Precision::Exact(1)), + ); + let stats4 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) + .with_distinct_count(Precision::Exact(1)), + ); + + let merged = stats3.try_merge(&stats4).unwrap(); + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(2) + ); } #[test] diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index dafcd6ee4014d..db550e7f147d1 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -49,7 +49,7 @@ use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::stats::Precision; +use datafusion_common::stats::{Precision, estimate_ndv_with_overlap}; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, @@ -886,100 +886,6 @@ fn union_distinct_count( Precision::Inexact(ndv_left + ndv_right) } -/// Estimates the distinct count for a union using range overlap, -/// following the approach used by Trino: -/// -/// Assumes values are distributed uniformly within each input's -/// `[min, max]` range (the standard assumption when only summary -/// statistics are available, classic for scalar-based statistics -/// propagation). 
Under uniformity the fraction of an input's -/// distinct values that land in a sub-range equals the fraction of -/// the range that sub-range covers. -/// -/// The combined value space is split into three disjoint regions: -/// -/// ```text -/// |-- only A --|-- overlap --|-- only B --| -/// ``` -/// -/// * **Only in A/B** – values outside the other input's range -/// contribute `(1 − overlap_a) · NDV_a` and `(1 − overlap_b) · NDV_b`. -/// * **Overlap** – both inputs may produce values here. We take -/// `max(overlap_a · NDV_a, overlap_b · NDV_b)` rather than the -/// sum because values in the same sub-range are likely shared -/// (the smaller set is assumed to be a subset of the larger). -/// This is conservative: it avoids inflating the NDV estimate, -/// which is safer for downstream join-order decisions. -/// -/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`, -/// from full overlap to no overlap. Boundary cases confirm this: -/// disjoint ranges → `NDV_a + NDV_b`, identical ranges → -/// `max(NDV_a, NDV_b)`. -/// -/// ```text -/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection] -/// + (1 - overlap_a) * NDV_a [only in A] -/// + (1 - overlap_b) * NDV_b [only in B] -/// ``` -fn estimate_ndv_with_overlap( - left: &ColumnStatistics, - right: &ColumnStatistics, - ndv_left: usize, - ndv_right: usize, -) -> Option { - let min_left = left.min_value.get_value()?; - let max_left = left.max_value.get_value()?; - let min_right = right.min_value.get_value()?; - let max_right = right.max_value.get_value()?; - - let range_left = max_left.distance(min_left)?; - let range_right = max_right.distance(min_right)?; - - // Constant columns (range == 0) can't use the proportional overlap - // formula below, so check interval overlap directly instead. 
- if range_left == 0 || range_right == 0 { - let overlaps = min_left <= max_right && min_right <= max_left; - return Some(if overlaps { - usize::max(ndv_left, ndv_right) - } else { - ndv_left + ndv_right - }); - } - - let overlap_min = if min_left >= min_right { - min_left - } else { - min_right - }; - let overlap_max = if max_left <= max_right { - max_left - } else { - max_right - }; - - // Short-circuit: when there's no overlap the formula naturally - // degrades to ndv_left + ndv_right (overlap_range = 0 gives - // overlap_left = overlap_right = 0), but returning early avoids - // the floating-point math and a fallible `distance()` call. - if overlap_min > overlap_max { - return Some(ndv_left + ndv_right); - } - - let overlap_range = overlap_max.distance(overlap_min)? as f64; - - let overlap_left = overlap_range / range_left as f64; - let overlap_right = overlap_range / range_right as f64; - - let intersection = f64::max( - overlap_left * ndv_left as f64, - overlap_right * ndv_right as f64, - ); - let only_left = (1.0 - overlap_left) * ndv_left as f64; - let only_right = (1.0 - overlap_right) * ndv_right as f64; - - Some((intersection + only_left + only_right).round() as usize) -} - fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { let Statistics { num_rows: right_num_rows, From faf521161e22d3b0af265711f3c624216eb13dcd Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 12 Mar 2026 12:13:44 +0100 Subject: [PATCH 10/12] Add threshold for partial NDV extraction from Parquet row groups When only a fraction of row groups report NDV statistics, the merged estimate can be misleading. Require at least 75% of row groups to have NDV before reporting an Inexact value, otherwise return Absent. 
--- datafusion/datasource-parquet/src/metadata.rs | 55 +++++++++++++++---- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 82546f2a8a949..0b55329e1ac77 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -52,6 +52,11 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; +/// Minimum fraction of row groups that must report NDV statistics for the +/// merged result to be `Inexact` rather than `Absent`, as the estimate +/// would be too unreliable otherwise. +const PARTIAL_NDV_THRESHOLD: f64 = 0.75; + /// Handles fetching Parquet file schema, metadata and statistics /// from object store. /// @@ -544,6 +549,7 @@ fn summarize_column_statistics( // Extract distinct counts from row group column statistics accumulators.distinct_counts_array[logical_schema_index] = if let Some(parquet_idx) = parquet_index { + let num_row_groups = row_groups_metadata.len(); let distinct_counts: Vec = row_groups_metadata .iter() .filter_map(|rg| { @@ -554,9 +560,12 @@ fn summarize_column_statistics( }) .collect(); - if distinct_counts.is_empty() { + let coverage = + distinct_counts.len() as f64 / num_row_groups.max(1) as f64; + + if coverage < PARTIAL_NDV_THRESHOLD { Precision::Absent - } else if distinct_counts.len() == 1 { + } else if distinct_counts.len() == 1 && num_row_groups == 1 { // Single row group with distinct count - use exact value Precision::Exact(distinct_counts[0] as usize) } else { @@ -1029,20 +1038,17 @@ mod tests { } #[test] - fn test_distinct_count_partial_ndv_in_row_groups() { - // Some row groups have NDV, some don't - should use only those that have it + fn test_distinct_count_partial_ndv_below_threshold() { + // 1 of 2 row groups has NDV (50% < 75% threshold) -> Absent let schema_descr = create_schema_descr(1); let arrow_schema = create_arrow_schema(1); - // Row group 1: has 
distinct_count = 15 let stats1 = ParquetStatistics::int32(Some(1), Some(50), Some(15), Some(0), false); - - // Row group 2: no distinct_count let stats2 = ParquetStatistics::int32( Some(51), Some(100), - None, // no distinct_count + None, Some(0), false, ); @@ -1060,10 +1066,39 @@ mod tests { ) .unwrap(); - // Only one row group has NDV, so it's Exact(15) assert_eq!( result.column_statistics[0].distinct_count, - Precision::Exact(15) + Precision::Absent + ); + } + + #[test] + fn test_distinct_count_partial_ndv_above_threshold() { + // 3 of 4 row groups have NDV (75% >= 75% threshold) -> Inexact + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + let stats_with = + |ndv| ParquetStatistics::int32(Some(1), Some(100), Some(ndv), Some(0), false); + let stats_without = + ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false); + + let rg1 = create_row_group_with_stats(&schema_descr, vec![Some(stats_with(10))], 250); + let rg2 = create_row_group_with_stats(&schema_descr, vec![Some(stats_with(20))], 250); + let rg3 = create_row_group_with_stats(&schema_descr, vec![Some(stats_with(15))], 250); + let rg4 = create_row_group_with_stats(&schema_descr, vec![Some(stats_without)], 250); + let metadata = + create_parquet_metadata(schema_descr, vec![rg1, rg2, rg3, rg4]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(20) ); } From 089b44a53152d4fd0f4618515512bd88f24e1eee Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 12 Mar 2026 12:47:01 +0100 Subject: [PATCH 11/12] fix: cargo fmt --- datafusion/common/src/stats.rs | 6 +-- datafusion/datasource-parquet/src/metadata.rs | 41 ++++++++++++------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 12b9b10a52207..75eafc132d962 100644 --- 
a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -1577,14 +1577,12 @@ mod tests { let stats1 = Statistics::default() .with_num_rows(Precision::Exact(10)) .add_column_statistics( - ColumnStatistics::new_unknown() - .with_distinct_count(Precision::Exact(5)), + ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(5)), ); let stats2 = Statistics::default() .with_num_rows(Precision::Exact(10)) .add_column_statistics( - ColumnStatistics::new_unknown() - .with_distinct_count(Precision::Exact(8)), + ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(8)), ); let merged = stats1.try_merge(&stats2).unwrap(); diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 0b55329e1ac77..e5781ad68ddf4 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -560,8 +560,7 @@ fn summarize_column_statistics( }) .collect(); - let coverage = - distinct_counts.len() as f64 / num_row_groups.max(1) as f64; + let coverage = distinct_counts.len() as f64 / num_row_groups.max(1) as f64; if coverage < PARTIAL_NDV_THRESHOLD { Precision::Absent @@ -1045,13 +1044,8 @@ mod tests { let stats1 = ParquetStatistics::int32(Some(1), Some(50), Some(15), Some(0), false); - let stats2 = ParquetStatistics::int32( - Some(51), - Some(100), - None, - Some(0), - false, - ); + let stats2 = + ParquetStatistics::int32(Some(51), Some(100), None, Some(0), false); let row_group1 = create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500); @@ -1078,15 +1072,32 @@ mod tests { let schema_descr = create_schema_descr(1); let arrow_schema = create_arrow_schema(1); - let stats_with = - |ndv| ParquetStatistics::int32(Some(1), Some(100), Some(ndv), Some(0), false); + let stats_with = |ndv| { + ParquetStatistics::int32(Some(1), Some(100), Some(ndv), Some(0), false) + }; let stats_without = ParquetStatistics::int32(Some(1), Some(100), None, Some(0), 
false); - let rg1 = create_row_group_with_stats(&schema_descr, vec![Some(stats_with(10))], 250); - let rg2 = create_row_group_with_stats(&schema_descr, vec![Some(stats_with(20))], 250); - let rg3 = create_row_group_with_stats(&schema_descr, vec![Some(stats_with(15))], 250); - let rg4 = create_row_group_with_stats(&schema_descr, vec![Some(stats_without)], 250); + let rg1 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with(10))], + 250, + ); + let rg2 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with(20))], + 250, + ); + let rg3 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with(15))], + 250, + ); + let rg4 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_without)], + 250, + ); let metadata = create_parquet_metadata(schema_descr, vec![rg1, rg2, rg3, rg4]); From 0e23ef17daeaa7e81f98fe4dea4d0211420a40dd Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 12 Mar 2026 12:47:32 +0100 Subject: [PATCH 12/12] Revert NDV propagation through projections Remove the single-column expression heuristic for preserving NDV in projections, as it is too broad to be correct. Expression-level statistics propagation will be addressed in a follow-up design. --- datafusion/physical-expr/src/projection.rs | 27 ++++------------------ 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index ffeaac4bc27a3..dbbd289415277 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -711,25 +711,9 @@ impl ProjectionExprs { } } } else { - // TODO: expressions should compute their own statistics - // - // For now, try to preserve NDV if the expression references a - // single column (as a conservative upper bound). 
- // More accurate NDV propagation would require tracking injectivity - // of functions (e.g., `a + 1` preserves NDV exactly, `ABS(a)` may - // reduce it, `a % 10` bounds it to 10) - let columns = collect_columns(expr); - if columns.len() == 1 { - let col_idx = columns.iter().next().unwrap().index(); - ColumnStatistics { - distinct_count: stats.column_statistics[col_idx] - .distinct_count - .to_inexact(), - ..ColumnStatistics::new_unknown() - } - } else { - ColumnStatistics::new_unknown() - } + // TODO stats: estimate more statistics from expressions + // (expressions should compute their statistics themselves) + ColumnStatistics::new_unknown() }; column_statistics.push(col_stats); } @@ -2734,11 +2718,10 @@ pub(crate) mod tests { // Should have 2 column statistics assert_eq!(output_stats.column_statistics.len(), 2); - // First column (expression `col0 + 1`) preserves NDV from the single - // referenced column as a conservative upper bound (marked Inexact) + // First column (expression) should have unknown statistics assert_eq!( output_stats.column_statistics[0].distinct_count, - Precision::Inexact(5) + Precision::Absent ); assert_eq!( output_stats.column_statistics[0].max_value,