From dd12820e3a351c4f74961d4770afe539ee323036 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 26 May 2026 14:34:17 +0800 Subject: [PATCH 1/3] perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged ensure_distribution was unconditionally calling plan.with_new_children after collecting the (possibly redistributed) children, even when none of the children were actually replaced. For nodes like ProjectionExec, that path runs through try_new and recomputes the schema, equivalence properties, and output ordering each time, which is pure overhead when the input Arcs are identical. Compare each new child Arc with the original via Arc::ptr_eq and reuse the existing plan Arc when nothing changed. The UnionExec to InterleaveExec special case still runs first because it intentionally produces a new node. On a representative deep ProjectionExec stack (30 layers over a sorted parquet scan, no join / aggregate / unmet ordering, 5000 iterations) this brings ensure_distribution from 170.55 us/call to 59.36 us/call, a ~2.87x speedup. Profiling on a real workload dominated by point queries showed ProjectionExec::with_new_children taking 1.94s out of a 2.87s ensure_distribution slice in a 60s sample, so this is the bulk of the rule's cost on that shape. 
Closes #22520 --- .../enforce_distribution.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 093a1ec14b680..5b58bcfbca80c 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -1333,6 +1333,22 @@ pub fn ensure_distribution( .map(|c| Arc::clone(&c.plan)) .collect::<Vec<_>>(); + // Skip the (often expensive) `with_new_children` rebuild when none of + // the children were actually replaced above. For nodes like + // `ProjectionExec`, `with_new_children` calls `try_new` and recomputes + // schema / equivalence properties / output ordering even when the + // input Arcs are identical. Profiling on a representative deep + // ProjectionExec stack showed `with_new_children` dominating + // `ensure_distribution` time for plans where no distribution change + // applies (point queries with no join / aggregate / unmet ordering), + // so the rebuild is wasted on the common case. + let original_children = plan.children(); + let children_unchanged = children_plans.len() == original_children.len() + && children_plans + .iter() + .zip(original_children.iter()) + .all(|(new, old)| Arc::ptr_eq(new, *old)); + plan = if plan.is::<UnionExec>() && !config.optimizer.prefer_existing_union && can_interleave(children_plans.iter()) @@ -1361,6 +1377,10 @@ pub fn ensure_distribution( // Repartition (hash): // Data Arc::new(InterleaveExec::try_new(children_plans)?) + } else if children_unchanged { + // Children are byte-identical Arcs as before; reuse the existing + // plan node and skip the schema/ordering recomputation. + plan } else { plan.with_new_children(children_plans)? 
}; From b5beced674b6f0c1f36335abf3a1c05974bbc176 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 27 May 2026 11:02:33 +0800 Subject: [PATCH 2/3] test: add regression test for ensure_distribution fast path Asserts that when no child of a node is replaced (no RepartitionExec / SortExec injection required), ensure_distribution reuses the input Arc via Arc::ptr_eq instead of going through with_new_children. Guards the fast path against a future refactor re-introducing the unconditional rebuild. --- .../enforce_distribution.rs | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index fb11657107b71..426e1fa745e54 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3971,3 +3971,38 @@ fn adjust_input_keys_ordering_no_transform_for_filter_scan() -> Result<()> { ); Ok(()) } + +/// Verifies the `ensure_distribution` fast path: when no child of a node is +/// replaced (no `RepartitionExec` or `SortExec` injection is required), +/// the rule must reuse the input `Arc<dyn ExecutionPlan>` unchanged instead +/// of calling `with_new_children`. For a deep `ProjectionExec` chain over a +/// single-partition scan with `target_partitions = 1`, every node hits this +/// fast path, so the root returned by `ensure_distribution` must be the +/// same `Arc` as the input. +/// +/// Regression test for the optimization that avoids +/// `ProjectionExec::with_new_children` (which recomputes schema, equivalence +/// properties, output ordering, and partitioning) on the common point-query +/// plan shape. 
+#[test] +fn ensure_distribution_reuses_plan_arc_when_no_redistribution_needed() -> Result<()> { + let scan = parquet_exec(); + let proj1 = projection_exec_with_alias( + scan, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ], + ); + let proj2 = + projection_exec_with_alias(proj1, vec![("a".to_string(), "a".to_string())]); + let plan: Arc<dyn ExecutionPlan> = proj2; + + let result = ensure_distribution_helper(Arc::clone(&plan), 1, false)?; + + assert!( + Arc::ptr_eq(&result, &plan), + "ensure_distribution must reuse the input Arc when no children require redistribution" + ); + Ok(()) +} From 28a92a5cd9b30ac4411e5a5dba4d2c6d44192d05 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 27 May 2026 14:17:03 +0800 Subject: [PATCH 3/3] refactor: use existing with_new_children_if_necessary helper Per reviewer suggestion in #22521, replace the inline Arc::ptr_eq + with_new_children branch with a call to the existing datafusion_physical_plan::with_new_children_if_necessary helper. Same behavior, smaller surface area, keeps the optimization in one canonical place so future call sites elsewhere in the optimizer pick it up automatically. 
--- .../enforce_distribution.rs | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 5b58bcfbca80c..ada7b6d741cf2 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -65,7 +65,9 @@ use datafusion_physical_plan::tree_node::PlanContext; use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave}; use datafusion_physical_plan::windows::WindowAggExec; use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window}; -use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning}; +use datafusion_physical_plan::{ + Distribution, ExecutionPlan, Partitioning, with_new_children_if_necessary, +}; use itertools::izip; @@ -1333,22 +1335,6 @@ pub fn ensure_distribution( .map(|c| Arc::clone(&c.plan)) .collect::<Vec<_>>(); - // Skip the (often expensive) `with_new_children` rebuild when none of - // the children were actually replaced above. For nodes like - // `ProjectionExec`, `with_new_children` calls `try_new` and recomputes - // schema / equivalence properties / output ordering even when the - // input Arcs are identical. Profiling on a representative deep - // ProjectionExec stack showed `with_new_children` dominating - // `ensure_distribution` time for plans where no distribution change - // applies (point queries with no join / aggregate / unmet ordering), - // so the rebuild is wasted on the common case. 
- let original_children = plan.children(); - let children_unchanged = children_plans.len() == original_children.len() - && children_plans - .iter() - .zip(original_children.iter()) - .all(|(new, old)| Arc::ptr_eq(new, *old)); - plan = if plan.is::<UnionExec>() && !config.optimizer.prefer_existing_union && can_interleave(children_plans.iter()) @@ -1377,12 +1363,17 @@ pub fn ensure_distribution( // Repartition (hash): // Data Arc::new(InterleaveExec::try_new(children_plans)?) - } else if children_unchanged { - // Children are byte-identical Arcs as before; reuse the existing - // plan node and skip the schema/ordering recomputation. - plan } else { - plan.with_new_children(children_plans)? + // Route through `with_new_children_if_necessary` so the common + // case where no child was replaced above skips the expensive + // `with_new_children` rebuild. For nodes like `ProjectionExec`, + // `with_new_children` recomputes schema / equivalence properties / + // output ordering via `try_new` even when the input Arcs are + // identical, which dominates `ensure_distribution` time on deep + // projection stacks over plans where no distribution change + // applies (point queries with no join / aggregate / unmet + // ordering). + with_new_children_if_necessary(plan, children_plans)? }; Ok(Transformed::yes(DistributionContext::new(