diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 759ebfe67a812..75eafc132d962 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -657,10 +657,22 @@ impl Statistics { .zip(column_statistics.iter_mut()) { col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count); + + // NDV must be computed before min/max update (needs pre-merge ranges) + col_stats.distinct_count = match ( + col_stats.distinct_count.get_value(), + item_col_stats.distinct_count.get_value(), + ) { + (Some(&l), Some(&r)) => Precision::Inexact( + estimate_ndv_with_overlap(col_stats, item_col_stats, l, r) + .unwrap_or_else(|| usize::max(l, r)), + ), + _ => Precision::Absent, + }; + col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value); col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); - col_stats.distinct_count = Precision::Absent; col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size); } @@ -672,6 +684,96 @@ impl Statistics { } } +/// Estimates the combined number of distinct values (NDV) when merging two +/// column statistics, using range overlap to avoid double-counting shared values. +/// +/// Assumes values are distributed uniformly within each input's +/// `[min, max]` range (the standard assumption when only summary +/// statistics are available). Under uniformity the fraction of an input's +/// distinct values that land in a sub-range equals the fraction of +/// the range that sub-range covers. +/// +/// The combined value space is split into three disjoint regions: +/// +/// ```text +/// |-- only A --|-- overlap --|-- only B --| +/// ``` +/// +/// * **Only in A/B** - values outside the other input's range +/// contribute `(1 - overlap_a) * NDV_a` and `(1 - overlap_b) * NDV_b`. +/// * **Overlap** - both inputs may produce values here. 
We take +/// `max(overlap_a * NDV_a, overlap_b * NDV_b)` rather than the +/// sum because values in the same sub-range are likely shared +/// (the smaller set is assumed to be a subset of the larger). +/// +/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`, +/// from full overlap to no overlap. +/// +/// ```text +/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection] +/// + (1 - overlap_a) * NDV_a [only in A] +/// + (1 - overlap_b) * NDV_b [only in B] +/// ``` +/// +/// Returns `None` when min/max are absent or distance is unsupported +/// (e.g. strings), in which case the caller should fall back to a simpler +/// estimate. +pub fn estimate_ndv_with_overlap( + left: &ColumnStatistics, + right: &ColumnStatistics, + ndv_left: usize, + ndv_right: usize, +) -> Option { + let left_min = left.min_value.get_value()?; + let left_max = left.max_value.get_value()?; + let right_min = right.min_value.get_value()?; + let right_max = right.max_value.get_value()?; + + let range_left = left_max.distance(left_min)?; + let range_right = right_max.distance(right_min)?; + + // Constant columns (range == 0) can't use the proportional overlap + // formula below, so check interval overlap directly instead. + if range_left == 0 || range_right == 0 { + let overlaps = left_min <= right_max && right_min <= left_max; + return Some(if overlaps { + usize::max(ndv_left, ndv_right) + } else { + ndv_left + ndv_right + }); + } + + let overlap_min = if left_min >= right_min { + left_min + } else { + right_min + }; + let overlap_max = if left_max <= right_max { + left_max + } else { + right_max + }; + + // Disjoint ranges: no overlap, NDVs are additive + if overlap_min > overlap_max { + return Some(ndv_left + ndv_right); + } + + let overlap_range = overlap_max.distance(overlap_min)? 
as f64; + + let overlap_left = overlap_range / range_left as f64; + let overlap_right = overlap_range / range_right as f64; + + let intersection = f64::max( + overlap_left * ndv_left as f64, + overlap_right * ndv_right as f64, + ); + let only_left = (1.0 - overlap_left) * ndv_left as f64; + let only_right = (1.0 - overlap_right) * ndv_right as f64; + + Some((intersection + only_left + only_right).round() as usize) +} + /// Creates an estimate of the number of rows in the output using the given /// optional value and exactness flag. fn check_num_rows(value: Option, is_exact: bool) -> Precision { @@ -1380,8 +1482,201 @@ mod tests { col_stats.max_value, Precision::Exact(ScalarValue::Int32(Some(20))) ); - // Distinct count should be Absent after merge - assert_eq!(col_stats.distinct_count, Precision::Absent); + // Overlap-based NDV: ranges [1,10] and [5,20], overlap [5,10] + // range_left=9, range_right=15, overlap=5 + // overlap_left=5*(5/9)=2.78, overlap_right=7*(5/15)=2.33 + // result = max(2.78, 2.33) + (5-2.78) + (7-2.33) = 9.67 -> 10 + assert_eq!(col_stats.distinct_count, Precision::Inexact(10)); + } + + #[test] + fn test_try_merge_ndv_disjoint_ranges() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) + .with_distinct_count(Precision::Exact(5)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(20)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(30)))) + .with_distinct_count(Precision::Exact(8)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // No overlap -> sum of NDVs + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(13) + ); + } + + 
#[test] + fn test_try_merge_ndv_identical_ranges() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(50)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(30)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // Full overlap -> max(50, 30) = 50 + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(50) + ); + } + + #[test] + fn test_try_merge_ndv_partial_overlap() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(80)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) + .with_distinct_count(Precision::Exact(60)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // overlap=[50,100], range_left=100, range_right=100, overlap_range=50 + // overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30 + // result = max(40,30) + (80-40) + (60-30) = 40 + 40 + 30 = 110 + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(110) + ); + } + + #[test] + fn test_try_merge_ndv_missing_min_max() { + let stats1 = 
Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(5)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(8)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // No min/max -> fallback to max(5, 8) + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(8) + ); + } + + #[test] + fn test_try_merge_ndv_non_numeric_types() { + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Utf8(Some( + "aaa".to_string(), + )))) + .with_max_value(Precision::Exact(ScalarValue::Utf8(Some( + "zzz".to_string(), + )))) + .with_distinct_count(Precision::Exact(5)), + ); + let stats2 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Utf8(Some( + "bbb".to_string(), + )))) + .with_max_value(Precision::Exact(ScalarValue::Utf8(Some( + "yyy".to_string(), + )))) + .with_distinct_count(Precision::Exact(8)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + // distance() unsupported for strings -> fallback to max + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(8) + ); + } + + #[test] + fn test_try_merge_ndv_constant_columns() { + // Same constant: [5,5]+[5,5] -> max + let stats1 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_distinct_count(Precision::Exact(1)), + ); + let stats2 = Statistics::default() + 
.with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_distinct_count(Precision::Exact(1)), + ); + + let merged = stats1.try_merge(&stats2).unwrap(); + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(1) + ); + + // Different constants: [5,5]+[10,10] -> sum + let stats3 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) + .with_distinct_count(Precision::Exact(1)), + ); + let stats4 = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) + .with_distinct_count(Precision::Exact(1)), + ); + + let merged = stats3.try_merge(&stats4).unwrap(); + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(2) + ); } #[test] diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 12ce141b4759a..42c1e84534b6d 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -154,13 +154,15 @@ mod test { // - null_count = 0 (partition values from paths are never null) // - min/max are the merged partition values across files in the group // - byte_size = num_rows * 4 (Date32 is 4 bytes per row) + // - distinct_count = Inexact(1) per partition file (single partition value per file), + // preserved via max() when merging stats across partitions let date32_byte_size = num_rows * 4; 
column_stats.push(ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))), min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(date32_byte_size), }); } @@ -581,7 +583,7 @@ mod test { max_value: Precision::Exact(ScalarValue::Date32(Some(20151))), min_value: Precision::Exact(ScalarValue::Date32(Some(20148))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Absent, }, // column 2: right.id (Int32, file column from t2) - right partition 0: ids [3,4] @@ -615,7 +617,7 @@ mod test { max_value: Precision::Exact(ScalarValue::Date32(Some(20151))), min_value: Precision::Exact(ScalarValue::Date32(Some(20148))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Absent, }, // column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2] @@ -1251,7 +1253,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ColumnStatistics::new_unknown(), // window column @@ -1279,7 +1281,7 @@ mod test { DATE_2025_03_03, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ColumnStatistics::new_unknown(), // window column @@ -1416,6 +1418,8 @@ mod test { byte_size: Precision::Exact(16), }, // Left date column: all partitions (2025-03-01..2025-03-04) + // NDV is Inexact(1) because each Hive partition has exactly 1 distinct date value, + // and merging takes max as a conservative lower bound ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some( 
@@ -1425,7 +1429,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(16), }, // Right id column: partition 0 only (id 3..4) @@ -1438,6 +1442,7 @@ mod test { byte_size: Precision::Exact(8), }, // Right date column: partition 0 only (2025-03-01..2025-03-02) + // NDV is Inexact(1) from the single Hive partition's date value ColumnStatistics { null_count: Precision::Exact(0), max_value: Precision::Exact(ScalarValue::Date32(Some( @@ -1447,7 +1452,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ], @@ -1499,7 +1504,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, // Right id column: partition 0 only (id 3..4) @@ -1521,7 +1526,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(8), }, ], @@ -1573,7 +1578,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(16), }, // Right id column: all partitions (id 1..4) @@ -1595,7 +1600,7 @@ mod test { DATE_2025_03_01, ))), sum_value: Precision::Absent, - distinct_count: Precision::Absent, + distinct_count: Precision::Inexact(1), byte_size: Precision::Exact(16), }, ], diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 5a4c0bcdd514d..e5781ad68ddf4 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -52,6 +52,11 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; +/// Minimum 
fraction of row groups that must report NDV statistics for the +/// merged result to be `Inexact` rather than `Absent`, as the estimate +/// would be too unreliable otherwise. +const PARTIAL_NDV_THRESHOLD: f64 = 0.75; + /// Handles fetching Parquet file schema, metadata and statistics /// from object store. /// @@ -297,6 +302,8 @@ impl<'a> DFParquetMetadata<'a> { vec![Some(true); logical_file_schema.fields().len()]; let mut is_min_value_exact = vec![Some(true); logical_file_schema.fields().len()]; + let mut distinct_counts_array = + vec![Precision::Absent; logical_file_schema.fields().len()]; logical_file_schema.fields().iter().enumerate().for_each( |(idx, field)| match StatisticsConverter::try_new( field.name(), @@ -311,8 +318,9 @@ impl<'a> DFParquetMetadata<'a> { is_min_value_exact: &mut is_min_value_exact, is_max_value_exact: &mut is_max_value_exact, column_byte_sizes: &mut column_byte_sizes, + distinct_counts_array: &mut distinct_counts_array, }; - summarize_min_max_null_counts( + summarize_column_statistics( file_metadata.schema_descr(), logical_file_schema, &physical_file_schema, @@ -330,15 +338,16 @@ impl<'a> DFParquetMetadata<'a> { }, ); - get_col_stats( - logical_file_schema, - &null_counts_array, - &mut max_accs, - &mut min_accs, - &mut is_max_value_exact, - &mut is_min_value_exact, - &column_byte_sizes, - ) + let mut accumulators = StatisticsAccumulators { + min_accs: &mut min_accs, + max_accs: &mut max_accs, + null_counts_array: &mut null_counts_array, + is_min_value_exact: &mut is_min_value_exact, + is_max_value_exact: &mut is_max_value_exact, + column_byte_sizes: &mut column_byte_sizes, + distinct_counts_array: &mut distinct_counts_array, + }; + accumulators.build_column_statistics(logical_file_schema) } else { // Record column sizes logical_file_schema @@ -411,53 +420,6 @@ fn create_max_min_accs( (max_values, min_values) } -fn get_col_stats( - schema: &Schema, - null_counts: &[Precision], - max_values: &mut [Option], - min_values: &mut [Option], - 
is_max_value_exact: &mut [Option], - is_min_value_exact: &mut [Option], - column_byte_sizes: &[Precision], -) -> Vec { - (0..schema.fields().len()) - .map(|i| { - let max_value = match ( - max_values.get_mut(i).unwrap(), - is_max_value_exact.get(i).unwrap(), - ) { - (Some(max_value), Some(true)) => { - max_value.evaluate().ok().map(Precision::Exact) - } - (Some(max_value), Some(false)) | (Some(max_value), None) => { - max_value.evaluate().ok().map(Precision::Inexact) - } - (None, _) => None, - }; - let min_value = match ( - min_values.get_mut(i).unwrap(), - is_min_value_exact.get(i).unwrap(), - ) { - (Some(min_value), Some(true)) => { - min_value.evaluate().ok().map(Precision::Exact) - } - (Some(min_value), Some(false)) | (Some(min_value), None) => { - min_value.evaluate().ok().map(Precision::Inexact) - } - (None, _) => None, - }; - ColumnStatistics { - null_count: null_counts[i], - max_value: max_value.unwrap_or(Precision::Absent), - min_value: min_value.unwrap_or(Precision::Absent), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: column_byte_sizes[i], - } - }) - .collect() -} - /// Holds the accumulator state for collecting statistics from row groups struct StatisticsAccumulators<'a> { min_accs: &'a mut [Option], @@ -466,9 +428,52 @@ struct StatisticsAccumulators<'a> { is_min_value_exact: &'a mut [Option], is_max_value_exact: &'a mut [Option], column_byte_sizes: &'a mut [Precision], + distinct_counts_array: &'a mut [Precision], } -fn summarize_min_max_null_counts( +impl StatisticsAccumulators<'_> { + /// Converts the accumulated statistics into a vector of `ColumnStatistics` + fn build_column_statistics(&mut self, schema: &Schema) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match ( + self.max_accs.get_mut(i).unwrap(), + self.is_max_value_exact.get(i).unwrap(), + ) { + (Some(max_value), Some(true)) => { + max_value.evaluate().ok().map(Precision::Exact) + } + (Some(max_value), Some(false)) | 
(Some(max_value), None) => { + max_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, + }; + let min_value = match ( + self.min_accs.get_mut(i).unwrap(), + self.is_min_value_exact.get(i).unwrap(), + ) { + (Some(min_value), Some(true)) => { + min_value.evaluate().ok().map(Precision::Exact) + } + (Some(min_value), Some(false)) | (Some(min_value), None) => { + min_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, + }; + ColumnStatistics { + null_count: self.null_counts_array[i], + max_value: max_value.unwrap_or(Precision::Absent), + min_value: min_value.unwrap_or(Precision::Absent), + sum_value: Precision::Absent, + distinct_count: self.distinct_counts_array[i], + byte_size: self.column_byte_sizes[i], + } + }) + .collect() + } +} + +fn summarize_column_statistics( parquet_schema: &SchemaDescriptor, logical_file_schema: &Schema, physical_file_schema: &Schema, @@ -541,6 +546,39 @@ fn summarize_min_max_null_counts( ) .map(|(idx, _)| idx); + // Extract distinct counts from row group column statistics + accumulators.distinct_counts_array[logical_schema_index] = + if let Some(parquet_idx) = parquet_index { + let num_row_groups = row_groups_metadata.len(); + let distinct_counts: Vec = row_groups_metadata + .iter() + .filter_map(|rg| { + rg.columns() + .get(parquet_idx) + .and_then(|col| col.statistics()) + .and_then(|stats| stats.distinct_count_opt()) + }) + .collect(); + + let coverage = distinct_counts.len() as f64 / num_row_groups.max(1) as f64; + + if coverage < PARTIAL_NDV_THRESHOLD { + Precision::Absent + } else if distinct_counts.len() == 1 && num_row_groups == 1 { + // Single row group with distinct count - use exact value + Precision::Exact(distinct_counts[0] as usize) + } else { + // Multiple row groups - use max as a lower bound estimate + // (can't accurately merge NDV since duplicates may exist across row groups) + match distinct_counts.iter().max() { + Some(&max_ndv) => Precision::Inexact(max_ndv as usize), + None => 
Precision::Absent, + } + } + } else { + Precision::Absent + }; + let arrow_field = logical_file_schema.field(logical_schema_index); accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size( arrow_field.data_type(), @@ -805,4 +843,392 @@ mod tests { assert_eq!(result, Some(false)); } } + + mod ndv_tests { + use super::*; + use arrow::datatypes::Field; + use parquet::arrow::parquet_to_arrow_schema; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use parquet::file::statistics::Statistics as ParquetStatistics; + use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; + use std::fs::File; + use std::path::PathBuf; + + fn create_schema_descr(num_columns: usize) -> Arc { + let fields: Vec> = (0..num_columns) + .map(|i| { + Arc::new( + SchemaType::primitive_type_builder( + &format!("col_{i}"), + PhysicalType::INT32, + ) + .build() + .unwrap(), + ) + }) + .collect(); + + let schema = SchemaType::group_type_builder("schema") + .with_fields(fields) + .build() + .unwrap(); + + Arc::new(SchemaDescriptor::new(Arc::new(schema))) + } + + fn create_arrow_schema(num_columns: usize) -> SchemaRef { + let fields: Vec = (0..num_columns) + .map(|i| Field::new(format!("col_{i}"), DataType::Int32, true)) + .collect(); + Arc::new(Schema::new(fields)) + } + + fn create_row_group_with_stats( + schema_descr: &Arc, + column_stats: Vec>, + num_rows: i64, + ) -> RowGroupMetaData { + let columns: Vec = column_stats + .into_iter() + .enumerate() + .map(|(i, stats)| { + let mut builder = + ColumnChunkMetaData::builder(schema_descr.column(i)); + if let Some(s) = stats { + builder = builder.set_statistics(s); + } + builder.set_num_values(num_rows).build().unwrap() + }) + .collect(); + + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(num_rows) + .set_total_byte_size(1000) + .set_column_metadata(columns) + .build() + 
.unwrap() + } + + fn create_parquet_metadata( + schema_descr: Arc, + row_groups: Vec, + ) -> ParquetMetaData { + use parquet::file::metadata::FileMetaData; + + let num_rows: i64 = row_groups.iter().map(|rg| rg.num_rows()).sum(); + let file_meta = FileMetaData::new( + 1, // version + num_rows, // num_rows + None, // created_by + None, // key_value_metadata + schema_descr, // schema_descr + None, // column_orders + ); + + ParquetMetaData::new(file_meta, row_groups) + } + + #[test] + fn test_distinct_count_single_row_group_with_ndv() { + // Single row group with distinct count should return Exact + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Create statistics with distinct_count = 42 + let stats = ParquetStatistics::int32( + Some(1), // min + Some(100), // max + Some(42), // distinct_count + Some(0), // null_count + false, // is_deprecated + ); + + let row_group = + create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Exact(42) + ); + } + + #[test] + fn test_distinct_count_multiple_row_groups_with_ndv() { + // Multiple row groups with distinct counts should return Inexact (max) + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Row group 1: distinct_count = 10 + let stats1 = ParquetStatistics::int32( + Some(1), + Some(50), + Some(10), // distinct_count + Some(0), + false, + ); + + // Row group 2: distinct_count = 20 + let stats2 = ParquetStatistics::int32( + Some(51), + Some(100), + Some(20), // distinct_count + Some(0), + false, + ); + + let row_group1 = + create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500); + let row_group2 = + create_row_group_with_stats(&schema_descr, 
vec![Some(stats2)], 500); + let metadata = + create_parquet_metadata(schema_descr, vec![row_group1, row_group2]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + // Max of distinct counts (lower bound since we can't accurately merge NDV) + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(20) + ); + } + + #[test] + fn test_distinct_count_no_ndv_available() { + // No distinct count in statistics should return Absent + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Create statistics without distinct_count (None) + let stats = ParquetStatistics::int32( + Some(1), + Some(100), + None, // no distinct_count + Some(0), + false, + ); + + let row_group = + create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Absent + ); + } + + #[test] + fn test_distinct_count_partial_ndv_below_threshold() { + // 1 of 2 row groups has NDV (50% < 75% threshold) -> Absent + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + let stats1 = + ParquetStatistics::int32(Some(1), Some(50), Some(15), Some(0), false); + let stats2 = + ParquetStatistics::int32(Some(51), Some(100), None, Some(0), false); + + let row_group1 = + create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500); + let row_group2 = + create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500); + let metadata = + create_parquet_metadata(schema_descr, vec![row_group1, row_group2]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + 
result.column_statistics[0].distinct_count, + Precision::Absent + ); + } + + #[test] + fn test_distinct_count_partial_ndv_above_threshold() { + // 3 of 4 row groups have NDV (75% >= 75% threshold) -> Inexact + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + let stats_with = |ndv| { + ParquetStatistics::int32(Some(1), Some(100), Some(ndv), Some(0), false) + }; + let stats_without = + ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false); + + let rg1 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with(10))], + 250, + ); + let rg2 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with(20))], + 250, + ); + let rg3 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with(15))], + 250, + ); + let rg4 = create_row_group_with_stats( + &schema_descr, + vec![Some(stats_without)], + 250, + ); + let metadata = + create_parquet_metadata(schema_descr, vec![rg1, rg2, rg3, rg4]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(20) + ); + } + + #[test] + fn test_distinct_count_multiple_columns() { + // Test with multiple columns, each with different NDV + let schema_descr = create_schema_descr(3); + let arrow_schema = create_arrow_schema(3); + + // col_0: distinct_count = 5 + let stats0 = + ParquetStatistics::int32(Some(1), Some(10), Some(5), Some(0), false); + // col_1: no distinct_count + let stats1 = + ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false); + // col_2: distinct_count = 100 + let stats2 = + ParquetStatistics::int32(Some(1), Some(1000), Some(100), Some(0), false); + + let row_group = create_row_group_with_stats( + &schema_descr, + vec![Some(stats0), Some(stats1), Some(stats2)], + 1000, + ); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = 
DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Exact(5) + ); + assert_eq!( + result.column_statistics[1].distinct_count, + Precision::Absent + ); + assert_eq!( + result.column_statistics[2].distinct_count, + Precision::Exact(100) + ); + } + + #[test] + fn test_distinct_count_no_statistics_at_all() { + // No statistics in row group should return Absent for all stats + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(1); + + // Create row group without any statistics + let row_group = create_row_group_with_stats(&schema_descr, vec![None], 1000); + let metadata = create_parquet_metadata(schema_descr, vec![row_group]); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &arrow_schema, + ) + .unwrap(); + + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Absent + ); + } + + /// Integration test that reads a real Parquet file with distinct_count statistics. 
+ /// The test file was created with DuckDB and has known NDV values: + /// - id: NULL (high cardinality, not tracked) + /// - category: 10 distinct values + /// - name: 5 distinct values + #[test] + fn test_distinct_count_from_real_parquet_file() { + // Path to test file created by DuckDB with distinct_count statistics + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("src/test_data/ndv_test.parquet"); + + let file = File::open(&path).expect("Failed to open test parquet file"); + let reader = + SerializedFileReader::new(file).expect("Failed to create reader"); + let parquet_metadata = reader.metadata(); + + // Derive Arrow schema from parquet file metadata + let arrow_schema = Arc::new( + parquet_to_arrow_schema( + parquet_metadata.file_metadata().schema_descr(), + None, + ) + .expect("Failed to convert schema"), + ); + + let result = DFParquetMetadata::statistics_from_parquet_metadata( + parquet_metadata, + &arrow_schema, + ) + .expect("Failed to extract statistics"); + + // id: no distinct_count (high cardinality) + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Absent, + "id column should have Absent distinct_count" + ); + + // category: 10 distinct values + assert_eq!( + result.column_statistics[1].distinct_count, + Precision::Exact(10), + "category column should have Exact(10) distinct_count" + ); + + // name: 5 distinct values + assert_eq!( + result.column_statistics[2].distinct_count, + Precision::Exact(5), + "name column should have Exact(5) distinct_count" + ); + } + } } diff --git a/datafusion/datasource-parquet/src/test_data/ndv_test.parquet b/datafusion/datasource-parquet/src/test_data/ndv_test.parquet new file mode 100644 index 0000000000000..3ecbe320f506e Binary files /dev/null and b/datafusion/datasource-parquet/src/test_data/ndv_test.parquet differ diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index dafcd6ee4014d..db550e7f147d1 100644 --- 
a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -49,7 +49,7 @@ use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::stats::Precision; +use datafusion_common::stats::{Precision, estimate_ndv_with_overlap}; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, @@ -886,100 +886,6 @@ fn union_distinct_count( Precision::Inexact(ndv_left + ndv_right) } -/// Estimates the distinct count for a union using range overlap, -/// following the approach used by Trino: -/// -/// Assumes values are distributed uniformly within each input's -/// `[min, max]` range (the standard assumption when only summary -/// statistics are available, classic for scalar-based statistics -/// propagation). Under uniformity the fraction of an input's -/// distinct values that land in a sub-range equals the fraction of -/// the range that sub-range covers. -/// -/// The combined value space is split into three disjoint regions: -/// -/// ```text -/// |-- only A --|-- overlap --|-- only B --| -/// ``` -/// -/// * **Only in A/B** – values outside the other input's range -/// contribute `(1 − overlap_a) · NDV_a` and `(1 − overlap_b) · NDV_b`. -/// * **Overlap** – both inputs may produce values here. We take -/// `max(overlap_a · NDV_a, overlap_b · NDV_b)` rather than the -/// sum because values in the same sub-range are likely shared -/// (the smaller set is assumed to be a subset of the larger). -/// This is conservative: it avoids inflating the NDV estimate, -/// which is safer for downstream join-order decisions. -/// -/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`, -/// from full overlap to no overlap. 
Boundary cases confirm this: -/// disjoint ranges → `NDV_a + NDV_b`, identical ranges → -/// `max(NDV_a, NDV_b)`. -/// -/// ```text -/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection] -/// + (1 - overlap_a) * NDV_a [only in A] -/// + (1 - overlap_b) * NDV_b [only in B] -/// ``` -fn estimate_ndv_with_overlap( - left: &ColumnStatistics, - right: &ColumnStatistics, - ndv_left: usize, - ndv_right: usize, -) -> Option { - let min_left = left.min_value.get_value()?; - let max_left = left.max_value.get_value()?; - let min_right = right.min_value.get_value()?; - let max_right = right.max_value.get_value()?; - - let range_left = max_left.distance(min_left)?; - let range_right = max_right.distance(min_right)?; - - // Constant columns (range == 0) can't use the proportional overlap - // formula below, so check interval overlap directly instead. - if range_left == 0 || range_right == 0 { - let overlaps = min_left <= max_right && min_right <= max_left; - return Some(if overlaps { - usize::max(ndv_left, ndv_right) - } else { - ndv_left + ndv_right - }); - } - - let overlap_min = if min_left >= min_right { - min_left - } else { - min_right - }; - let overlap_max = if max_left <= max_right { - max_left - } else { - max_right - }; - - // Short-circuit: when there's no overlap the formula naturally - // degrades to ndv_left + ndv_right (overlap_range = 0 gives - // overlap_left = overlap_right = 0), but returning early avoids - // the floating-point math and a fallible `distance()` call. - if overlap_min > overlap_max { - return Some(ndv_left + ndv_right); - } - - let overlap_range = overlap_max.distance(overlap_min)? 
as f64; - - let overlap_left = overlap_range / range_left as f64; - let overlap_right = overlap_range / range_right as f64; - - let intersection = f64::max( - overlap_left * ndv_left as f64, - overlap_right * ndv_right as f64, - ); - let only_left = (1.0 - overlap_left) * ndv_left as f64; - let only_right = (1.0 - overlap_right) * ndv_right as f64; - - Some((intersection + only_left + only_right).round() as usize) -} - fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { let Statistics { num_rows: right_num_rows,