Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Pull request overview
This PR introduces a new “bucketed hash aggregation” optimization path that fuses local+global aggregation into a single operator for single-BE deployments, along with the required FE/BE plan and pipeline support.
Changes:
- Add a new Thrift plan node type and payload (`BUCKETED_AGGREGATION_NODE` / `TBucketedAggregationNode`) and wire it into `TPlanNode`.
- Add a Nereids physical plan, translation, and costing to generate and pick `PhysicalBucketedHashAggregate`.
- Implement BE pipeline sink/source operators and shared state to build per-instance bucketed hash tables and merge/output them without an exchange.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| gensrc/thrift/PlanNodes.thrift | Adds new plan node type and Thrift struct for bucketed aggregation |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds session variables controlling the optimization and thresholds |
| fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java | Adds legacy planner node that serializes bucketed agg into Thrift |
| fe/fe-core/src/main/java/org/apache/doris/nereids/** | Adds new physical node, visitor hooks, properties, stats, cost model, and implementation rule |
| be/src/exec/pipeline/pipeline_fragment_context.cpp | Creates bucketed agg source/sink pipelines and registers shared state |
| be/src/exec/pipeline/dependency.{h,cpp} | Adds BucketedAggSharedState and cleanup/destroy support |
| be/src/exec/operator/operator.cpp | Registers new bucketed agg pipeline local states |
| be/src/exec/operator/bucketed_aggregation_* | Implements bucketed agg sink/source operators |
| be/src/exec/common/hash_table/hash_map_context.h | Adds reusable output buffer to hash method state |
| be/src/exec/common/agg_utils.h | Factors agg hash-table variants and adds BucketedAggDataVariants |
```java
        + "消除 Exchange 开销和序列化/反序列化成本。默认关闭。",
        "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase "
                + "aggregation into a single operator on single-BE deployments, eliminating exchange overhead "
                + "and serialization/deserialization costs. Disabled by default."})
```
```java
TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode();
bucketedAggNode.setGroupingExprs(groupingExprs);
bucketedAggNode.setAggregateFunctions(aggregateFunctions);
bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());
```
```thrift
struct TBucketedAggregationNode {
  1: optional list<Exprs.TExpr> grouping_exprs
  2: required list<Exprs.TExpr> aggregate_functions
  3: required Types.TTupleId intermediate_tuple_id
  4: required Types.TTupleId output_tuple_id
  5: required bool need_finalize
}
```
```java
    return ImmutableList.of();
}
// Only for single-BE deployments
int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
```
```cpp
op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
```
```cpp
for (int i = 0; i < _num_instances; i++) {
    auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
                                                 "BUCKETED_AGG_SINK_DEPENDENCY");
    sink_dep->set_shared_state(shared_state.get());
    shared_state->sink_deps.push_back(sink_dep);
}
shared_state->create_source_dependencies(_num_instances, op->operator_id(),
```
```java
if (expr instanceof SlotReference) {
    SlotReference slot = (SlotReference) expr;
    if (slot.getOriginalColumn().isPresent()) {
        groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
    }
}
// All distribution columns must appear in the GROUP BY keys
for (Column column : distributionColumns) {
    if (!groupByColumnNames.contains(column.getName().toLowerCase())) {
```
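The check quoted above encodes the plan-time safety condition: the optimization only applies when every distribution column is also a grouping key, presumably so that rows of the same group can never be split across buckets. A minimal self-contained sketch of that containment check (function and parameter names are hypothetical, not the PR's Java code); the lower-casing mirrors the `toLowerCase()` calls in the FE snippet:

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <set>
#include <string>
#include <vector>

// Lower-case helper so the comparison is case-insensitive.
static std::string lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    return s;
}

// Returns true only when every distribution column is also a GROUP BY key.
bool bucketed_agg_applicable(const std::vector<std::string>& group_by_columns,
                             const std::vector<std::string>& distribution_columns) {
    std::set<std::string> keys;
    for (const auto& c : group_by_columns) keys.insert(lower(c));
    for (const auto& c : distribution_columns) {
        if (keys.count(lower(c)) == 0) return false;
    }
    return true;
}
```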
BE Regression && UT Coverage Report: increment line coverage, increment coverage report
run buildall
1 similar comment
run buildall
/review
Bucketed Hash Aggregation Review — Code Review Skill Checkpoint Analysis
Part 1.3 Critical Checkpoint Conclusions
Goal: Fuse two-phase (local + global) aggregation into a single operator for single-BE deployments using 256 per-bucket hash tables. The code achieves this goal structurally, but has critical correctness and safety issues.
Concurrency: Sink instances write to independent per_instance_data[task_idx] (safe). Source uses per-bucket CAS locking (safe). state_generation fence-based wakeup is correct. However, shared probe_expr_ctxs are called concurrently without per-instance cloning — see inline comment.
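The per-bucket CAS locking pattern the review calls safe can be illustrated with a minimal sketch (names and structure are illustrative, not the PR's actual code): each source instance claims a bucket with a compare-exchange before merging it, so every bucket is merged exactly once even with many concurrent instances.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

constexpr int kNumBuckets = 256;

// One flag per bucket: 0 = unclaimed, 1 = claimed. The instance that wins
// the CAS owns the bucket and is the only one to merge it.
std::atomic<int> bucket_locks[kNumBuckets];

// Each source instance scans all buckets and merges those it claims.
int merge_claimed_buckets() {
    int merged = 0;
    for (int b = 0; b < kNumBuckets; ++b) {
        int expected = 0;
        if (bucket_locks[b].compare_exchange_strong(expected, 1)) {
            ++merged; // stand-in for merging bucket b's per-instance tables
        }
    }
    return merged;
}
```

Running several workers over the same lock array, the claimed-bucket counts sum to exactly `kNumBuckets`, which is the property that makes the source-side merge race-free.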
Memory Safety: get_reserve_mem_size() returns 0, completely bypassing pipeline memory reservation. With 256 hash tables × N instances, uncontrolled memory growth is possible. No spill path exists as fallback. No SCOPED_PEAK_MEM instrumentation. See inline comment.
Test Coverage: Zero tests — no regression tests, no BE unit tests, no FE unit tests for ~2700 lines of new production code across FE, BE, and Thrift.
Configuration: 3 new session variables added (enable_bucketed_hash_agg, etc.) — appropriately gated.
FE-BE Protocol: New TPlanNodeType::BUCKETED_AGGREGATION_NODE = 37 properly handled in both FE (BucketedAggregationNode) and BE (pipeline_fragment_context.cpp).
Incompatible Changes: None — new node type, feature-gated, no existing behavior modified.
Observability: Profile counters added for hash table size, memory, build/expr/compute/emplace timers. Adequate.
Other observations:
- The simple_count optimization uses `reinterpret_cast<UInt64&>` on `AggregateDataPtr` to store counters in the pointer bits. Correct but subtle; add a comment explaining the aliasing.
- The `is_fixed_key` member added to `AggregatedDataVariants` is never read in the diff.
- `close()` properly releases held CAS locks (prevents deadlock on LIMIT-induced early termination). Good.
- The source-side merge correctly handles the two-level fan-in from N instances × 256 buckets.
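The counter-in-pointer-bits trick noted above can be shown in isolation (a minimal sketch under the stated assumption of 64-bit pointers, not the PR's code): while a group only needs a plain count, the slot that would normally hold an aggregate-state pointer is reinterpreted as a 64-bit integer and incremented in place, avoiding any per-group allocation.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for Doris's AggregateDataPtr (a byte pointer to aggregate state).
using AggregateDataPtr = char*;

// The trick only works when a pointer slot can hold a 64-bit counter.
static_assert(sizeof(AggregateDataPtr) == sizeof(uint64_t),
              "64-bit pointers assumed");

// Treat the pointer-sized slot as a raw counter. This aliases the pointer
// representation, which is why the review asks for an explanatory comment:
// the slot must never be dereferenced while used this way.
inline void bump_inline_count(AggregateDataPtr& slot, uint64_t delta) {
    reinterpret_cast<uint64_t&>(slot) += delta;
}

inline uint64_t read_inline_count(const AggregateDataPtr& slot) {
    return reinterpret_cast<const uint64_t&>(slot);
}
```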
Summary: 1 Critical, 1 Significant, 1 Major gap
- Critical: data race on shared `probe_expr_ctxs` (concurrent mutation of non-thread-safe `VExprContext`)
- Significant: memory reservation bypassed (`get_reserve_mem_size` returns 0)
- Major gap: zero test coverage for ~2700 lines of new code
```cpp
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
    int result_column_id = -1;
    RETURN_IF_ERROR(shared_state.probe_expr_ctxs[i]->execute(block, &result_column_id));
```
Critical: Data race on shared probe_expr_ctxs
shared_state.probe_expr_ctxs[i]->execute() is called by ALL sink instances concurrently on the SAME shared VExprContext objects. VExprContext::execute() mutates _last_result_column_id (a non-atomic int) and potentially FunctionContext internal state.
The probe_expr_ctxs were cloned once in call_once (lines 88-94) into shared state, then used concurrently by all sink instances without per-instance cloning.
Note that _aggregate_evaluators are already correctly cloned per-instance (lines 113-115) with a comment explaining the same class of bug. The same treatment is needed here.
Fix: Each sink instance should clone its own probe_expr_ctxs in open() (similar to lines 113-115), and _execute_with_serialized_key should use the per-instance clones instead of shared_state.probe_expr_ctxs.
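The suggested fix is a standard prototype-and-clone pattern: shared state keeps one set of contexts, and each sink instance clones its own copies in open() and only ever executes the clones. A minimal sketch with hypothetical stand-in types (the real VExprContext::clone goes through RuntimeState and an object pool, which is omitted here):

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for a non-thread-safe expression context:
// execute() mutates per-context scratch state, so instances must not be
// shared across concurrently running sink instances.
struct ExprContext {
    int last_result_column_id = -1; // mutated by execute(); the race in the review

    std::unique_ptr<ExprContext> clone() const {
        return std::make_unique<ExprContext>(*this);
    }

    int execute(int column_id) {
        last_result_column_id = column_id; // non-atomic write
        return last_result_column_id;
    }
};

// Shared state holds the prototypes, built once at pipeline setup.
struct SharedState {
    std::vector<std::unique_ptr<ExprContext>> probe_expr_ctxs;
};

// Each sink instance clones its own contexts in open(); its execute path
// then uses the clones instead of shared_state.probe_expr_ctxs.
struct SinkLocalState {
    std::vector<std::unique_ptr<ExprContext>> probe_expr_ctxs;

    void open(const SharedState& shared) {
        for (const auto& ctx : shared.probe_expr_ctxs) {
            probe_expr_ctxs.push_back(ctx->clone());
        }
    }
};
```

With this shape, two instances executing different expressions concurrently never touch the same `last_result_column_id`, and the shared prototypes stay untouched.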
```cpp
    return DataDistribution(ExchangeType::NOOP);
}

size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { return 0; }
```
Significant: Memory reservation completely bypassed
get_reserve_mem_size() returns 0, which disables the pipeline memory reservation protocol for this operator. For comparison, the existing AggSinkOperatorX::get_reserve_mem_size() returns hash_table->estimate_memory(batch_size) + _memory_usage_last_executing.
With 256 hash tables per instance × N pipeline instances, hash table resizes can cause massive uncontrolled memory growth with no back-pressure mechanism. There is also:
- No `SCOPED_PEAK_MEM` instrumentation (unlike the regular agg operator)
- No spill path as a fallback
- No `_memory_sufficient_dependency` wiring (per `be/src/exec/AGENTS.md` requirements)
Even if spill support is deferred, the reservation protocol should still provide accurate estimates so the scheduler can apply back-pressure before OOM.
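Even a crude estimate beats returning 0. A sketch of what an honest reservation could look like, assuming hypothetical accessors for per-bucket hash-table sizes (this mirrors the AggSinkOperatorX pattern the review cites, but all names here are illustrative, not Doris APIs):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-bucket hash table: all we need is its current capacity
// and an estimate of what growing it for the next batch could cost.
struct BucketHashTable {
    size_t bucket_count = 16;
    size_t entry_bytes = 48;

    // Worst case, absorbing the next batch forces the table to grow by
    // doubling; reserve the bytes of that new allocation.
    size_t estimate_resize_bytes(size_t batch_size) const {
        size_t needed = bucket_count + batch_size;
        size_t new_buckets = bucket_count;
        while (new_buckets < needed) new_buckets *= 2;
        return (new_buckets - bucket_count) * entry_bytes;
    }
};

// Reserve the possible growth across all buckets, plus the memory the
// previous execution actually used (the _memory_usage_last_executing term
// in the review's comparison with AggSinkOperatorX).
size_t estimate_reserve_mem_size(const std::vector<BucketHashTable>& buckets,
                                 size_t batch_size,
                                 size_t mem_usage_last_executing) {
    size_t reserve = mem_usage_last_executing;
    for (const auto& ht : buckets) {
        reserve += ht.estimate_resize_bytes(batch_size);
    }
    return reserve;
}
```

Returning this kind of sum lets the pipeline scheduler apply back-pressure before the 256-table build phase overshoots, even while spill support is deferred.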
Force-pushed from 5e132d9 to 5773305
run buildall
1 similar comment
run buildall
FE UT Coverage Report: increment line coverage
BE Regression && UT Coverage Report: increment line coverage, increment coverage report
Pushed commits: update plan, update simple count and ph string map, update, fix, update, refact, update, adjust default value, fmt, fix, fix, update, update, update
Force-pushed from 15e1a5a to 142e1da
run buildall
FE UT Coverage Report: increment line coverage
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merges this PR)