Skip to content

Commit 8d7da46

Browse files
use leaf-level projection mask for parquet projections using ParquetReadPlan
1 parent b6e54b3 commit 8d7da46

2 files changed

Lines changed: 61 additions & 3 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! [`ParquetOpener`] for opening Parquet files
1919
2020
use crate::page_filter::PagePruningAccessPlanFilter;
21+
use crate::row_filter::build_projection_read_plan;
2122
use crate::row_group_filter::RowGroupAccessPlanFilter;
2223
use crate::{
2324
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -621,9 +622,16 @@ impl FileOpener for ParquetOpener {
621622
// metrics from the arrow reader itself
622623
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
623624

624-
let indices = projection.column_indices();
625-
let mask =
626-
ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
625+
let read_plan = build_projection_read_plan(
626+
projection.expr_iter(),
627+
&physical_file_schema,
628+
reader_metadata.parquet_schema(),
629+
);
630+
631+
let mask = ProjectionMask::leaves(
632+
reader_metadata.parquet_schema(),
633+
read_plan.leaf_indices.iter().copied(),
634+
);
627635

628636
let decoder = builder
629637
.with_projection(mask)

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,56 @@ pub(crate) fn build_parquet_read_plan(
583583
}))
584584
}
585585

586+
/// Builds a unified [`ParquetReadPlan`] for a set of projection expressions
587+
///
588+
/// Unlike [`build_parquet_read_plan`] (which is used for filter pushdown and
589+
/// returns `None` when an expression references unsupported nested types or
590+
/// missing columns), this function always succeeds. It collects every column
591+
/// that *can* be resolved in the file and produces a leaf-level projection
592+
/// mask. Columns missing from the file are silently skipped since the projection
593+
/// layer handles those by inserting nulls.
594+
pub(crate) fn build_projection_read_plan(
595+
exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
596+
file_schema: &Schema,
597+
schema_descr: &SchemaDescriptor,
598+
) -> ParquetReadPlan {
599+
let mut all_root_indices = Vec::new();
600+
let mut all_struct_accesses = Vec::new();
601+
602+
for expr in exprs {
603+
let mut checker = PushdownChecker::new(file_schema, true);
604+
let _ = expr.visit(&mut checker);
605+
let columns = checker.into_sorted_columns();
606+
607+
all_root_indices.extend_from_slice(&columns.required_columns);
608+
all_struct_accesses.extend(columns.struct_field_accesses);
609+
}
610+
611+
all_root_indices.sort_unstable();
612+
all_root_indices.dedup();
613+
614+
let leaf_indices = {
615+
let mut out =
616+
leaf_indices_for_roots(all_root_indices.iter().copied(), schema_descr);
617+
let struct_leaf_indices =
618+
resolve_struct_field_leaves(&all_struct_accesses, file_schema, schema_descr);
619+
620+
out.extend_from_slice(&struct_leaf_indices);
621+
out.sort_unstable();
622+
out.dedup();
623+
624+
out
625+
};
626+
627+
let projected_schema =
628+
build_filter_schema(file_schema, &all_root_indices, &all_struct_accesses);
629+
630+
ParquetReadPlan {
631+
leaf_indices,
632+
projected_schema,
633+
}
634+
}
635+
586636
fn leaf_indices_for_roots<I>(
587637
root_indices: I,
588638
schema_descr: &SchemaDescriptor,

0 commit comments

Comments
 (0)