From aae8982cd02c408d88f80922529c315b47daaaa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Mar 2026 11:23:28 +0100 Subject: [PATCH 1/5] Rewrite ParquetOpener to use push-based ParquetPushDecoder Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource-parquet/src/opener.rs | 274 ++++++++++++-------- 1 file changed, 169 insertions(+), 105 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 108e8c5752017..20f44e5402a50 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -56,13 +56,15 @@ use crate::sort::reverse_row_selection; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::{Stream, StreamExt, TryStreamExt, ready}; +use futures::{Stream, StreamExt, ready}; use log::debug; +use parquet::DecodeResult; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData}; @@ -167,17 +169,6 @@ impl PreparedAccessPlan { Ok(self) } - - /// Apply this access plan to a ParquetRecordBatchStreamBuilder - fn apply_to_builder( - self, - mut builder: ParquetRecordBatchStreamBuilder>, - ) -> ParquetRecordBatchStreamBuilder> { - if let Some(row_selection) = self.row_selection { - builder = builder.with_row_selection(row_selection); - } - builder.with_row_groups(self.row_group_indexes) - } } impl FileOpener for ParquetOpener { @@ -267,6 +258,9 @@ impl FileOpener for ParquetOpener { let enable_bloom_filter = self.enable_bloom_filter; let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; let limit = self.limit; + let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory); + let partition_index = self.partition_index; + let metrics = self.metrics.clone(); let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); @@ -444,57 +438,14 @@ impl FileOpener for ParquetOpener { metadata_timer.stop(); - // --------------------------------------------------------- - // Step: construct builder for the final RecordBatch stream - // --------------------------------------------------------- - - let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( - async_file_reader, - reader_metadata, - ); - - // --------------------------------------------------------------------- - // Step: optionally add row filter to the builder - // - // Row filter is used for late materialization in parquet decoding, see - // `row_filter` for details. - // --------------------------------------------------------------------- - - // Filter pushdown: evaluate predicates during scan - if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { - let row_filter = row_filter::build_row_filter( - &predicate, - &physical_file_schema, - builder.metadata(), - reorder_predicates, - &file_metrics, - ); - - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); - } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" - ); - } - }; - }; - if force_filter_selections { - builder = - builder.with_row_selection_policy(RowSelectionPolicy::Selectors); - } - // ------------------------------------------------------------ // Step: prune row groups by range, predicate and bloom filter // ------------------------------------------------------------ // Determine which row groups to actually read. The idea is to skip // as many row groups as possible based on the metadata and query - let file_metadata = Arc::clone(builder.metadata()); - let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); + let file_metadata = Arc::clone(reader_metadata.metadata()); + let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref()); let rg_metadata = file_metadata.row_groups(); // track which row groups to actually read let access_plan = @@ -506,13 +457,13 @@ impl FileOpener for ParquetOpener { } // If there is a predicate that can be evaluated against the metadata - if let Some(predicate) = predicate.as_ref() { + if let Some(pruning_pred) = pruning_pred.as_ref() { if enable_row_group_stats_pruning { row_groups.prune_by_statistics( &physical_file_schema, - builder.parquet_schema(), + reader_metadata.parquet_schema(), rg_metadata, - predicate, + pruning_pred, &file_metrics, ); } else { @@ -524,11 +475,27 @@ impl FileOpener for ParquetOpener { } if enable_bloom_filter && !row_groups.is_empty() { + // Use the existing reader for bloom filter I/O; + // replace with a fresh reader for decoding below. + let bf_reader = std::mem::replace( + &mut async_file_reader, + parquet_file_reader_factory.create_reader( + partition_index, + partitioned_file.clone(), + metadata_size_hint, + &metrics, + )?, + ); + let mut bf_builder = + ParquetRecordBatchStreamBuilder::new_with_metadata( + bf_reader, + reader_metadata.clone(), + ); row_groups .prune_by_bloom_filters( &physical_file_schema, - &mut builder, - predicate, + &mut bf_builder, + pruning_pred, &file_metrics, ) .await; @@ -570,7 +537,7 @@ impl FileOpener for ParquetOpener { access_plan = p.prune_plan_with_page_index( access_plan, &physical_file_schema, - builder.parquet_schema(), + reader_metadata.parquet_schema(), file_metadata.as_ref(), &file_metrics, ); @@ -588,8 +555,59 @@ impl FileOpener for ParquetOpener { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } + if prepared_plan.row_group_indexes.is_empty() { + return Ok(futures::stream::empty().boxed()); + } + + // --------------------------------------------------------- + // Step: construct builder for the final RecordBatch stream + // --------------------------------------------------------- + + let mut builder = + ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone()) + .with_batch_size(batch_size); + + // --------------------------------------------------------------------- + // Step: optionally add row filter to the builder + // + // Row filter is used for late materialization in parquet decoding, see + // `row_filter` for details. + // --------------------------------------------------------------------- + + // Filter pushdown: evaluate predicates during scan + if let Some(predicate) = + pushdown_filters.then_some(predicate.as_ref()).flatten() + { + let row_filter = row_filter::build_row_filter( + predicate, + &physical_file_schema, + file_metadata.as_ref(), + reorder_predicates, + &file_metrics, + ); + + match row_filter { + Ok(Some(filter)) => { + builder = builder.with_row_filter(filter); + } + Ok(None) => {} + Err(e) => { + debug!( + "Ignoring error building row filter for '{predicate:?}': {e}" + ); + } + }; + }; + if force_filter_selections { + builder = + builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + } + // Apply the prepared plan to the builder - builder = prepared_plan.apply_to_builder(builder); + if let Some(row_selection) = prepared_plan.row_selection { + builder = builder.with_row_selection(row_selection); + } + builder = builder.with_row_groups(prepared_plan.row_group_indexes); if let Some(limit) = limit { builder = builder.with_limit(limit) @@ -603,11 +621,11 @@ impl FileOpener for ParquetOpener { let arrow_reader_metrics = ArrowReaderMetrics::enabled(); let indices = projection.column_indices(); - let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + let mask = + ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone()); - let stream = builder + let decoder = builder .with_projection(mask) - .with_batch_size(batch_size) .with_metrics(arrow_reader_metrics.clone()) .build()?; @@ -617,57 +635,76 @@ impl FileOpener for ParquetOpener { file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); - let stream_schema = Arc::clone(stream.schema()); - // Check if we need to replace the schema to handle things like differing nullability or metadata. - // See note below about file vs. output schema. - let replace_schema = !stream_schema.eq(&output_schema); - // Rebase column indices to match the narrowed stream schema. // The projection expressions have indices based on physical_file_schema, // but the stream only contains the columns selected by the ProjectionMask. + let stream_schema = Arc::new(physical_file_schema.project(&indices)?); + let replace_schema = stream_schema != output_schema; let projection = projection .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; - let projector = projection.make_projector(&stream_schema)?; - - let stream = stream.map_err(DataFusionError::from).map(move |b| { - b.and_then(|mut b| { - copy_arrow_reader_metrics( - &arrow_reader_metrics, - &predicate_cache_inner_records, - &predicate_cache_records, - ); - b = projector.project_batch(&b)?; - if replace_schema { - // Ensure the output batch has the expected schema. - // This handles things like schema level and field level metadata, which may not be present - // in the physical file schema. - // It is also possible for nullability to differ; some writers create files with - // OPTIONAL fields even when there are no nulls in the data. - // In these cases it may make sense for the logical schema to be `NOT NULL`. - // RecordBatch::try_new_with_options checks that if the schema is NOT NULL - // the array cannot contain nulls, amongst other checks. - let (_stream_schema, arrays, num_rows) = b.into_parts(); - let options = - RecordBatchOptions::new().with_row_count(Some(num_rows)); - RecordBatch::try_new_with_options( - Arc::clone(&output_schema), - arrays, - &options, - ) - .map_err(Into::into) - } else { - Ok(b) + let stream = futures::stream::unfold( + PushDecoderStreamState { + decoder, + reader: async_file_reader, + projector, + output_schema, + replace_schema, + arrow_reader_metrics, + predicate_cache_inner_records, + predicate_cache_records, + }, + |mut state| async move { + loop { + match state.decoder.try_decode() { + Ok(DecodeResult::NeedsData(ranges)) => { + match state.reader.get_byte_ranges(ranges.clone()).await { + Ok(data) => { + if let Err(e) = + state.decoder.push_ranges(ranges, data) + { + return Some(( + Err(DataFusionError::from(e)), + state, + )); + } + } + Err(e) => { + return Some(( + Err(DataFusionError::from(e)), + state, + )); + } + } + } + Ok(DecodeResult::Data(batch)) => { + copy_arrow_reader_metrics( + &state.arrow_reader_metrics, + &state.predicate_cache_inner_records, + &state.predicate_cache_records, + ); + let result = state.project_batch(&batch); + return Some((result, state)); + } + Ok(DecodeResult::Finished) => { + return None; + } + Err(e) => { + return Some((Err(DataFusionError::from(e)), state)); + } + } } - }) - }); + }, + ) + .fuse(); // ---------------------------------------------------------------------- // Step: wrap the stream so a dynamic filter can stop the file scan early // ---------------------------------------------------------------------- if let Some(file_pruner) = file_pruner { + let boxed_stream = stream.boxed(); Ok(EarlyStoppingStream::new( - stream, + boxed_stream, file_pruner, files_ranges_pruned_statistics, ) @@ -679,6 +716,33 @@ impl FileOpener for ParquetOpener { } } +struct PushDecoderStreamState { + decoder: parquet::arrow::push_decoder::ParquetPushDecoder, + reader: Box, + projector: datafusion_physical_expr::projection::Projector, + output_schema: Arc, + replace_schema: bool, + arrow_reader_metrics: ArrowReaderMetrics, + predicate_cache_inner_records: Gauge, + predicate_cache_records: Gauge, +} + +impl PushDecoderStreamState { + fn project_batch(&self, batch: &RecordBatch) -> Result { + let mut batch = self.projector.project_batch(batch)?; + if self.replace_schema { + let (_schema, arrays, num_rows) = batch.into_parts(); + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); + batch = RecordBatch::try_new_with_options( + Arc::clone(&self.output_schema), + arrays, + &options, + )?; + } + Ok(batch) + } +} + /// Copies metrics from ArrowReaderMetrics (the metrics collected by the /// arrow-rs parquet reader) to the parquet file metrics for DataFusion fn copy_arrow_reader_metrics( From 4a294ac43512fc0b73b772283ca29f4091148f0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Mar 2026 13:10:47 +0100 Subject: [PATCH 2/5] Review 1 --- datafusion/datasource-parquet/src/opener.rs | 74 ++++++++++----------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 20f44e5402a50..b427fd13ccb00 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -655,45 +655,8 @@ impl FileOpener for ParquetOpener { predicate_cache_records, }, |mut state| async move { - loop { - match state.decoder.try_decode() { - Ok(DecodeResult::NeedsData(ranges)) => { - match state.reader.get_byte_ranges(ranges.clone()).await { - Ok(data) => { - if let Err(e) = - state.decoder.push_ranges(ranges, data) - { - return Some(( - Err(DataFusionError::from(e)), - state, - )); - } - } - Err(e) => { - return Some(( - Err(DataFusionError::from(e)), - state, - )); - } - } - } - Ok(DecodeResult::Data(batch)) => { - copy_arrow_reader_metrics( - &state.arrow_reader_metrics, - &state.predicate_cache_inner_records, - &state.predicate_cache_records, - ); - let result = state.project_batch(&batch); - return Some((result, state)); - } - Ok(DecodeResult::Finished) => { - return None; - } - Err(e) => { - return Some((Err(DataFusionError::from(e)), state)); - } - } - } + let result = state.transition().await; + result.map(|r| (r, state)) }, ) .fuse(); @@ -728,6 +691,39 @@ struct PushDecoderStreamState { } impl PushDecoderStreamState { + async fn transition(&mut self) -> Option> { + loop { + match self.decoder.try_decode() { + Ok(DecodeResult::NeedsData(ranges)) => { + match self.reader.get_byte_ranges(ranges.clone()).await { + Ok(data) => { + if let Err(e) = self.decoder.push_ranges(ranges, data) { + return Some(Err(DataFusionError::from(e))); + } + } + Err(e) => { + return Some(Err(DataFusionError::from(e))); + } + } + } + Ok(DecodeResult::Data(batch)) => { + copy_arrow_reader_metrics( + &self.arrow_reader_metrics, + &self.predicate_cache_inner_records, + &self.predicate_cache_records, + ); + return Some(self.project_batch(&batch)); + } + Ok(DecodeResult::Finished) => { + return None; + } + Err(e) => { + return Some(Err(DataFusionError::from(e))); + } + } + } + } + fn project_batch(&self, batch: &RecordBatch) -> Result { let mut batch = self.projector.project_batch(batch)?; if self.replace_schema { From acc8b49c6dae9601dc29ae6fa7096f330848676f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Mar 2026 13:13:19 +0100 Subject: [PATCH 3/5] Style --- datafusion/datasource-parquet/src/opener.rs | 49 ++++++++------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index b427fd13ccb00..3806aed23560e 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -695,23 +695,17 @@ impl PushDecoderStreamState { loop { match self.decoder.try_decode() { Ok(DecodeResult::NeedsData(ranges)) => { - match self.reader.get_byte_ranges(ranges.clone()).await { - Ok(data) => { - if let Err(e) = self.decoder.push_ranges(ranges, data) { - return Some(Err(DataFusionError::from(e))); - } - } - Err(e) => { - return Some(Err(DataFusionError::from(e))); - } + let fetch = async { + let data = self.reader.get_byte_ranges(ranges.clone()).await?; + self.decoder.push_ranges(ranges, data)?; + Ok::<_, parquet::errors::ParquetError>(()) + }; + if let Err(e) = fetch.await { + return Some(Err(DataFusionError::from(e))); } } Ok(DecodeResult::Data(batch)) => { - copy_arrow_reader_metrics( - &self.arrow_reader_metrics, - &self.predicate_cache_inner_records, - &self.predicate_cache_records, - ); + self.copy_arrow_reader_metrics(); return Some(self.project_batch(&batch)); } Ok(DecodeResult::Finished) => { @@ -724,6 +718,17 @@ impl PushDecoderStreamState { } } + /// Copies metrics from ArrowReaderMetrics (the metrics collected by the + /// arrow-rs parquet reader) to the parquet file metrics for DataFusion + fn copy_arrow_reader_metrics(&self) { + if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() { + self.predicate_cache_inner_records.set(v); + } + if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() { + self.predicate_cache_records.set(v); + } + } + fn project_batch(&self, batch: &RecordBatch) -> Result { let mut batch = self.projector.project_batch(batch)?; if self.replace_schema { @@ -739,22 +744,6 @@ impl PushDecoderStreamState { } } -/// Copies metrics from ArrowReaderMetrics (the metrics collected by the -/// arrow-rs parquet reader) to the parquet file metrics for DataFusion -fn copy_arrow_reader_metrics( - arrow_reader_metrics: &ArrowReaderMetrics, - predicate_cache_inner_records: &Gauge, - predicate_cache_records: &Gauge, -) { - if let Some(v) = arrow_reader_metrics.records_read_from_inner() { - predicate_cache_inner_records.set(v); - } - - if let Some(v) = arrow_reader_metrics.records_read_from_cache() { - predicate_cache_records.set(v); - } -} - type ConstantColumns = HashMap; /// Extract constant column values from statistics, keyed by column name in the logical file schema. From 870c72ab8acd670f05a397a1db09dd2e078c79e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Mar 2026 13:34:00 +0100 Subject: [PATCH 4/5] Replace qualified with use --- datafusion/datasource-parquet/src/opener.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3806aed23560e..497146be78abe 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,11 +24,12 @@ use crate::{ apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; use arrow::array::{RecordBatch, RecordBatchOptions}; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Schema}; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; +use parquet::errors::ParquetError; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -64,7 +65,7 @@ use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; use parquet::arrow::async_reader::AsyncFileReader; -use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; +use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData}; @@ -680,10 +681,10 @@ impl FileOpener for ParquetOpener { } struct PushDecoderStreamState { - decoder: parquet::arrow::push_decoder::ParquetPushDecoder, + decoder: ParquetPushDecoder, reader: Box, - projector: datafusion_physical_expr::projection::Projector, - output_schema: Arc, + projector: Projector, + output_schema: Arc, replace_schema: bool, arrow_reader_metrics: ArrowReaderMetrics, predicate_cache_inner_records: Gauge, @@ -698,7 +699,7 @@ impl PushDecoderStreamState { let fetch = async { let data = self.reader.get_byte_ranges(ranges.clone()).await?; self.decoder.push_ranges(ranges, data)?; - Ok::<_, parquet::errors::ParquetError>(()) + Ok::<_, ParquetError>(()) }; if let Err(e) = fetch.await { return Some(Err(DataFusionError::from(e))); From 853d1db5e3c235ee730fdf3403cef34a6228384a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Mar 2026 13:42:00 +0100 Subject: [PATCH 5/5] Add doc comments to PushDecoderStreamState and transition method Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource-parquet/src/opener.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 497146be78abe..0d8e825a892c5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -680,6 +680,12 @@ impl FileOpener for ParquetOpener { } } +/// State for a stream that decodes a single Parquet file using a push-based decoder. +/// +/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests +/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the +/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is +/// fully consumed. struct PushDecoderStreamState { decoder: ParquetPushDecoder, reader: Box, @@ -692,6 +698,14 @@ struct PushDecoderStreamState { } impl PushDecoderStreamState { + /// Advances the decoder state machine until the next [`RecordBatch`] is + /// produced, the file is fully consumed, or an error occurs. + /// + /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]: + /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are + /// fetched from the [`AsyncFileReader`] and fed back into the decoder. + /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned. + /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`). async fn transition(&mut self) -> Option> { loop { match self.decoder.try_decode() {