feat: Extract NDV (distinct_count) statistics from Parquet metadata#19957

Open
asolimando wants to merge 12 commits into apache:main from asolimando:asolimando/ndv-parquet-support

Conversation

@asolimando
Member

Which issue does this PR close?

Related: #18628, #8227

(I am not sure if a new issue specifically scoped to this PR is needed; happy to create one if so.)

Rationale for this change

This work originates from a discussion in datafusion-distributed about improving the TaskEstimator API:
datafusion-contrib/datafusion-distributed#296 (comment)

We agreed that improved statistics support in DataFusion would benefit both projects. For datafusion-distributed, better cardinality estimation helps decide how to split computation across network boundaries.

This also benefits DataFusion directly, since CBO is already in place: for example, join cardinality estimation (joins/utils.rs:586-646) uses distinct_count via max_distinct_count to compute join selectivity.

Currently this field is always Absent when reading from Parquet, so this PR fills that gap.

What changes are included in this PR?

Commit 1 - Reading NDV from Parquet files:

  • Extract distinct_count from Parquet row group column statistics
  • Single row group with NDV -> Precision::Exact(ndv)
  • Multiple row groups with NDV -> Precision::Inexact(max) as conservative lower bound
  • No NDV available -> Precision::Absent
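
The three rules above can be sketched as follows (a hedged illustration: `Precision` here is a simplified stand-in for DataFusion's enum, and the real extraction code reads values from Parquet row-group metadata rather than a slice):

```rust
// Simplified stand-in for DataFusion's `Precision`.
#[derive(Debug, PartialEq)]
enum Precision {
    Exact(u64),
    Inexact(u64),
    Absent,
}

/// One entry per row group; `None` means the writer recorded no NDV.
fn merge_row_group_ndv(ndvs: &[Option<u64>]) -> Precision {
    let known: Vec<u64> = ndvs.iter().filter_map(|n| *n).collect();
    if known.is_empty() {
        Precision::Absent
    } else if ndvs.len() == 1 {
        // Single row group: the recorded NDV is exact for the file.
        Precision::Exact(known[0])
    } else {
        // Values may repeat across row groups, so the max per-group NDV
        // is only a conservative lower bound for the file-level NDV.
        Precision::Inexact(*known.iter().max().unwrap())
    }
}
```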

Commit 2 - Statistics propagation (can be split to a separate PR, if preferred):

  • Statistics::try_merge(): use max as conservative lower bound instead of discarding NDV
  • Projection: preserve NDV for single-column expressions as upper bound

I'm including the second commit to showcase how I intend to use the statistics, but these changes can be split into a follow-up PR to keep the review scope limited.

Are these changes tested?

Yes, 7 unit tests are added for NDV extraction:

  • Single/multiple row groups with NDV
  • Partial NDV availability across row groups
  • Multiple columns with different NDV values
  • Integration test reading a real Parquet file with distinct_count statistics (following the pattern in
    row_filter.rs:685-696, using parquet_to_arrow_schema to derive the schema from the file)

Are there any user-facing changes?

No breaking changes. Statistics consumers will now see populated distinct_count values when available in Parquet metadata.

Disclaimer: I used AI (Claude Code) to assist translating my ideas into code as I am still ramping up with the codebase and especially with Rust (guidance on both aspects is highly appreciated). I have a good understanding of the core concepts (statistics, CBO etc.) and have carefully double-checked that the PR matches my intentions and understanding.

cc: @gabotechs @jayshrivastava @NGA-TRAN @gene-bordegaray

@github-actions github-actions bot added physical-expr Changes to the physical-expr crates common Related to common crate datasource Changes to the datasource crate labels Jan 23, 2026
Contributor

@gene-bordegaray gene-bordegaray left a comment

Have a few minor comments but this looks good 💯

@github-actions github-actions bot added the core Core DataFusion crate label Jan 23, 2026
Contributor

@gene-bordegaray gene-bordegaray left a comment

great refactor, very clean

Contributor

@gabotechs gabotechs left a comment

Looking pretty good! I just have one comment to explore what it would take to be a bit more forward-looking and provide some more foundational tools for stats propagation across expressions. See #19957 (comment)

// TODO stats: estimate more statistics from expressions
// (expressions should compute their statistics themselves)
ColumnStatistics::new_unknown()
// TODO: expressions should compute their own statistics
Contributor

I think this TODO comment is right on point: if we wanted to compute statistics for all expressions here, it would become unmanageable pretty soon.

Even if the estimation for expressions referencing single columns here is an improvement, it would be awesome if we could be one step ahead and think about what proper stats propagation across expressions would look like. That way, instead of special-handling one particular case, this PR could set the foundation on top of which more people can start contributing smarter stats calculation.

I see that some prior work has been done in order to improve stats propagation in expressions. Some context here:

By looking at the API for handling statistics in expressions shipped in #14699... I would not know how to use it in order to properly calculate NDV values. I get the impression that 1) we do not want to introduce yet another API for propagating stats across expressions and 2) the current "new" API shipped in #14699 is not suitable for propagating NDV values.

Contributor

Do you have any thoughts about how the current Distribution-based API could be used for propagating NDV stats across expressions?

Member Author

@asolimando asolimando Jan 26, 2026

Good question! I have an idea of how this could evolve, based on my experience with Apache Calcite.

The idea is to make statistics propagation pluggable, with each relational operator having a default but configurable logic for how statistics propagate through it.

The default implementation would follow classic Selinger-style estimation (selectivity factors, independence assumptions), as seen in this PR. A nice intro to this can be found here. That's what most OSS databases implement by default.

Following DataFusion's philosophy as a customizable framework, users should be able to override and complement this logic when needed.

Proposed architecture:

  • StatisticsProvider: chain element that computes statistics for specific operators (returns Computed or Delegate)
  • StatisticsRegistry: chains providers, would live in SessionState
  • CardinalityEstimator: unified interface for metadata queries (row count, selectivity, NDV, ...) - similar to Calcite's RelMetadataProvider/RelMetadataQuery
  • ExtendedStatistics: Statistics with type-safe custom extensions for histograms, sketches, etc. (I am looking at type-erased maps for that but I am not sure that's the best way to implement it)
  • ExpressionAnalyzerRegistry + ExpressionAnalyzer: the same chain concept applied to expression analyzers, equivalent to what is detailed above for operators, so that both built-in expressions and UDFs can be covered
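
The chain-of-responsibility idea above can be sketched like this (all names come from the proposal; the signatures, the string-keyed operator lookup, and the `row_count` query are illustrative guesses, not an existing DataFusion API):

```rust
// Result of asking a provider for a statistic: either it computes a
// value, or it delegates to the next provider in the chain.
enum Estimate {
    Computed(u64),
    Delegate,
}

trait StatisticsProvider {
    fn row_count(&self, operator: &str) -> Estimate;
}

// Would live in SessionState; providers are consulted in registration order.
struct StatisticsRegistry {
    providers: Vec<Box<dyn StatisticsProvider>>,
}

impl StatisticsRegistry {
    fn row_count(&self, operator: &str) -> Option<u64> {
        for p in &self.providers {
            if let Estimate::Computed(n) = p.row_count(operator) {
                return Some(n);
            }
        }
        None
    }
}

// A custom provider that only knows about one custom operator.
struct CustomProvider;
impl StatisticsProvider for CustomProvider {
    fn row_count(&self, operator: &str) -> Estimate {
        if operator == "MyCustomExec" {
            Estimate::Computed(42)
        } else {
            Estimate::Delegate
        }
    }
}

// The default, registered last (in reality it would delegate to
// partition_statistics(); here it returns a fixed placeholder).
struct DefaultProvider;
impl StatisticsProvider for DefaultProvider {
    fn row_count(&self, _operator: &str) -> Estimate {
        Estimate::Computed(1_000)
    }
}
```

Registering `CustomProvider` before `DefaultProvider` overrides estimation for `MyCustomExec` while all other operators fall through to the default, which is exactly the override mechanism described above.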

This follows the same chain-of-responsibility pattern that https://datafusion.apache.org/blog/2026/01/12/extending-sql/ applied to custom syntax/relations. Built-in operators get default handling, custom ExecutionPlan nodes can plug in their own logic, and unknown cases delegate down the chain. To override the default estimation (e.g., with histogram-based approaches), you register your provider before the default one.

StatisticsV2 and the Distribution-based statistics are very advanced and interesting, but I see them more as an extension via ExtendedStatistics than as a replacement for the default implementation. Your intuition about NDVs is correct: the API deals with distributions, but as-is it can't answer questions about the number of distinct values.

If this sounds interesting and aligns with community interest, I can provide a more detailed design doc and an epic to break down the work.

Contributor

For StatisticsProvider, StatisticsRegistry and CardinalityEstimator: do you think there is some overlap with the existing ExecutionPlan::partition_statistics() and PhysicalExpr::evaluate_statistics()? Would it be appropriate to extend the functionality shipped in both methods, or are you considering StatisticsProvider, StatisticsRegistry and CardinalityEstimator to be a separate mechanism for providing statistics?

Member Author

The new constructs are an extensibility layer that builds on top of the existing methods rather than replacing them.

For instance, the DefaultStatisticsProvider will simply delegate to partition_statistics().

The key difference is that by adopting StatisticsProvider/StatisticsRegistry pattern, the user can:

  • Register custom providers for custom operators without trait changes
  • Override default estimation for specific (even built-in) operators
  • Carry extended statistics (histograms, sketches) through ExtendedStatistics

Similarly, PhysicalExpr::evaluate_statistics() will be the default for ExpressionAnalyzer, but the latter will allow overriding the behavior for built-in expressions, and it will easily provide an extension point for user-defined functions.

I totally understand the concern about the blast radius, but if there is consensus on doubling down on statistics and CBO (by making them more effective in driving planning decisions, when stats are available), it will become necessary to provide flexibility to override and extend for people implementing on top of DataFusion, to steer planning decisions as the user sees fit.

Member Author

Sorry for the late reply @adriangb, I have been off for a little while.

I have been following #19609 with lots of interest, and in my understanding it's dealing with statistics for what concerns filtering and predicate pruning, so the two approaches are orthogonal and complement each other.

This PR focuses on NDV, laying the foundation of improved cardinality estimation (which we will soon be able to precisely measure thanks to #20292).

For DataFusion, the benefit is for improving some existing configuration options, when statistics are available, a few examples for NDV:

  • prefer_hash_join: with NDV the optimizer could compare HashJoin vs SortMergeJoin cost, especially when downstream ordering is needed, enabling per-join cost-based decisions
  • default_filter_selectivity (20% hardcoded value): with NDV, equality filters become 1/NDV(col), IN-lists become list_size/NDV(col), etc.
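
Those two selectivity examples can be sketched as follows (a hedged illustration under the classic uniform-distribution assumption; these are not DataFusion's configuration machinery or APIs):

```rust
/// Selectivity of `col = literal` under a uniformity assumption: 1/NDV.
fn eq_selectivity(ndv: u64) -> f64 {
    1.0 / ndv as f64
}

/// Selectivity of `col IN (v1, ..., vk)`: k/NDV, capped at 1.0.
fn in_list_selectivity(list_size: usize, ndv: u64) -> f64 {
    (list_size as f64 / ndv as f64).min(1.0)
}
```

With NDV available, a column of 100 distinct values gives an equality selectivity of 0.01 instead of the hardcoded 20%, which can change join ordering and filter placement decisions substantially.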

This is also relevant for distributed DataFusion, where cardinality estimation plays an even larger role in physical planning (many planning choices can't be corrected via adaptive query processing in a distributed setup).

The PR got stale and I will need to rebase on current main branch, so interested reviewers can take a look.

Contributor

I do think it would be better to split into two pull requests.

Member Author

I do think it would be better to split into two pull requests.

Agreed. I have created a separate branch (asolimando/ndv-expression-analyzer); I will open a second PR for the ExpressionAnalyzer where I will "port" and discuss your comments in detail, so they don't slip through the cracks.

Contributor

Following up on this thread for my review:

I’d prefer to drop the new NDV propagation in projections from this PR. The heuristic “expression references a single column” is too broad to be correct for NDV. The thread already suggests handling this in a separate expression statistics design. For this PR I think derived expressions should stay unknown, then address the rest in a follow-up

Member Author

@asolimando asolimando Mar 12, 2026

Resolving as it seems the comments have been considered, and the discussion will continue in the follow-up PR for ExpressionAnalyzer, feel free to unresolve if you think otherwise.

EDIT: sorry, the page didn't refresh and I missed your message. I agree; I have reverted it in 0e23ef1. Expression-level statistics will be addressed in a follow-up; this was there just to showcase how one could use it, but it's not worth keeping here. Thanks again for the reminder!

@xudong963 xudong963 self-requested a review January 26, 2026 13:48
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jan 29, 2026
@asolimando asolimando force-pushed the asolimando/ndv-parquet-support branch from d725cc0 to 2a67096 Compare March 2, 2026 17:41
Contributor

@jonathanc-n jonathanc-n left a comment

This is a cool change!

};

// Get input NDV
let Some(input_ndv) = DefaultExpressionAnalyzer
Contributor

For this API using DefaultExpressionAnalyzer here might not be what we want

If I had impl ExpressionAnalyzer for InsertNewNDV, the problem is that I would have to handle all the cases with different column functions (e.g., UPPER, LOWER, etc.), leading to a lot of duplicate functionality. Instead, what if we pass in the list of analyzers here, so that instead of DefaultExpressionAnalyzer.get_distinct_count we can call analyzer_list[0].get_distinct_count or something.

So if we add a custom analyzer to change the input NDV with our own statistics (ex. histogram), we do not need to reimplement the other functionality
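
The delegation being suggested can be sketched like this (the trait name is from the PR; the method shape, the string-keyed dispatch, and the `HistogramAnalyzer` example are illustrative only):

```rust
// First analyzer in the chain that returns Some wins; None means
// "delegate to the next analyzer".
trait ExpressionAnalyzer {
    fn get_distinct_count(&self, column: &str, input_ndv: u64) -> Option<u64>;
}

// A custom analyzer that knows the true NDV of one column from a
// (hypothetical) histogram, and delegates for everything else.
struct HistogramAnalyzer;
impl ExpressionAnalyzer for HistogramAnalyzer {
    fn get_distinct_count(&self, column: &str, _input_ndv: u64) -> Option<u64> {
        if column == "status" { Some(3) } else { None }
    }
}

// The default at the end of the chain: pass the input NDV through.
struct DefaultAnalyzer;
impl ExpressionAnalyzer for DefaultAnalyzer {
    fn get_distinct_count(&self, _column: &str, input_ndv: u64) -> Option<u64> {
        Some(input_ndv)
    }
}

// Walk the chain instead of hard-coding DefaultExpressionAnalyzer.
fn analyze(chain: &[&dyn ExpressionAnalyzer], column: &str, ndv: u64) -> Option<u64> {
    chain.iter().find_map(|a| a.get_distinct_count(column, ndv))
}
```

This way a custom analyzer overrides only the columns it knows about, and everything else still flows through the default behavior without reimplementation.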

Member Author

Good catch, will address in the follow-up PR. The function analyzers should delegate through the chain rather than hard-coding DefaultExpressionAnalyzer.

Member Author

Note to self for ExpressionAnalyzer: it addresses #992 which got closed recently as non-actionable, a new issue will be needed but the old one should be referenced

}

/// Create a registry with all built-in analyzers (string, math, datetime, default).
pub fn with_builtin_analyzers() -> Self {
Contributor

nit: when called in projection.rs this is doing a vec + 4 Arc allocations. I think we can pass this in through SessionContext to avoid initializing it for every expression.

Member Author

Agreed, will move it to SessionContext/SessionState in the follow-up PR, thanks for your suggestion

}
}

impl ExpressionAnalyzer for DefaultExpressionAnalyzer {
Contributor

Maybe we could move all of this to its own folder later on to declutter.

Member Author

Makes sense, will do in the follow-up PR for the ExpressionAnalyzer


if distinct_counts.is_empty() {
Precision::Absent
} else if distinct_counts.len() == 1 {
Contributor

If we are merging multiple row groups and only one of them has NDV, then it will trigger this path and assign that distinct_count to the statistics despite having many unknown NDVs.

There are two scenarios here:

  • Large fraction of NDVs missing from row groups -> return Absent
  • Only a small fraction of NDVs missing from row groups -> return Inexact

What the fraction should be is debatable, maybe like 1/4 or something

Member Author

Good point. As a first step I agree on a threshold + configuration for this.

That said, since DataFusion is a framework, how to propagate statistics is really dependent on the system, data, etc. I'm working towards a fully customizable solution for statistics propagation, not just at the expression level (as shown with ExpressionAnalyzer in this PR), but also at the operator level.

The WIP branch is here:
https://github.com/asolimando/datafusion/tree/asolimando/statistics-planner-prototype - it introduces a StatisticsProvider/StatisticsRegistry chain-of-responsibility pattern for operator-level stats, similar in spirit to ExpressionAnalyzer but for ExecutionPlan nodes. I will open PRs for it, but any feedback on the general approach is welcome. For instance, the shortcoming you highlighted for ExpressionAnalyzer (hard-coded delegation instead of going through the chain) does not affect the operator-level design, but there might be other limitations I'm not seeing.

A few things are still pending as I'd like to make use of #20184 (unmerged) and #20570 (that just got merged).

Contributor

yes I will try to get #20184 out today or tomorrow now that #20570 is out

Contributor

I think the partial-coverage case still needs tightening.

If we merge multiple row groups and only some of them expose NDV, returning NDV as Exact for the whole file doesn't seem accurate to me. Could we make it Inexact?

The test test_distinct_count_partial_ndv_in_row_groups seems wrong asserting Exact(15) for a file where another row group has unknown NDV.

Member Author

Thanks for the reminder @gene-bordegaray, I have followed the advice of @jonathanc-n in faf5211.

Added a PARTIAL_NDV_THRESHOLD constant (0.75): if fewer than 75% of row groups report NDV, return Absent. It is kept as a constant rather than a config option since the statistics extraction path has no config access today; exposing it would require updating a lot of signatures, which we can consider later.
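
A sketch of that rule (the constant name is from the comment above; the surrounding extraction path is simplified to a slice of per-row-group NDVs, where `None` means the row group recorded none):

```rust
// If fewer than this fraction of row groups report NDV, give up.
const PARTIAL_NDV_THRESHOLD: f64 = 0.75;

/// Returns Some(lower-bound NDV) when enough row groups report one,
/// None (i.e., Precision::Absent) otherwise.
fn ndv_if_enough_coverage(ndvs: &[Option<u64>]) -> Option<u64> {
    let known: Vec<u64> = ndvs.iter().filter_map(|n| *n).collect();
    let coverage = known.len() as f64 / ndvs.len() as f64;
    if coverage < PARTIAL_NDV_THRESHOLD {
        None // too few row groups report NDV -> treat as Absent
    } else {
        known.iter().max().copied() // conservative lower bound
    }
}
```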

@asolimando asolimando force-pushed the asolimando/ndv-parquet-support branch from 2a67096 to 436df5a Compare March 10, 2026 09:16
@github-actions github-actions bot removed the physical-plan Changes to the physical-plan crate label Mar 10, 2026
Comment on lines +640 to +641
// Use max as a conservative lower bound for distinct count
// (can't accurately merge NDV since duplicates may exist across partitions)
Member

I'm concerned that the conservative lower-bound estimation could cause huge inaccuracy.

For example, we have 100 parts, and each part has 100 rows, and they are linearly increasing. The reality is that the ndv should be 100 * 100, but now we evaluate it as 100. The result could cause inaccuracies to propagate throughout subsequent cost estimation algorithms.

Member

HyperLogLog is a classic way to process this, but for a large dataset, it takes a long time to process.

IIRC, the merge method may be called during query execution, so using HyperLogLog here isn't ideal.

So I'm looking forward to hearing some other thoughts!

Member Author

I'm concerned that the conservative lower-bound estimation could cause huge inaccuracy.

For example, we have 100 parts, and each part has 100 rows, and they are linearly increasing. The reality is that the ndv should be 100 * 100, but now we evaluate it as 100. The result could cause inaccuracies to propagate throughout subsequent cost estimation algorithms.

You raise a good point: the true NDV across partitions lies in [max(partition NDVs), sum(partition NDVs)]: max when values fully overlap (e.g., a status column), sum when completely disjoint (e.g., order_id). No single strategy is optimal.

This is how different major OSS systems deal with the problem:

  • Trino: max across partitions (source). Known to be problematic with disjoint domains: trinodb/trino#50 shows 62K estimated vs 150M actual. This is what I considered in the current proposal.

  • Spark: ANALYZE TABLE runs a full-table HyperLogLogPlusPlus aggregation (source), so there are no per-partition NDVs to merge. But this strategy doesn't seem to be fitting here.

  • Hive: stores HLL bitvector sketches per partition in the metastore, merges them at planning time (HLL merge, aggregator merge), and interpolates for missing partitions by merging adjacent bitvectors and extrapolating. Note that this is exactly what trinodb/trino#50 suggests, but in this first PR I aimed for simplicity.

  • DuckDB: HLL merge with Good-Turing sampling correction (merge, Good-Turing), but only for its native format; no HLL data is available when reading Parquet. Good-Turing won't help here, but it could be used for #19957 (comment), to extrapolate NDVs from a sample of the whole population, even if sampling should be random (cc: @jonathanc-n, as a complement to the threshold strategy?).

HyperLogLog is a classic way to process this, but for a large dataset, it takes a long time to process.

The core constraint is that Parquet stores NDV as a single optional i64 (spec), not a sketch, and most writers don't even set it (see apache/arrow-rs#8608). So when merging row groups, we're interpolating from scalar values.

Regarding HLL cost: sketches are compact (~1.6KB for 2% error), merge is O(registers), so it's cheap at planning time. The cost is at ingestion. Data sketches are standard practice at scale: Hive uses HLL for NDV and KLL for histograms/quantiles (e.g., HIVE-26243, HIVE-26221). In extreme cases sketch size might be a concern, but it's not the general case in my experience.

IIRC, the merge method may be called during query execution, so using HyperLogLog here isn't ideal.

To clarify: try_merge is only called at planning time (by partition_statistics(None) in optimizer rules like JoinSelection, and during file group metadata collection), not during query execution. This is orthogonal to statistics-based pruning work like #19609.

I mentioned JoinSelection as a concrete use of NDV, but NDV can be used to drive other planning decisions (#20766 covers many of them). Some examples:

  • Aggregation pushdown: NDV estimates the grouping reduction ratio: overestimating NDV makes the optimizer skip beneficial pushdowns, underestimating makes it push down aggregation expecting good compression that doesn't materialize.
  • Hash table sizing via NDV: underestimate causes resizing/spilling, overestimate wastes memory.
  • Filters (NDV for IN/equality): underestimating makes a filter push-down look more appealing than it is, overestimating might make us not push the filter down.

Bottom line: there's no universally "safe" direction; the impact depends on how the statistic is consumed, and since the consumers of NDV aren't fully defined yet, there is no strong guidance available.

Longer term, if we invest in CBO and statistics (as #8227 and #20766 suggest), the "one size fits all" problem will be exacerbated for downstream projects. Systems like Hive, Spark, or DuckDB, being end-user databases/warehouses, can make hard choices (e.g., Hive mandating HLL in the metastore). DataFusion is a framework, so we can't take one single route with no way to override. Decision points need to be tunable: what and how you read stats, how to propagate them through operators and expressions, and how to use them for planning decisions, for both built-in and custom stats, expressions, and operators.

I'm working on this via a StatisticsProvider/StatisticsRegistry pattern (WIP branch), including ExtendedStatistics for custom data (histograms, sketches, etc.). For reading custom stats from Parquet, user-defined Parquet indexes could provide the ingestion mechanism: embed HLL sketches alongside Parquet data and read them at planning time through the provider chain.

Until we have that, maybe we should gate this novel propagation mechanism with a configuration property, so we are free to experiment without causing unexpected changes in existing planning? For NDV I haven't done it because, as discussed, it's not usually set by standard writers, so I don't expect any impact unless you make a conscious effort to write them and use them, but if we feel unsure I can add that safeguard.

Apologies for the wall of text! This discussion deserves its own space, but I wanted to address your concern here, happy to take the discussion to an RFC or issue, as you see fit.

Member Author

Actually @jonathanc-n in #20846 (comment) proposes to use what Trino has for updating NDV when min and max are available, which is quite elegant (quoting from his message):

// for merging A + B when min/max are available

overlap_a = (overlap range) / (A's range)  // fraction of A's range that overlaps with B
overlap_b = (overlap range) / (B's range)  // fraction of B's range that overlaps with A

new_ndv = max(overlap_a * NDV_a, overlap_b * NDV_b)  // NDV in the overlapping range
        + (1 - overlap_a) * NDV_a                     // NDV unique to A's range
        + (1 - overlap_b) * NDV_b                     // NDV unique to B's range

The formula ranges between [max(ndvs), sum(ndvs)], from full overlap to no overlap (under the uniform distribution of NDV values in the [min, max] range, which is classic for scalar-based statistics propagation).

When min/max are not available, we can fall back to max, as currently implemented.
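
For concreteness, here is a hedged numeric sketch of the quoted formula, including the max fallback when ranges are degenerate (the real estimate_ndv_with_overlap in the PR may differ in signature and type handling):

```rust
/// Overlap-based NDV merge for two partitions, each described by
/// (min, max, ndv). Assumes NDVs are uniformly spread over [min, max].
fn merge_ndv_with_overlap(
    (min_a, max_a, ndv_a): (f64, f64, f64),
    (min_b, max_b, ndv_b): (f64, f64, f64),
) -> f64 {
    let range_a = max_a - min_a;
    let range_b = max_b - min_b;
    if range_a <= 0.0 || range_b <= 0.0 {
        // Degenerate range (e.g., constant column): fall back to max.
        return ndv_a.max(ndv_b);
    }
    let overlap = (max_a.min(max_b) - min_a.max(min_b)).max(0.0);
    let (frac_a, frac_b) = (overlap / range_a, overlap / range_b);
    // NDV in the overlapping range, plus NDV unique to each side.
    (frac_a * ndv_a).max(frac_b * ndv_b)
        + (1.0 - frac_a) * ndv_a
        + (1.0 - frac_b) * ndv_b
}
```

Disjoint ranges reduce to the sum of the two NDVs, identical ranges reduce to their max, and partial overlaps interpolate between the two extremes.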

I can update try_merge accordingly, if you agree, @xudong963 @jonathanc-n.

Member

Looks great to me, thanks for the sweet reply.

Member Author

Since you approved #20846 already, and the code in union.rs::col_stats_union is exactly what we want here too, I plan to wait for that PR to get merged, then turn estimate_ndv_with_overlap into a utility function in datafusion-common/src/stats.rs for reuse (all the types it needs already live there).

The formula is binary, but try_merge folds over many row groups, so iterative pairwise application works naturally. There are no extra heap allocations: distance() returns a stack usize, and the rest is f64 arithmetic on top of the min/max comparisons try_merge already does.

Note: in #20846 the fallback when min/max are absent is sum (sensible for Union). For row group merging I'll keep max as fallback, since row groups from the same file are more likely to have overlapping values. I will make sure to generalize the current code to handle both cases.

Member

fyi, the #20846 is merged

Contributor

If we keep max() as the merge rule, I think we should document in public-facing comments that the merged NDV is treated as a lower-bound-ish estimate.

Member Author

Addressed in 72d3be3. Thanks for the heads up on #20846!

I have finally replaced max(left, right) with the overlap-based formula from that PR, now extracted into a shared estimate_ndv_with_overlap in stats.rs used by both try_merge and Union, as agreed with @xudong963.

I have added tests covering disjoint, identical, partial overlap, missing min/max, non-numeric, and constant column cases. No Union tests changed, as expected from a pure extraction/refactoring.

Is this matching your expectations, @gene-bordegaray?

Apologies again for the force push, but building on #20846 seemed really worth it; apart from SHAs, the only changes are the two topmost commits, addressing pending points.

@asolimando asolimando force-pushed the asolimando/ndv-parquet-support branch 2 times, most recently from e85c430 to e352c28 Compare March 10, 2026 15:16
@asolimando
Member Author

asolimando commented Mar 10, 2026

Rebased on latest main to fix the branch drift. No code changes, just new SHAs from the rebase.

@gene-bordegaray: the code you reviewed is unchanged. The only addition since your approval is commit e352c28, which updates test expectations in test_hash_join_partition_statistics (added by #20711) to account for NDV propagation through merges. No new logic was added.

@jonathanc-n: as you suggested, the ExpressionAnalyzer commits have been removed from this PR and will come in a follow-up.

@xudong963: I replied to your comments on the NDV merge strategy, let me know if you have further concerns or if we should take the discussion to an RFC/issue. EDIT: #19957 (comment) describes a possible improvement over the max formula.

Contributor

@gene-bordegaray gene-bordegaray left a comment

Have a few follow-up questions and asks to tighten the scope of this initial PR further

// TODO stats: estimate more statistics from expressions
// (expressions should compute their statistics themselves)
ColumnStatistics::new_unknown()
// TODO: expressions should compute their own statistics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up on this thread for my review:

I’d prefer to drop the new NDV propagation in projections from this PR. The heuristic “expression references a single column” is too broad to be correct for NDV. The thread already suggests handling this in a separate expression statistics design. For this PR I think derived expressions should stay unknown, then address the rest in a follow-up


if distinct_counts.is_empty() {
Precision::Absent
} else if distinct_counts.len() == 1 {

I think the partial-coverage case still needs tightening.

If we merge multiple row groups and only some of them expose NDV, returning NDV as Exact for the whole file doesn't seem accurate to me. Could we make it Inexact instead?

The test test_distinct_count_partial_ndv_in_row_groups seems wrong in asserting Exact(15) for a file where another row group has unknown NDV.

Comment on lines +640 to +641
// Use max as a conservative lower bound for distinct count
// (can't accurately merge NDV since duplicates may exist across partitions)

If we keep max() as the merge rule, I think we should document in public-facing comments that merged NDV is treated as a lower-bound estimate.

This change adds support for reading Number of Distinct Values (NDV)
statistics from Parquet file metadata when available.

Previously, `distinct_count` in `ColumnStatistics` was always set to
`Precision::Absent`. Now it is populated from parquet row group
column statistics when present:

- Single row group with NDV: `Precision::Exact(ndv)`
- Multiple row groups with NDV: `Precision::Inexact(max)` as lower bound
  (we can't accurately merge NDV since duplicates may exist across
  row groups; max is more conservative than sum for join cardinality
  estimation)
- No NDV available: `Precision::Absent`

This provides foundation for improved join cardinality estimation
and other statistics-based optimizations.

Relates to apache#15265
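The extraction rules in the commit message above can be sketched as follows. Note this is a simplified illustration, not DataFusion's actual code: `Precision` here is a minimal stand-in for `datafusion_common::stats::Precision`, and the function name is hypothetical.

```rust
// Simplified stand-in for DataFusion's Precision enum (illustrative only).
#[derive(Debug, PartialEq)]
enum Precision {
    Exact(u64),
    Inexact(u64),
    Absent,
}

// `ndvs` holds the distinct_count values found in row group metadata
// (row groups without NDV are already filtered out).
fn distinct_count_from_row_groups(ndvs: &[u64]) -> Precision {
    match ndvs {
        // No row group reported NDV.
        [] => Precision::Absent,
        // A single NDV describes the whole file exactly.
        [only] => Precision::Exact(*only),
        // Duplicates may repeat across row groups, so summing would
        // overcount; max is a conservative lower bound.
        many => Precision::Inexact(*many.iter().max().unwrap()),
    }
}

fn main() {
    assert_eq!(distinct_count_from_row_groups(&[]), Precision::Absent);
    assert_eq!(distinct_count_from_row_groups(&[15]), Precision::Exact(15));
    assert_eq!(distinct_count_from_row_groups(&[10, 25, 5]), Precision::Inexact(25));
    println!("ok");
}
```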
- Statistics merge: use max as conservative lower bound instead of
  discarding NDV (duplicates may exist across partitions)
- Projection: preserve NDV for single-column expressions as upper bound
Partition columns now preserve distinct_count as Inexact(1) when
merging statistics, reflecting that each partition file has a single
distinct partition value.
Use get_value().max() chain instead of verbose match statement for
merging NDV in Statistics::try_merge()
Encapsulate get_col_stats parameters by adding build_column_statistics()
method to StatisticsAccumulators, removing the standalone function.
Move imports to module level in ndv_tests since they're in their own
module anyway.
Replace the max(left, right) heuristic in try_merge with overlap-based
NDV estimation, which accounts for range overlap between partitions
instead of blindly picking the larger value.

Move estimate_ndv_with_overlap from union.rs (apache#20846) to stats.rs as a
pub function and reuse it in try_merge for row group/partition merging.

The merge loop is reordered so distinct_count is computed before min/max
updates (needs pre-merge ranges for the overlap formula). Fallback to
max(left, right) when min/max are absent or distance is unsupported.
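The commit message describes the overlap-based estimate only at a high level; the exact formula lives in #20846. The sketch below shows one plausible shape of such an estimate, under the assumption that the fraction of range overlap (relative to the narrower range) predicts how many distinct values are shared. The function signature and formula are illustrative, not necessarily what the PR implements.

```rust
// Illustrative overlap-based NDV merge: if the value ranges of two inputs
// overlap by fraction `f` of the narrower range, assume `f` of the smaller
// side's distinct values are shared: ndv ≈ a + b - f * min(a, b).
// Disjoint ranges sum; degenerate ranges fall back to max(a, b),
// matching the previous heuristic.
fn estimate_ndv_with_overlap(a: u64, b: u64, range_a: (f64, f64), range_b: (f64, f64)) -> u64 {
    let lo = range_a.0.max(range_b.0);
    let hi = range_a.1.min(range_b.1);
    if hi <= lo {
        return a + b; // disjoint ranges: no shared distinct values
    }
    let span_a = range_a.1 - range_a.0;
    let span_b = range_b.1 - range_b.0;
    if span_a <= 0.0 || span_b <= 0.0 {
        return a.max(b); // degenerate range: fall back to the old heuristic
    }
    let f = ((hi - lo) / span_a.min(span_b)).clamp(0.0, 1.0);
    a + b - (f * a.min(b) as f64).round() as u64
}

fn main() {
    // Disjoint ranges: NDVs add up.
    assert_eq!(estimate_ndv_with_overlap(10, 20, (0.0, 10.0), (20.0, 30.0)), 30);
    // Identical ranges: the smaller NDV is assumed fully contained.
    assert_eq!(estimate_ndv_with_overlap(10, 20, (0.0, 10.0), (0.0, 10.0)), 20);
    // Half overlap: half of the smaller NDV is assumed shared.
    assert_eq!(estimate_ndv_with_overlap(10, 20, (0.0, 10.0), (5.0, 15.0)), 25);
    println!("ok");
}
```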
When only a fraction of row groups report NDV statistics, the merged
estimate can be misleading. Require at least 75% of row groups to have
NDV before reporting an Inexact value, otherwise return Absent.
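The 75% coverage gate can be sketched like this. The names and the `Precision` stand-in are illustrative, and the combiner for the qualifying values is simplified to max here (the PR merges with the overlap-based estimate where ranges are available).

```rust
// Minimal stand-in for the relevant Precision variants (illustrative only).
#[derive(Debug, PartialEq)]
enum Precision {
    Inexact(u64),
    Absent,
}

// `ndvs` has one entry per row group: Some(ndv) if that row group reported
// a distinct_count, None otherwise.
fn merged_distinct_count(ndvs: &[Option<u64>]) -> Precision {
    let known: Vec<u64> = ndvs.iter().filter_map(|x| *x).collect();
    // Require at least 75% of row groups to report NDV before trusting
    // the merged estimate; otherwise the result is too misleading.
    if ndvs.is_empty() || (known.len() as f64) < 0.75 * ndvs.len() as f64 {
        return Precision::Absent;
    }
    // Simplified combiner for this sketch: max as a conservative lower bound.
    Precision::Inexact(known.into_iter().max().unwrap())
}

fn main() {
    // 3 of 4 row groups report NDV: 75% coverage, the estimate is kept.
    assert_eq!(
        merged_distinct_count(&[Some(10), Some(15), Some(12), None]),
        Precision::Inexact(15)
    );
    // 1 of 4: below the threshold, report Absent.
    assert_eq!(
        merged_distinct_count(&[Some(10), None, None, None]),
        Precision::Absent
    );
    println!("ok");
}
```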
@asolimando asolimando force-pushed the asolimando/ndv-parquet-support branch from e352c28 to faf5211 Compare March 12, 2026 11:20
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 12, 2026
Remove the single-column expression heuristic for preserving NDV
in projections, as it is too broad to be correct. Expression-level
statistics propagation will be addressed in a follow-up design.
@github-actions github-actions bot removed the physical-expr Changes to the physical-expr crates label Mar 12, 2026
@asolimando
Copy link
Member Author

asolimando commented Mar 12, 2026

Rebased on latest main (includes #20846) and addressed all pending review comments (new changes are in the 4 topmost commits only, the rest is untouched modulo SHA changes for the rebase).

@xudong963: replaced the max(left, right) merge heuristic with the overlap-based formula from #20846 (comment with details)

@gene-bordegaray:

  • Reverted NDV propagation in projections as suggested, will address in a follow-up with proper expression statistics design (comment with details)
  • Added 75% threshold for partial NDV extraction from Parquet: if fewer than 75% of row groups report NDV, return Absent (comment with details)
  • The max() merge rule concern is also addressed by the overlap formula above

@jonathanc-n: partial NDV threshold addresses your comment as well.

Looking forward to hearing your thoughts!
