Skip to content

Commit c59c746

Browse files
unify parquet read plan
1 parent 13199cf commit c59c746

1 file changed

Lines changed: 77 additions & 53 deletions

File tree

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 77 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl DatafusionArrowPredicate {
137137
time: metrics::Time,
138138
) -> Result<Self> {
139139
let physical_expr =
140-
reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;
140+
reassign_expr_columns(candidate.expr, &candidate.read_plan.projected_schema)?;
141141

142142
Ok(Self {
143143
physical_expr,
@@ -146,7 +146,7 @@ impl DatafusionArrowPredicate {
146146
// can properly project and filter nested structures.
147147
projection_mask: ProjectionMask::leaves(
148148
metadata.file_metadata().schema_descr(),
149-
candidate.projection.leaf_indices.iter().copied(),
149+
candidate.read_plan.leaf_indices.iter().copied(),
150150
),
151151
rows_pruned,
152152
rows_matched,
@@ -199,22 +199,22 @@ pub(crate) struct FilterCandidate {
199199
required_bytes: usize,
200200
/// Can this filter use an index (e.g. a page index) to prune rows?
201201
can_use_index: bool,
202-
/// Column indices into the parquet file schema required to evaluate this filter.
203-
projection: LeafProjection,
204-
/// The Arrow schema containing only the columns required by this filter,
205-
/// projected from the file's Arrow schema.
206-
filter_schema: SchemaRef,
202+
/// The resolved Parquet read plan (leaf indices + projected schema).
203+
read_plan: ParquetReadPlan,
207204
}
208205

209-
/// Projection specification for nested columns using Parquet leaf column indices.
206+
/// The result of resolving which Parquet leaf columns and Arrow schema fields
207+
/// are needed to evaluate an expression against a Parquet file
210208
///
211-
/// For nested types like List and Struct, Parquet stores data in leaf columns
212-
/// (the primitive fields). This struct tracks which leaf columns are needed
213-
/// to evaluate a filter expression.
209+
/// This is the shared output of the column resolution pipeline used by both
210+
the row filter to build `ArrowPredicate`s and the opener to build `ProjectionMask`s
214211
#[derive(Debug, Clone)]
215-
struct LeafProjection {
216-
/// Leaf column indices in the Parquet schema descriptor.
217-
leaf_indices: Vec<usize>,
212+
pub(crate) struct ParquetReadPlan {
213+
/// Leaf column indices in the Parquet schema descriptor to decode
214+
pub leaf_indices: Vec<usize>,
215+
/// The projected Arrow schema containing only the columns/fields required
216+
/// Struct types are pruned to include only the accessed sub-fields
217+
pub projected_schema: SchemaRef,
218218
}
219219

220220
/// Helper to build a `FilterCandidate`.
@@ -246,41 +246,18 @@ impl FilterCandidateBuilder {
246246
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
247247
/// * `Err(e)` if an error occurs while building the candidate
248248
pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
249-
let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
250-
else {
251-
return Ok(None);
252-
};
253-
254249
let schema_descr = metadata.file_metadata().schema_descr();
255-
let root_indices: Vec<_> =
256-
required_columns.required_columns.into_iter().collect();
257-
258-
let mut leaf_indices = leaf_indices_for_roots(&root_indices, schema_descr);
259-
260-
let struct_leaf_indices = resolve_struct_field_leaves(
261-
&required_columns.struct_field_accesses,
262-
&self.file_schema,
263-
schema_descr,
264-
);
265-
leaf_indices.extend_from_slice(&struct_leaf_indices);
266-
leaf_indices.sort_unstable();
267-
leaf_indices.dedup();
268-
269-
let projected_schema = build_filter_schema(
270-
&self.file_schema,
271-
&root_indices,
272-
&required_columns.struct_field_accesses,
273-
);
274-
275-
let required_bytes = size_of_columns(&leaf_indices, metadata)?;
276-
let can_use_index = columns_sorted(&leaf_indices, metadata)?;
250+
let read_plan =
251+
match build_parquet_read_plan(&self.expr, &self.file_schema, schema_descr)? {
252+
Some(plan) => plan,
253+
None => return Ok(None),
254+
};
277255

278256
Ok(Some(FilterCandidate {
279257
expr: self.expr,
280-
required_bytes,
281-
can_use_index,
282-
projection: LeafProjection { leaf_indices },
283-
filter_schema: projected_schema,
258+
required_bytes: size_of_columns(&read_plan.leaf_indices, metadata)?,
259+
can_use_index: columns_sorted(&read_plan.leaf_indices, metadata)?,
260+
read_plan,
284261
}))
285262
}
286263
}
@@ -562,17 +539,64 @@ fn pushdown_columns(
562539
Ok((!checker.prevents_pushdown()).then(|| checker.into_sorted_columns()))
563540
}
564541

565-
fn leaf_indices_for_roots(
566-
root_indices: &[usize],
542+
/// Resolves which Parquet leaf columns and Arrow schema fields are needed
543+
/// to evaluate `expr` against a Parquet file
544+
///
545+
/// Returns `Ok(Some(plan))` when the expression can be evaluated using only
546+
pushdown-compatible columns. `Ok(None)` when it cannot (it references
547+
/// whole struct columns or columns missing from disk)
548+
///
549+
/// Note: this is a shared entry point used by both row filter construction and
550+
/// the opener's projection logic
551+
pub(crate) fn build_parquet_read_plan(
552+
expr: &Arc<dyn PhysicalExpr>,
553+
file_schema: &Schema,
567554
schema_descr: &SchemaDescriptor,
568-
) -> Vec<usize> {
555+
) -> Result<Option<ParquetReadPlan>> {
556+
let Some(required_columns) = pushdown_columns(expr, file_schema)? else {
557+
return Ok(None);
558+
};
559+
560+
let root_indices = &required_columns.required_columns;
561+
562+
let mut leaf_indices =
563+
leaf_indices_for_roots(root_indices.iter().copied(), schema_descr);
564+
565+
let struct_leaf_indices = resolve_struct_field_leaves(
566+
&required_columns.struct_field_accesses,
567+
file_schema,
568+
schema_descr,
569+
);
570+
leaf_indices.extend_from_slice(&struct_leaf_indices);
571+
leaf_indices.sort_unstable();
572+
leaf_indices.dedup();
573+
574+
let projected_schema = build_filter_schema(
575+
file_schema,
576+
root_indices,
577+
&required_columns.struct_field_accesses,
578+
);
579+
580+
Ok(Some(ParquetReadPlan {
581+
leaf_indices,
582+
projected_schema,
583+
}))
584+
}
585+
586+
fn leaf_indices_for_roots<I>(
587+
root_indices: I,
588+
schema_descr: &SchemaDescriptor,
589+
) -> Vec<usize>
590+
where
591+
I: IntoIterator<Item = usize>,
592+
{
569593
// Always map root (Arrow) indices to Parquet leaf indices via the schema
570594
// descriptor. Arrow root indices only equal Parquet leaf indices when the
571595
// schema has no group columns (Struct, Map, etc.); when group columns
572596
// exist, their children become separate leaves and shift all subsequent
573597
// leaf indices.
574598
// Struct columns are unsupported.
575-
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
599+
let root_set: BTreeSet<_> = root_indices.into_iter().collect();
576600

577601
(0..schema_descr.num_columns())
578602
.filter(|leaf_idx| {
@@ -983,7 +1007,7 @@ mod test {
9831007
.expect("building candidate")
9841008
.expect("list pushdown should be supported");
9851009

986-
assert_eq!(candidate.projection.leaf_indices, vec![list_index]);
1010+
assert_eq!(candidate.read_plan.leaf_indices, vec![list_index]);
9871011
}
9881012

9891013
#[test]
@@ -1441,7 +1465,7 @@ mod test {
14411465

14421466
// col_b is Parquet leaf 3 (shifted by struct_col's two children).
14431467
assert_eq!(
1444-
candidate.projection.leaf_indices,
1468+
candidate.read_plan.leaf_indices,
14451469
vec![3],
14461470
"leaf_indices should be [3] for col_b"
14471471
);
@@ -1591,7 +1615,7 @@ mod test {
15911615
// The filter accesses only s.value, so only Parquet leaf 1 is needed.
15921616
// Leaf 2 (s.label) is not read, reducing unnecessary I/O.
15931617
assert_eq!(
1594-
candidate.projection.leaf_indices,
1618+
candidate.read_plan.leaf_indices,
15951619
vec![1],
15961620
"leaf_indices should contain only the accessed struct field leaf"
15971621
);
@@ -1714,7 +1738,7 @@ mod test {
17141738

17151739
// Only s.outer.inner (leaf 2) should be projected,
17161740
assert_eq!(
1717-
candidate.projection.leaf_indices,
1741+
candidate.read_plan.leaf_indices,
17181742
vec![2],
17191743
"leaf_indices should be [2] for s.outer.inner, skipping sibling and cousin leaves"
17201744
);

0 commit comments

Comments
 (0)