Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 298 additions & 3 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,22 @@ impl Statistics {
.zip(column_statistics.iter_mut())
{
col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);

// NDV must be computed before min/max update (needs pre-merge ranges)
col_stats.distinct_count = match (
col_stats.distinct_count.get_value(),
item_col_stats.distinct_count.get_value(),
) {
(Some(&l), Some(&r)) => Precision::Inexact(
estimate_ndv_with_overlap(col_stats, item_col_stats, l, r)
.unwrap_or_else(|| usize::max(l, r)),
),
_ => Precision::Absent,
};

col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
col_stats.distinct_count = Precision::Absent;
col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size);
}

Expand All @@ -672,6 +684,96 @@ impl Statistics {
}
}

/// Estimates the combined number of distinct values (NDV) when merging two
/// column statistics, using range overlap to avoid double-counting shared values.
///
/// Assumes values are distributed uniformly within each input's
/// `[min, max]` range (the standard assumption when only summary
/// statistics are available). Under uniformity the fraction of an input's
/// distinct values that land in a sub-range equals the fraction of
/// the range that sub-range covers.
///
/// The combined value space is split into three disjoint regions:
///
/// ```text
/// |-- only A --|-- overlap --|-- only B --|
/// ```
///
/// * **Only in A/B** - values outside the other input's range
/// contribute `(1 - overlap_a) * NDV_a` and `(1 - overlap_b) * NDV_b`.
/// * **Overlap** - both inputs may produce values here. We take
/// `max(overlap_a * NDV_a, overlap_b * NDV_b)` rather than the
/// sum because values in the same sub-range are likely shared
/// (the smaller set is assumed to be a subset of the larger).
///
/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`,
/// from full overlap to no overlap.
///
/// ```text
/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection]
/// + (1 - overlap_a) * NDV_a [only in A]
/// + (1 - overlap_b) * NDV_b [only in B]
/// ```
///
/// Returns `None` when min/max are absent or distance is unsupported
/// (e.g. strings), in which case the caller should fall back to a simpler
/// estimate.
pub fn estimate_ndv_with_overlap(
left: &ColumnStatistics,
right: &ColumnStatistics,
ndv_left: usize,
ndv_right: usize,
) -> Option<usize> {
let left_min = left.min_value.get_value()?;
let left_max = left.max_value.get_value()?;
let right_min = right.min_value.get_value()?;
let right_max = right.max_value.get_value()?;

let range_left = left_max.distance(left_min)?;
let range_right = right_max.distance(right_min)?;

// Constant columns (range == 0) can't use the proportional overlap
// formula below, so check interval overlap directly instead.
if range_left == 0 || range_right == 0 {
let overlaps = left_min <= right_max && right_min <= left_max;
return Some(if overlaps {
usize::max(ndv_left, ndv_right)
} else {
ndv_left + ndv_right
});
}

let overlap_min = if left_min >= right_min {
left_min
} else {
right_min
};
let overlap_max = if left_max <= right_max {
left_max
} else {
right_max
};

// Disjoint ranges: no overlap, NDVs are additive
if overlap_min > overlap_max {
return Some(ndv_left + ndv_right);
}

let overlap_range = overlap_max.distance(overlap_min)? as f64;

let overlap_left = overlap_range / range_left as f64;
let overlap_right = overlap_range / range_right as f64;

let intersection = f64::max(
overlap_left * ndv_left as f64,
overlap_right * ndv_right as f64,
);
let only_left = (1.0 - overlap_left) * ndv_left as f64;
let only_right = (1.0 - overlap_right) * ndv_right as f64;

Some((intersection + only_left + only_right).round() as usize)
}

/// Creates an estimate of the number of rows in the output using the given
/// optional value and exactness flag.
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
Expand Down Expand Up @@ -1380,8 +1482,201 @@ mod tests {
col_stats.max_value,
Precision::Exact(ScalarValue::Int32(Some(20)))
);
// Distinct count should be Absent after merge
assert_eq!(col_stats.distinct_count, Precision::Absent);
// Overlap-based NDV: ranges [1,10] and [5,20], overlap [5,10]
// range_left=9, range_right=15, overlap=5
// overlap_left=5*(5/9)=2.78, overlap_right=7*(5/15)=2.33
// result = max(2.78, 2.33) + (5-2.78) + (7-2.33) = 9.67 -> 10
assert_eq!(col_stats.distinct_count, Precision::Inexact(10));
}

#[test]
fn test_try_merge_ndv_disjoint_ranges() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(20))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(30))))
.with_distinct_count(Precision::Exact(8)),
);

let merged = stats1.try_merge(&stats2).unwrap();
// No overlap -> sum of NDVs
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(13)
);
}

#[test]
fn test_try_merge_ndv_identical_ranges() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
.with_distinct_count(Precision::Exact(50)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
.with_distinct_count(Precision::Exact(30)),
);

let merged = stats1.try_merge(&stats2).unwrap();
// Full overlap -> max(50, 30) = 50
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(50)
);
}

#[test]
fn test_try_merge_ndv_partial_overlap() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
.with_distinct_count(Precision::Exact(80)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(50))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(150))))
.with_distinct_count(Precision::Exact(60)),
);

let merged = stats1.try_merge(&stats2).unwrap();
// overlap=[50,100], range_left=100, range_right=100, overlap_range=50
// overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30
// result = max(40,30) + (80-40) + (60-30) = 40 + 40 + 30 = 110
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(110)
);
}

#[test]
fn test_try_merge_ndv_missing_min_max() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(8)),
);

let merged = stats1.try_merge(&stats2).unwrap();
// No min/max -> fallback to max(5, 8)
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(8)
);
}

#[test]
fn test_try_merge_ndv_non_numeric_types() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
"aaa".to_string(),
))))
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
"zzz".to_string(),
))))
.with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
"bbb".to_string(),
))))
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
"yyy".to_string(),
))))
.with_distinct_count(Precision::Exact(8)),
);

let merged = stats1.try_merge(&stats2).unwrap();
// distance() unsupported for strings -> fallback to max
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(8)
);
}

#[test]
fn test_try_merge_ndv_constant_columns() {
// Same constant: [5,5]+[5,5] -> max
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_distinct_count(Precision::Exact(1)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_distinct_count(Precision::Exact(1)),
);

let merged = stats1.try_merge(&stats2).unwrap();
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(1)
);

// Different constants: [5,5]+[10,10] -> sum
let stats3 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_distinct_count(Precision::Exact(1)),
);
let stats4 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_distinct_count(Precision::Exact(1)),
);

let merged = stats3.try_merge(&stats4).unwrap();
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(2)
);
}

#[test]
Expand Down
Loading
Loading