Use ParquetPushDecoder in ParquetOpener #20839
Conversation
loop {
    match decoder.try_decode() {
        Ok(DecodeResult::NeedsData(ranges)) => {
            match reader.get_byte_ranges(ranges.clone()).await {
I think this is what you meant @alamb: this way, instead of calling get_byte_ranges here, we can push the ranges to an IO morsel queue instead (and possibly do prefetching / more coalescing / split IO requests / etc.)
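As a minimal sketch of that idea (all names hypothetical, not DataFusion's actual API): instead of awaiting each byte-range fetch inline, the decode loop pushes the ranges the decoder asks for onto a queue that a separate IO stage could drain, coalesce, or prefetch.

```rust
use std::collections::VecDeque;
use std::ops::Range;

// Hypothetical stand-ins for the push-decoder types; the real ones
// (DecodeResult, ParquetPushDecoder) live in the parquet crate.
enum DecodeResult {
    NeedsData(Vec<Range<u64>>),
    Batch(usize), // a decoded batch, represented by its row count here
    Finished,
}

struct MockDecoder {
    steps: VecDeque<DecodeResult>,
}

impl MockDecoder {
    fn try_decode(&mut self) -> DecodeResult {
        self.steps.pop_front().unwrap_or(DecodeResult::Finished)
    }
}

/// Drive the decoder, but instead of fetching ranges inline, collect
/// them into an IO queue a separate stage could prefetch or coalesce.
fn drive(decoder: &mut MockDecoder, io_queue: &mut VecDeque<Range<u64>>) -> usize {
    let mut rows = 0;
    loop {
        match decoder.try_decode() {
            DecodeResult::NeedsData(ranges) => {
                // Push to the IO morsel queue rather than awaiting here.
                io_queue.extend(ranges);
            }
            DecodeResult::Batch(n) => rows += n,
            DecodeResult::Finished => return rows,
        }
    }
}

fn main() {
    let mut decoder = MockDecoder {
        steps: VecDeque::from([
            DecodeResult::NeedsData(vec![0..100, 100..200]),
            DecodeResult::Batch(8192),
        ]),
    };
    let mut io_queue = VecDeque::new();
    let rows = drive(&mut decoder, &mut io_queue);
    println!("rows={rows}, pending io requests={}", io_queue.len());
}
```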
loop {
    match state.decoder.try_decode() {
        Ok(DecodeResult::NeedsData(ranges)) => {
            match state.reader.get_byte_ranges(ranges.clone()).await {
It seems slightly faster to coalesce adjacent ranges (even for local storage), which removes some IO requests; this is mainly beneficial for TPC-DS it seems (see #20839 (comment)), and somehow for tpch_mem as well (I guess more batches will be batch_size sized).
But I kept it out of this diff.
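For reference, a minimal sketch of such coalescing (not the code in this PR): after sorting, merge ranges that touch or overlap so several small reads become one larger IO request.

```rust
use std::ops::Range;

/// Merge byte ranges that are adjacent or overlapping into larger
/// ranges, reducing the number of IO requests / syscalls.
fn coalesce_ranges(mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
    for r in ranges {
        match out.last_mut() {
            // Extend the previous range when this one starts at or
            // before its end; otherwise start a new output range.
            Some(prev) if r.start <= prev.end => prev.end = prev.end.max(r.end),
            _ => out.push(r),
        }
    }
    out
}

fn main() {
    let merged = coalesce_ranges(vec![0..100, 100..200, 300..400]);
    assert_eq!(merged, vec![0..200, 300..400]);
    println!("{merged:?}");
}
```

A real implementation would likely also merge ranges separated by a small gap (reading a few wasted bytes is often cheaper than an extra request), which is a tunable threshold rather than the strict adjacency used here.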
I think considering that optimization as a follow-on is a good idea.
Checking this PR out now
alamb left a comment
Thank you @Dandandan -- I think this looks great. I had some stylistic comments, but otherwise it looks good to me.
I think we should leave this PR open for at least another day in case anyone else would like to comment.
For anyone else reviewing this PR: the ParquetStreamDecoder that is currently used in DataFusion is just a wrapper over the ParquetPushDecoder, so like @Dandandan I don't expect this to do anything except make it easier to control the decoding process more carefully in DataFusion.
It would also be nice to move the bloom filter handling upstream to avoid having to use the ParquetStreamBuilder. I'll try to file a ticket upstream before we merge this one.
    )?,
);
let mut bf_builder =
    ParquetRecordBatchStreamBuilder::new_with_metadata(
It is unfortunate that we need to create a new builder simply to read out the bloom filters (it seems this is because prune_by_bloom_filters calls ParquetRecordBatchStreamBuilder::get_row_group_column_bloom_filter).
It seems like maybe upstream we should move the bloom filter reading into the ParquetMetadataDecoder 🤔 (as a follow-on PR / cleanup)
It turns out @ethe filed a very similar request here already:
I addressed the feedback, will merge tomorrow if no new comments.
@mbutrovich FYI
Thanks @Dandandan, just yesterday I was looking into why DataFusion parquet reads take more memory compared to jvm+native and found some hotspots, one of them being what you mentioned. Let me try to run my test case with this PR and see the benefits.
In theory this change shouldn't change anything currently (it is a pure refactoring to allow for future improvements), do you think it does? When running locally, I found we can make some queries a bit faster by coalescing adjacent ranges (this happens sometimes and saves IO requests / syscalls, and also combines a few allocations, though the latter effect will be small I think), but I left this for a future PR. Are you aware
Right, it seems so, seeing how the decoder is currently being called.
Initially I hoped the PR addressed some memory concerns we currently see 😄
Thanks for the reviews!
I think some of the changes we are planning with "Morsels" may make it easier to control memory usage (e.g. not buffering entire row groups). However, I don't have a concrete example to show yet.

Which issue does this PR close?
Rationale for this change
We want to split IO and CPU to allow for more (NUMA-aware) parallelism and to utilize IO and CPU better.
This allows for e.g. more coalescing, prefetching, parallel IO, more parallel / incremental decoding, etc.
It also allows handling morsels purely at the CPU level, without doing IO multiple times for each morsel.
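As a minimal illustration of that IO/CPU split (hypothetical, not this PR's code): one thread performs the "IO" (producing byte chunks, simulated below) while another does the "CPU" work (decoding, simulated as summing lengths), connected by a channel. Prefetching falls out naturally because the IO side can run ahead of the consumer.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // IO stage: fetch chunks (simulated here) and send them onward.
    let io = thread::spawn(move || {
        for chunk_id in 0..4u8 {
            // In a real opener this would be an object-store / file read.
            tx.send(vec![chunk_id; 1024]).unwrap();
        }
        // Dropping tx closes the channel, ending the CPU stage's loop.
    });

    // CPU stage: "decode" chunks as they arrive, overlapping with IO.
    let mut total = 0usize;
    for chunk in rx {
        total += chunk.len(); // stand-in for actual decoding work
    }
    io.join().unwrap();
    assert_eq!(total, 4 * 1024);
    println!("decoded {total} bytes");
}
```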
What changes are included in this PR?
Just a refactor of ParquetOpener to use ParquetPushDecoder. I used Claude to rewrite it and to keep changes small.
Are these changes tested?
Existing tests. Nothing should change; the arrow-rs code also uses ParquetPushDecoder.
Are there any user-facing changes?