diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 108e8c5752017..0d8e825a892c5 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,11 +24,12 @@ use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Schema};
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
+use parquet::errors::ParquetError;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -56,13 +57,15 @@ use crate::sort::reverse_row_selection;
 use datafusion_common::config::EncryptionFactoryOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{Stream, StreamExt, TryStreamExt, ready};
+use futures::{Stream, StreamExt, ready};
 use log::debug;
+use parquet::DecodeResult;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
@@ -167,17 +170,6 @@ impl PreparedAccessPlan {
         Ok(self)
     }
-
-    /// Apply this access plan to a ParquetRecordBatchStreamBuilder
-    fn apply_to_builder(
-        self,
-        mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>,
-    ) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
-        if let Some(row_selection) = self.row_selection {
-            builder = builder.with_row_selection(row_selection);
-        }
-        builder.with_row_groups(self.row_group_indexes)
-    }
 }
 
 impl FileOpener for ParquetOpener {
@@ -267,6 +259,9 @@ impl FileOpener for ParquetOpener {
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
+        let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory);
+        let partition_index = self.partition_index;
+        let metrics = self.metrics.clone();
 
         let predicate_creation_errors = MetricBuilder::new(&self.metrics)
             .global_counter("num_predicate_creation_errors");
@@ -444,57 +439,14 @@ impl FileOpener for ParquetOpener {
 
             metadata_timer.stop();
 
-            // ---------------------------------------------------------
-            // Step: construct builder for the final RecordBatch stream
-            // ---------------------------------------------------------
-
-            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
-                async_file_reader,
-                reader_metadata,
-            );
-
-            // ---------------------------------------------------------------------
-            // Step: optionally add row filter to the builder
-            //
-            // Row filter is used for late materialization in parquet decoding, see
-            // `row_filter` for details.
-            // ---------------------------------------------------------------------
-
-            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
-                    &predicate,
-                    &physical_file_schema,
-                    builder.metadata(),
-                    reorder_predicates,
-                    &file_metrics,
-                );
-
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{predicate:?}': {e}"
-                        );
-                    }
-                };
-            };
-            if force_filter_selections {
-                builder =
-                    builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
-            }
-
             // ------------------------------------------------------------
             // Step: prune row groups by range, predicate and bloom filter
             // ------------------------------------------------------------
 
             // Determine which row groups to actually read. The idea is to skip
             // as many row groups as possible based on the metadata and query
-            let file_metadata = Arc::clone(builder.metadata());
-            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+            let file_metadata = Arc::clone(reader_metadata.metadata());
+            let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
             let access_plan =
@@ -506,13 +458,13 @@ impl FileOpener for ParquetOpener {
             }
 
             // If there is a predicate that can be evaluated against the metadata
-            if let Some(predicate) = predicate.as_ref() {
+            if let Some(pruning_pred) = pruning_pred.as_ref() {
                 if enable_row_group_stats_pruning {
                     row_groups.prune_by_statistics(
                         &physical_file_schema,
-                        builder.parquet_schema(),
+                        reader_metadata.parquet_schema(),
                         rg_metadata,
-                        predicate,
+                        pruning_pred,
                         &file_metrics,
                     );
                 } else {
@@ -524,11 +476,27 @@ impl FileOpener for ParquetOpener {
             }
 
             if enable_bloom_filter && !row_groups.is_empty() {
+                // Use the existing reader for bloom filter I/O;
+                // replace with a fresh reader for decoding below.
+                let bf_reader = std::mem::replace(
+                    &mut async_file_reader,
+                    parquet_file_reader_factory.create_reader(
+                        partition_index,
+                        partitioned_file.clone(),
+                        metadata_size_hint,
+                        &metrics,
+                    )?,
+                );
+                let mut bf_builder =
+                    ParquetRecordBatchStreamBuilder::new_with_metadata(
+                        bf_reader,
+                        reader_metadata.clone(),
+                    );
                 row_groups
                     .prune_by_bloom_filters(
                         &physical_file_schema,
-                        &mut builder,
-                        predicate,
+                        &mut bf_builder,
+                        pruning_pred,
                         &file_metrics,
                     )
                     .await;
@@ -570,7 +538,7 @@ impl FileOpener for ParquetOpener {
                 access_plan = p.prune_plan_with_page_index(
                     access_plan,
                     &physical_file_schema,
-                    builder.parquet_schema(),
+                    reader_metadata.parquet_schema(),
                     file_metadata.as_ref(),
                     &file_metrics,
                 );
@@ -588,8 +556,59 @@ impl FileOpener for ParquetOpener {
                 prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
             }
 
+            if prepared_plan.row_group_indexes.is_empty() {
+                return Ok(futures::stream::empty().boxed());
+            }
+
+            // ---------------------------------------------------------
+            // Step: construct builder for the final RecordBatch stream
+            // ---------------------------------------------------------
+
+            let mut builder =
+                ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone())
+                    .with_batch_size(batch_size);
+
+            // ---------------------------------------------------------------------
+            // Step: optionally add row filter to the builder
+            //
+            // Row filter is used for late materialization in parquet decoding, see
+            // `row_filter` for details.
+            // ---------------------------------------------------------------------
+
+            // Filter pushdown: evaluate predicates during scan
+            if let Some(predicate) =
+                pushdown_filters.then_some(predicate.as_ref()).flatten()
+            {
+                let row_filter = row_filter::build_row_filter(
+                    predicate,
+                    &physical_file_schema,
+                    file_metadata.as_ref(),
+                    reorder_predicates,
+                    &file_metrics,
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{predicate:?}': {e}"
+                        );
+                    }
+                };
+            };
+            if force_filter_selections {
+                builder =
+                    builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
+            }
+
             // Apply the prepared plan to the builder
-            builder = prepared_plan.apply_to_builder(builder);
+            if let Some(row_selection) = prepared_plan.row_selection {
+                builder = builder.with_row_selection(row_selection);
+            }
+            builder = builder.with_row_groups(prepared_plan.row_group_indexes);
 
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
@@ -603,11 +622,11 @@ impl FileOpener for ParquetOpener {
             let arrow_reader_metrics = ArrowReaderMetrics::enabled();
 
             let indices = projection.column_indices();
-            let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+            let mask =
+                ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
 
-            let stream = builder
+            let decoder = builder
                 .with_projection(mask)
-                .with_batch_size(batch_size)
                 .with_metrics(arrow_reader_metrics.clone())
                 .build()?;
 
@@ -617,57 +636,39 @@ impl FileOpener for ParquetOpener {
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records =
                 file_metrics.predicate_cache_records.clone();
 
-            let stream_schema = Arc::clone(stream.schema());
-            // Check if we need to replace the schema to handle things like differing nullability or metadata.
-            // See note below about file vs. output schema.
-            let replace_schema = !stream_schema.eq(&output_schema);
-
             // Rebase column indices to match the narrowed stream schema.
             // The projection expressions have indices based on physical_file_schema,
             // but the stream only contains the columns selected by the ProjectionMask.
+            let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
+            let replace_schema = stream_schema != output_schema;
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
             let projector = projection.make_projector(&stream_schema)?;
-
-            let stream = stream.map_err(DataFusionError::from).map(move |b| {
-                b.and_then(|mut b| {
-                    copy_arrow_reader_metrics(
-                        &arrow_reader_metrics,
-                        &predicate_cache_inner_records,
-                        &predicate_cache_records,
-                    );
-                    b = projector.project_batch(&b)?;
-                    if replace_schema {
-                        // Ensure the output batch has the expected schema.
-                        // This handles things like schema level and field level metadata, which may not be present
-                        // in the physical file schema.
-                        // It is also possible for nullability to differ; some writers create files with
-                        // OPTIONAL fields even when there are no nulls in the data.
-                        // In these cases it may make sense for the logical schema to be `NOT NULL`.
-                        // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
-                        // the array cannot contain nulls, amongst other checks.
-                        let (_stream_schema, arrays, num_rows) = b.into_parts();
-                        let options =
-                            RecordBatchOptions::new().with_row_count(Some(num_rows));
-                        RecordBatch::try_new_with_options(
-                            Arc::clone(&output_schema),
-                            arrays,
-                            &options,
-                        )
-                        .map_err(Into::into)
-                    } else {
-                        Ok(b)
-                    }
-                })
-            });
+            let stream = futures::stream::unfold(
+                PushDecoderStreamState {
+                    decoder,
+                    reader: async_file_reader,
+                    projector,
+                    output_schema,
+                    replace_schema,
+                    arrow_reader_metrics,
+                    predicate_cache_inner_records,
+                    predicate_cache_records,
+                },
+                |mut state| async move {
+                    let result = state.transition().await;
+                    result.map(|r| (r, state))
+                },
+            )
+            .fuse();
 
             // ----------------------------------------------------------------------
             // Step: wrap the stream so a dynamic filter can stop the file scan early
             // ----------------------------------------------------------------------
             if let Some(file_pruner) = file_pruner {
+                let boxed_stream = stream.boxed();
                 Ok(EarlyStoppingStream::new(
-                    stream,
+                    boxed_stream,
                     file_pruner,
                     files_ranges_pruned_statistics,
                 )
@@ -679,19 +680,82 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
-/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
-fn copy_arrow_reader_metrics(
-    arrow_reader_metrics: &ArrowReaderMetrics,
-    predicate_cache_inner_records: &Gauge,
-    predicate_cache_records: &Gauge,
-) {
-    if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
-        predicate_cache_inner_records.set(v);
+/// State for a stream that decodes a single Parquet file using a push-based decoder.
+///
+/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
+/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
+/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
+/// fully consumed.
+struct PushDecoderStreamState {
+    decoder: ParquetPushDecoder,
+    reader: Box<dyn AsyncFileReader>,
+    projector: Projector,
+    output_schema: Arc<Schema>,
+    replace_schema: bool,
+    arrow_reader_metrics: ArrowReaderMetrics,
+    predicate_cache_inner_records: Gauge,
+    predicate_cache_records: Gauge,
+}
+
+impl PushDecoderStreamState {
+    /// Advances the decoder state machine until the next [`RecordBatch`] is
+    /// produced, the file is fully consumed, or an error occurs.
+    ///
+    /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
+    /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
+    ///   fetched from the [`AsyncFileReader`] and fed back into the decoder.
+    /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
+    /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
+    async fn transition(&mut self) -> Option<Result<RecordBatch>> {
+        loop {
+            match self.decoder.try_decode() {
+                Ok(DecodeResult::NeedsData(ranges)) => {
+                    let fetch = async {
+                        let data = self.reader.get_byte_ranges(ranges.clone()).await?;
+                        self.decoder.push_ranges(ranges, data)?;
+                        Ok::<_, ParquetError>(())
+                    };
+                    if let Err(e) = fetch.await {
+                        return Some(Err(DataFusionError::from(e)));
+                    }
+                }
+                Ok(DecodeResult::Data(batch)) => {
+                    self.copy_arrow_reader_metrics();
+                    return Some(self.project_batch(&batch));
+                }
+                Ok(DecodeResult::Finished) => {
+                    return None;
+                }
+                Err(e) => {
+                    return Some(Err(DataFusionError::from(e)));
+                }
+            }
+        }
     }
-    if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
-        predicate_cache_records.set(v);
+
+    /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
+    /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
+    fn copy_arrow_reader_metrics(&self) {
+        if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
+            self.predicate_cache_inner_records.set(v);
+        }
+        if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
+            self.predicate_cache_records.set(v);
+        }
+    }
+
+    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let mut batch = self.projector.project_batch(batch)?;
+        if self.replace_schema {
+            let (_schema, arrays, num_rows) = batch.into_parts();
+            let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+            batch = RecordBatch::try_new_with_options(
+                Arc::clone(&self.output_schema),
+                arrays,
+                &options,
+            )?;
+        }
+        Ok(batch)
     }
 }
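For context on the new push-based decoding: `PushDecoderStreamState::transition` is a fetch/decode handshake between `ParquetPushDecoder` and the `AsyncFileReader`. The sketch below isolates that loop using only the calls that appear in this diff (`try_decode`, `push_ranges`, `get_byte_ranges`, and the `DecodeResult` variants); the exact signatures are assumed from the change rather than quoted from the parquet crate, and the helper name is illustrative.

```rust
use arrow::array::RecordBatch;
use parquet::DecodeResult;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::ParquetPushDecoder;
use parquet::errors::ParquetError;

/// Illustrative sketch (not part of this PR): drain a push decoder to
/// completion, fetching the byte ranges it asks for from an `AsyncFileReader`.
/// Signatures for `try_decode`/`push_ranges` are assumed from the diff above.
async fn drain_push_decoder(
    mut decoder: ParquetPushDecoder,
    mut reader: Box<dyn AsyncFileReader>,
) -> Result<Vec<RecordBatch>, ParquetError> {
    let mut batches = Vec::new();
    loop {
        match decoder.try_decode()? {
            // The decoder cannot make progress until these byte ranges are
            // fetched from storage and pushed back into it.
            DecodeResult::NeedsData(ranges) => {
                let data = reader.get_byte_ranges(ranges.clone()).await?;
                decoder.push_ranges(ranges, data)?;
            }
            // A fully decoded batch is ready; the opener yields it from its
            // stream at this point instead of buffering it.
            DecodeResult::Data(batch) => batches.push(batch),
            // All selected row groups have been decoded.
            DecodeResult::Finished => return Ok(batches),
        }
    }
}
```

The opener keeps this loop inside `futures::stream::unfold` rather than collecting batches, so each batch is yielded as soon as it decodes and the `EarlyStoppingStream` wrapper can end the scan between batches when a dynamic filter prunes the rest of the file.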