
Dev 0319 3 #61495

Open

BiteTheDDDDt wants to merge 2 commits into apache:master from BiteTheDDDDt:dev_0319_3

Conversation

@BiteTheDDDDt (Contributor)

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

Copilot AI review requested due to automatic review settings March 18, 2026 16:06
@Thearas (Contributor) commented Mar 18, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt (Contributor, Author)

run buildall

Copilot AI left a comment


Pull request overview

This PR introduces a new “bucketed hash aggregation” optimization path that fuses local+global aggregation into a single operator for single-BE deployments, along with the required FE/BE plan and pipeline support.

Changes:

  • Add a new Thrift plan node type + payload (BUCKETED_AGGREGATION_NODE / TBucketedAggregationNode) and wire it into TPlanNode.
  • Add Nereids physical plan + translation + costing to generate and pick PhysicalBucketedHashAggregate.
  • Implement BE pipeline sink/source operators and shared-state to build per-instance bucketed hash tables and merge/output them without exchange.
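The fused path described above can be sketched conceptually as follows. This is an illustrative model only, assuming a COUNT aggregate and a plain `std::unordered_map` per bucket; the names, the bucket count (256, per the review later in this thread), and the merge shape are stand-ins, not the PR's actual API.

```cpp
#include <array>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Illustrative bucket count; the review says the PR uses 256.
constexpr size_t kNumBuckets = 256;

using BucketTables = std::array<std::unordered_map<int64_t, int64_t>, kNumBuckets>;

// Bucket assignment depends only on the key, so every instance routes a
// given key to the same bucket index.
size_t bucket_of(int64_t key) {
    return std::hash<int64_t>{}(key) % kNumBuckets;
}

// Sink side: one instance aggregates its share of rows (here: COUNT per key)
// into its own private per-bucket tables, with no cross-instance sharing.
void sink(BucketTables& local, const std::vector<int64_t>& keys) {
    for (int64_t k : keys) {
        local[bucket_of(k)][k] += 1;
    }
}

// Source side: merge one bucket index across all instances. Distinct source
// instances can work on distinct bucket indices in parallel, which is why no
// exchange or re-partitioning step is needed.
std::unordered_map<int64_t, int64_t> merge_bucket(
        const std::vector<BucketTables>& instances, size_t b) {
    std::unordered_map<int64_t, int64_t> out;
    for (const auto& inst : instances) {
        for (const auto& [k, v] : inst[b]) out[k] += v;
    }
    return out;
}
```

Because `bucket_of` is a pure function of the key, merging bucket `b` from every instance yields exactly the global aggregation result for the keys in that bucket.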

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.

Summary per file:

  • gensrc/thrift/PlanNodes.thrift: adds new plan node type and Thrift struct for bucketed aggregation
  • fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java: adds session variables controlling the optimization and thresholds
  • fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java: adds legacy planner node that serializes bucketed agg into Thrift
  • fe/fe-core/src/main/java/org/apache/doris/nereids/**: adds new physical node, visitor hooks, properties, stats, cost model, and implementation rule
  • be/src/exec/pipeline/pipeline_fragment_context.cpp: creates bucketed agg source/sink pipelines and registers shared state
  • be/src/exec/pipeline/dependency.{h,cpp}: adds BucketedAggSharedState and cleanup/destroy support
  • be/src/exec/operator/operator.cpp: registers new bucketed agg pipeline local states
  • be/src/exec/operator/bucketed_aggregation_*: implements bucketed agg sink/source operators
  • be/src/exec/common/hash_table/hash_map_context.h: adds reusable output buffer to hash method state
  • be/src/exec/common/agg_utils.h: factors agg hash-table variants and adds BucketedAggDataVariants


Comment on lines +2933 to +2936 (SessionVariable annotation; the Chinese description line is translated into English below):

    + "eliminating Exchange overhead and serialization/deserialization cost. Disabled by default.",
    "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase "
        + "aggregation into a single operator on single-BE deployments, eliminating exchange overhead "
        + "and serialization/deserialization costs. Disabled by default."})

Snippet from the legacy planner's Thrift serialization (BucketedAggregationNode):

    TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode();
    bucketedAggNode.setGroupingExprs(groupingExprs);
    bucketedAggNode.setAggregateFunctions(aggregateFunctions);
    bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());
Comment on lines +1078 to +1084 (gensrc/thrift/PlanNodes.thrift):

    struct TBucketedAggregationNode {
      1: optional list<Exprs.TExpr> grouping_exprs
      2: required list<Exprs.TExpr> aggregate_functions
      3: required Types.TTupleId intermediate_tuple_id
      4: required Types.TTupleId output_tuple_id
      5: required bool need_finalize
    }
Snippet from the FE single-BE check:

        return ImmutableList.of();
    }
    // Only for single-BE deployments
    int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
Comment on lines +1436 to +1437 (be/src/exec/pipeline/pipeline_fragment_context.cpp):

    op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));

Comment on lines +1462 to +1468:

    for (int i = 0; i < _num_instances; i++) {
        auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
                                                     "BUCKETED_AGG_SINK_DEPENDENCY");
        sink_dep->set_shared_state(shared_state.get());
        shared_state->sink_deps.push_back(sink_dep);
    }
    shared_state->create_source_dependencies(_num_instances, op->operator_id(),
Snippet from the FE check that all distribution columns appear in the GROUP BY keys:

    if (expr instanceof SlotReference) {
        SlotReference slot = (SlotReference) expr;
        if (slot.getOriginalColumn().isPresent()) {
            groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
        }
    }
    // All distribution columns must appear in the GROUP BY keys
    for (Column column : distributionColumns) {
        if (!groupByColumnNames.contains(column.getName().toLowerCase())) {
@hello-stephen (Contributor)

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.11% (26915/36814)
Line Coverage 56.66% (288714/509559)
Region Coverage 53.86% (239706/445048)
Branch Coverage 55.60% (103754/186611)

@BiteTheDDDDt (Contributor, Author)

run buildall

1 similar comment

@BiteTheDDDDt (Contributor, Author)

/review


@github-actions github-actions bot left a comment


Bucketed Hash Aggregation Review — Code Review Skill Checkpoint Analysis

Part 1.3 Critical Checkpoint Conclusions

Goal: Fuse two-phase (local + global) aggregation into a single operator for single-BE deployments using 256 per-bucket hash tables. The code achieves this goal structurally, but has critical correctness and safety issues.

Concurrency: Sink instances write to independent per_instance_data[task_idx] (safe). Source uses per-bucket CAS locking (safe). state_generation fence-based wakeup is correct. However, shared probe_expr_ctxs are called concurrently without per-instance cloning — see inline comment.
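The per-bucket CAS locking judged safe above can be illustrated with a minimal, self-contained sketch; the names here are hypothetical, not the PR's actual types.

```cpp
#include <atomic>

// One lock per bucket: a source instance claims a bucket by atomically
// flipping its flag from false to true. If the CAS fails, another instance
// already owns that bucket and the caller simply moves on to the next index,
// so no instance ever blocks on a bucket.
struct BucketLock {
    std::atomic<bool> taken{false};

    // Returns true iff this caller acquired the bucket.
    bool try_claim() {
        bool expected = false;
        return taken.compare_exchange_strong(expected, true);
    }

    void release() { taken.store(false); }
};
```

The review's note that close() must release held locks follows directly from this shape: a LIMIT-induced early exit while a bucket is claimed would otherwise leave `taken == true` forever.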

Memory Safety: get_reserve_mem_size() returns 0, completely bypassing pipeline memory reservation. With 256 hash tables × N instances, uncontrolled memory growth is possible. No spill path exists as fallback. No SCOPED_PEAK_MEM instrumentation. See inline comment.

Test Coverage: Zero tests — no regression tests, no BE unit tests, no FE unit tests for ~2700 lines of new production code across FE, BE, and Thrift.

Configuration: Three new session variables are added (enable_bucketed_hash_agg, etc.), so the optimization is appropriately feature-gated and off by default.

FE-BE Protocol: New TPlanNodeType::BUCKETED_AGGREGATION_NODE = 37 properly handled in both FE (BucketedAggregationNode) and BE (pipeline_fragment_context.cpp).

Incompatible Changes: None — new node type, feature-gated, no existing behavior modified.

Observability: Profile counters added for hash table size, memory, build/expr/compute/emplace timers. Adequate.

Other observations:

  • simple_count optimization uses reinterpret_cast<UInt64&> on AggregateDataPtr to store counters in pointer bits — correct but subtle, add a comment explaining the aliasing.
  • is_fixed_key member added to AggregatedDataVariants but never read in the diff.
  • close() properly releases held CAS locks (prevents deadlock on LIMIT-induced early termination) — good.
  • Source-side merge correctly handles the two-level fan-in from N instances × 256 buckets.
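
On the simple_count observation, the aliasing trick can be shown in a small hedged sketch; `AggregateDataPtr` here is a stand-in typedef and `count_slot` a hypothetical helper, not the BE's actual code.

```cpp
#include <cstdint>

using AggregateDataPtr = char*;  // stand-in for the BE's aggregate-state pointer

// The "counter in pointer bits" trick the review flags as subtle: when the
// only aggregate is COUNT(*), no per-group state allocation is needed, so the
// 8-byte pointer slot itself is reinterpreted as a uint64_t counter. This is
// only sound on this code path because the slot is never dereferenced as a
// pointer there; an explanatory comment like this one is what the review
// asks the author to add. (Strictly, such type punning sidesteps the aliasing
// rules, which is exactly why it deserves a comment.)
inline uint64_t& count_slot(AggregateDataPtr& slot) {
    static_assert(sizeof(AggregateDataPtr) == sizeof(uint64_t),
                  "the trick relies on pointer and counter having equal size");
    return reinterpret_cast<uint64_t&>(slot);
}
```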

Summary: 1 Critical, 1 Significant, 1 Major gap

  1. Critical: Data race on shared probe_expr_ctxs (concurrent mutation of non-thread-safe VExprContext)
  2. Significant: Memory reservation bypassed (get_reserve_mem_size returns 0)
  3. Major gap: Zero test coverage for ~2700 lines of new code

Snippet under review:

    SCOPED_TIMER(_expr_timer);
    for (size_t i = 0; i < key_size; ++i) {
        int result_column_id = -1;
        RETURN_IF_ERROR(shared_state.probe_expr_ctxs[i]->execute(block, &result_column_id));

Critical: Data race on shared probe_expr_ctxs

shared_state.probe_expr_ctxs[i]->execute() is called by ALL sink instances concurrently on the SAME shared VExprContext objects. VExprContext::execute() mutates _last_result_column_id (a non-atomic int) and potentially FunctionContext internal state.

The probe_expr_ctxs were cloned once in call_once (lines 88-94) into shared state, then used concurrently by all sink instances without per-instance cloning.

Note that _aggregate_evaluators are already correctly cloned per-instance (lines 113-115) with a comment explaining the same class of bug. The same treatment is needed here.

Fix: Each sink instance should clone its own probe_expr_ctxs in open() (similar to lines 113-115), and _execute_with_serialized_key should use the per-instance clones instead of shared_state.probe_expr_ctxs.
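The suggested fix (per-instance cloning in open()) can be sketched with hypothetical types; `ExprCtx`, `clone`, and `SinkLocalState` stand in for Doris's VExprContext machinery and are not the real API.

```cpp
#include <memory>
#include <vector>

// An expression context carries mutable per-evaluation scratch state, so it
// is racy when shared across threads: each execute() overwrites the scratch.
struct ExprCtx {
    int last_result_column_id = -1;  // mutable scratch: the race in the report

    std::unique_ptr<ExprCtx> clone() const {
        return std::make_unique<ExprCtx>(*this);
    }
    void execute(int column_id) { last_result_column_id = column_id; }
};

// Each sink instance clones the shared contexts once in open() and then
// evaluates only its own private copies; the shared originals stay untouched.
struct SinkLocalState {
    std::vector<std::unique_ptr<ExprCtx>> probe_expr_ctxs;

    void open(const std::vector<std::unique_ptr<ExprCtx>>& shared) {
        probe_expr_ctxs.reserve(shared.size());
        for (const auto& ctx : shared) probe_expr_ctxs.push_back(ctx->clone());
    }
};
```

This mirrors the treatment the review says _aggregate_evaluators already receive: the per-instance clone makes all scratch-state mutation thread-local.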

Snippet under review:

        return DataDistribution(ExchangeType::NOOP);
    }

    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { return 0; }

Significant: Memory reservation completely bypassed

get_reserve_mem_size() returns 0, which disables the pipeline memory reservation protocol for this operator. For comparison, the existing AggSinkOperatorX::get_reserve_mem_size() returns hash_table->estimate_memory(batch_size) + _memory_usage_last_executing.

With 256 hash tables per instance × N pipeline instances, hash table resizes can cause massive uncontrolled memory growth with no back-pressure mechanism. There is also:

  • No SCOPED_PEAK_MEM instrumentation (unlike the regular agg operator)
  • No spill path as a fallback
  • No _memory_sufficient_dependency wiring (per be/src/exec/AGENTS.md requirements)

Even if spill support is deferred, the reservation protocol should still provide accurate estimates so the scheduler can apply back-pressure before OOM.
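
As a hedged illustration of what an accurate reservation might look like, loosely modeled on the AggSinkOperatorX formula quoted above: `HashTableStats`, the doubling-on-resize heuristic, and all names below are assumptions, not Doris code.

```cpp
#include <cstddef>
#include <vector>

// Per-bucket hash table bookkeeping (illustrative).
struct HashTableStats {
    size_t size = 0;         // current number of entries
    size_t capacity = 0;     // entries before the next resize
    size_t entry_bytes = 64; // assumed bytes per entry

    // Worst case for the next batch: if the batch can trigger a resize,
    // charge for a roughly doubled table; otherwise charge only the new rows.
    size_t estimate_memory(size_t batch_rows) const {
        if (size + batch_rows <= capacity) return batch_rows * entry_bytes;
        return capacity * 2 * entry_bytes;
    }
};

// Sum the estimate across all per-instance buckets and add the last observed
// execution usage, so the scheduler can apply back-pressure before OOM even
// while spill support is deferred.
size_t get_reserve_mem_size(const std::vector<HashTableStats>& buckets,
                            size_t batch_rows,
                            size_t memory_usage_last_executing) {
    size_t reserve = memory_usage_last_executing;
    for (const auto& ht : buckets) reserve += ht.estimate_memory(batch_rows);
    return reserve;
}
```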

@BiteTheDDDDt (Contributor, Author)

run buildall

1 similar comment

@hello-stephen (Contributor)

FE UT Coverage Report

Increment line coverage 34.83% (116/333) 🎉
Increment coverage report
Complete coverage report

@hello-stephen (Contributor)

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.13% (26927/36821)
Line Coverage 56.68% (288868/509632)
Region Coverage 54.02% (240445/445129)
Branch Coverage 55.64% (103841/186634)

Commit messages (from the commits panel):

  • update plan
  • update simple count and ph string map
  • update fix
  • update refact
  • update
  • adjust default value
  • fmt
  • fix
  • fix
  • update
  • update
  • update

@BiteTheDDDDt (Contributor, Author)

run buildall

@hello-stephen (Contributor)

FE UT Coverage Report

Increment line coverage 34.73% (116/334) 🎉
Increment coverage report
Complete coverage report
