diff --git a/be/src/exec/common/agg_utils.h b/be/src/exec/common/agg_utils.h index fada1b49e30095..b613a68a5ab09f 100644 --- a/be/src/exec/common/agg_utils.h +++ b/be/src/exec/common/agg_utils.h @@ -47,13 +47,20 @@ using AggregatedDataWithNullableUInt32KeyPhase2 = using AggregatedDataWithNullableUInt64KeyPhase2 = DataWithNullKey; using AggregatedDataWithNullableShortStringKey = DataWithNullKey; - -using AggregatedMethodVariants = std::variant< +using AggregatedDataWithNullableStringKey = DataWithNullKey; + +/// Parameterized method variant for aggregation hash tables. +/// StringData / NullableStringData control which hash map is used for string keys: +/// - AggregatedDataVariants uses StringHashMap (AggregatedDataWithShortStringKey) +/// - BucketedAggDataVariants uses PHHashMap (AggregatedDataWithStringKey) +/// to avoid StringHashMap's sub-table complexity and unify the emplace interface. +template +using AggMethodVariantsBase = std::variant< std::monostate, MethodSerialized, MethodOneNumber>, MethodOneNumber>, MethodOneNumber>, MethodOneNumber>, - MethodStringNoCache, - MethodOneNumber>, MethodOneNumber>, + MethodStringNoCache, MethodOneNumber>, + MethodOneNumber>, MethodOneNumber, MethodOneNumber, MethodSingleNullableColumn>>, @@ -66,89 +73,134 @@ using AggregatedMethodVariants = std::variant< MethodOneNumber>, MethodSingleNullableColumn>>, MethodSingleNullableColumn>>, - MethodSingleNullableColumn>, + MethodSingleNullableColumn>, MethodKeysFixed>, MethodKeysFixed>, MethodKeysFixed>, MethodKeysFixed>, MethodKeysFixed>, MethodKeysFixed>, MethodKeysFixed>>; -struct AggregatedDataVariants - : public DataVariants { - AggregatedDataWithoutKey without_key = nullptr; - - void init(const std::vector& data_types, HashKeyType type) { +using AggregatedMethodVariants = AggMethodVariantsBase; + +/// Bucketed agg uses PHHashMap for string keys instead of StringHashMap. 
+/// This avoids StringHashMap's sub-table complexity and unifies the emplace interface +/// (3-arg PHHashMap::emplace), while still using HashMethodString for correct +/// single-column string key extraction. +using BucketedAggMethodVariants = + AggMethodVariantsBase; + +/// Intermediate base that adds the shared init logic for aggregation data +/// variants. Only the string_key case differs between AggregatedDataVariants +/// and BucketedAggDataVariants; all other key types are identical. The +/// StringData/NullableStringData template parameters control which hash map +/// type is emplaced for string_key. +template +struct AggDataVariantsBase : public DataVariants { + void init_agg_data(const std::vector& data_types, HashKeyType type) { bool nullable = data_types.size() == 1 && data_types[0]->is_nullable(); switch (type) { case HashKeyType::without_key: break; case HashKeyType::serialized: - method_variant.emplace>(); + this->method_variant.template emplace>(); break; case HashKeyType::int8_key: - emplace_single>(nullable); + this->template emplace_single>(nullable); break; case HashKeyType::int16_key: - emplace_single>(nullable); + this->template emplace_single>(nullable); break; case HashKeyType::int32_key: - emplace_single>(nullable); + this->template emplace_single>(nullable); break; case HashKeyType::int32_key_phase2: - emplace_single(nullable); + this->template emplace_single(nullable); break; case HashKeyType::int64_key: - emplace_single>(nullable); + this->template emplace_single>(nullable); break; case HashKeyType::int64_key_phase2: - emplace_single(nullable); + this->template emplace_single(nullable); break; case HashKeyType::int128_key: - emplace_single>(nullable); + this->template emplace_single>(nullable); break; case HashKeyType::int256_key: - emplace_single>(nullable); + this->template emplace_single>(nullable); break; case HashKeyType::string_key: if (nullable) { - method_variant.emplace>>(); + this->method_variant.template emplace< + 
MethodSingleNullableColumn>>(); } else { - method_variant.emplace>(); + this->method_variant.template emplace>(); } break; case HashKeyType::fixed64: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; case HashKeyType::fixed72: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; case HashKeyType::fixed96: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; case HashKeyType::fixed104: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; case HashKeyType::fixed128: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; case HashKeyType::fixed136: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; case HashKeyType::fixed256: - method_variant.emplace>>(get_key_sizes(data_types)); + this->method_variant.template emplace>>( + get_key_sizes(data_types)); break; default: - throw Exception(ErrorCode::INTERNAL_ERROR, - "AggregatedDataVariants meet invalid key type, type={}", type); + throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid agg key type, type={}", type); } } }; +struct AggregatedDataVariants + : public AggDataVariantsBase { + AggregatedDataWithoutKey without_key = nullptr; + + bool is_fixed_key = true; + + void init(const std::vector& data_types, HashKeyType type) { + is_fixed_key = !(type == HashKeyType::without_key || type == HashKeyType::EMPTY || + type == HashKeyType::serialized || type == HashKeyType::string_key); + this->init_agg_data(data_types, type); + } +}; + using AggregatedDataVariantsUPtr = std::unique_ptr; using ArenaUPtr = std::unique_ptr; +/// Data variants 
for bucketed hash aggregation. +/// Uses BucketedAggMethodVariants (PHHashMap for string keys). +struct BucketedAggDataVariants + : public AggDataVariantsBase { + void init(const std::vector& data_types, HashKeyType type) { + this->init_agg_data(data_types, type); + } +}; + +using BucketedAggDataVariantsUPtr = std::unique_ptr; + struct AggregateDataContainer { public: AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states) diff --git a/be/src/exec/common/hash_table/hash_map_context.h b/be/src/exec/common/hash_table/hash_map_context.h index ad0763162e101c..ad2de9f4dae4f4 100644 --- a/be/src/exec/common/hash_table/hash_map_context.h +++ b/be/src/exec/common/hash_table/hash_map_context.h @@ -54,6 +54,11 @@ struct MethodBaseInner { Arena arena; DorisVector hash_values; + /// Reusable buffer for source-side output iteration to avoid per-batch + /// heap allocation of std::vector. Callers use resize() + direct + /// element assignment, so the capacity is retained across batches. + std::vector output_keys; + // use in join case DorisVector bucket_nums; diff --git a/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp b/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp new file mode 100644 index 00000000000000..16027dce7efcdd --- /dev/null +++ b/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp @@ -0,0 +1,519 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/operator/bucketed_aggregation_sink_operator.h" + +#include +#include + +#include "common/status.h" +#include "exec/common/hash_table/hash.h" +#include "exec/operator/operator.h" +#include "exprs/vectorized_agg_fn.h" +#include "runtime/runtime_profile.h" +#include "runtime/thread_context.h" + +namespace doris { +#include "common/compile_check_begin.h" + +BucketedAggSinkLocalState::BucketedAggSinkLocalState(DataSinkOperatorXBase* parent, + RuntimeState* state) + : Base(parent, state) {} + +Status BucketedAggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_init_timer); + + _instance_idx = info.task_idx; + + // Sink dependencies start as ready=false by default. We must explicitly + // set them to ready so the pipeline task can execute (call sink()). + // This follows the same pattern as HashJoinBuildSinkLocalState::init(). 
+ _dependency->set_ready(); + + _hash_table_size_counter = ADD_COUNTER(custom_profile(), "HashTableSize", TUnit::UNIT); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); + + _build_timer = ADD_TIMER(Base::custom_profile(), "BuildTime"); + _expr_timer = ADD_TIMER(Base::custom_profile(), "ExprTime"); + _hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), "HashTableComputeTime"); + _hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime"); + _hash_table_input_counter = + ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT); + _memory_usage_arena = ADD_COUNTER(custom_profile(), "MemoryUsageArena", TUnit::BYTES); + + return Status::OK(); +} + +Status BucketedAggSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_open_timer); + RETURN_IF_ERROR(Base::open(state)); + + auto& p = Base::_parent->template cast(); + auto& shared_state = *Base::_shared_state; + + // Initialize per-instance data and shared metadata. Multiple sink instances call open() + // concurrently, so all shared-state writes must be inside call_once to avoid data races. + Status init_status; + shared_state.init_instances(state->task_num(), [&]() { + // Copy metadata to shared state (once, from the first instance to reach here). 
+ shared_state.align_aggregate_states = p._align_aggregate_states; + shared_state.total_size_of_aggregate_states = p._total_size_of_aggregate_states; + shared_state.offsets_of_aggregate_states = p._offsets_of_aggregate_states; + shared_state.make_nullable_keys = p._make_nullable_keys; + + shared_state.probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); + for (size_t i = 0; i < shared_state.probe_expr_ctxs.size(); i++) { + auto st = p._probe_expr_ctxs[i]->clone(state, shared_state.probe_expr_ctxs[i]); + if (!st) { + init_status = st; + return; + } + } + + for (auto& evaluator : p._aggregate_evaluators) { + shared_state.aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); + } + + // Detect simple_count: exactly one COUNT(*) with no args, with GROUP BY present. + // Bucketed agg always has GROUP BY (without-key not supported). + if (p._aggregate_evaluators.size() == 1 && + p._aggregate_evaluators[0]->function()->get_name() == "count" && + p._aggregate_evaluators[0]->function()->get_argument_types().empty()) { + shared_state.use_simple_count = true; + } + }); + RETURN_IF_ERROR(init_status); + + // Now safe to access per_instance_data since init_instances has been called. + auto& inst = shared_state.per_instance_data[_instance_idx]; + _arena = inst.arena.get(); + + // Clone probe expression contexts for this sink instance. Each instance needs + // its own VExprContext because VExprContext::execute() mutates _last_result_column_id + // and FunctionContext internal state, causing data races when multiple sink + // instances call execute() concurrently on shared VExprContext objects. + _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); + for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { + RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); + } + + // Clone aggregate evaluators for this sink instance. 
Each instance needs + // its own evaluators because AggFnEvaluator::_calc_argument_columns() + // mutates internal state (_agg_columns), causing data races when multiple + // sink instances call execute_batch_add() concurrently on shared evaluators. + for (auto& evaluator : p._aggregate_evaluators) { + _aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); + } + + // Set up _bucket_agg_data as 256 pointers into this instance's bucket hash tables. + _bucket_agg_data.resize(BUCKETED_AGG_NUM_BUCKETS); + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + _bucket_agg_data[b] = inst.bucket_agg_data[b].get(); + } + + // Initialize hash method for all 256 bucket hash tables. Each bucket uses + // the same hash key type. We use per-instance _probe_expr_ctxs here (read-only + // for data types), which is safe since they were cloned above. + RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs)); + + return Status::OK(); +} + +Status BucketedAggSinkLocalState::_create_agg_status(AggregateDataPtr data) { + auto& shared_state = *Base::_shared_state; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + try { + _aggregate_evaluators[i]->create(data + shared_state.offsets_of_aggregate_states[i]); + } catch (...) 
{ + for (int j = 0; j < i; ++j) { + _aggregate_evaluators[j]->destroy(data + + shared_state.offsets_of_aggregate_states[j]); + } + throw; + } + } + return Status::OK(); +} + +Status BucketedAggSinkLocalState::_destroy_agg_status(AggregateDataPtr data) { + auto& shared_state = *Base::_shared_state; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->destroy(data + + shared_state.offsets_of_aggregate_states[i]); + } + return Status::OK(); +} + +Status BucketedAggSinkLocalState::_execute_with_serialized_key(Block* block) { + SCOPED_TIMER(_build_timer); + DCHECK(!_probe_expr_ctxs.empty()); + _memory_usage_last_executing = 0; + SCOPED_PEAK_MEM(&_memory_usage_last_executing); + + auto& shared_state = *Base::_shared_state; + auto& p = Base::_parent->template cast(); + size_t key_size = _probe_expr_ctxs.size(); + ColumnRawPtrs key_columns(key_size); + + { + SCOPED_TIMER(_expr_timer); + for (size_t i = 0; i < key_size; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block, &result_column_id)); + block->get_by_position(result_column_id).column = + block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + key_columns[i] = block->get_by_position(result_column_id).column.get(); + key_columns[i]->assume_mutable()->replace_float_special_values(); + } + } + + auto rows = (uint32_t)block->rows(); + if (_places.size() < rows) { + _places.resize(rows); + } + + _emplace_into_hash_table(_places.data(), key_columns, rows); + + if (!shared_state.use_simple_count) { + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( + block, p._offsets_of_aggregate_states[i], _places.data(), *_arena)); + } + } + + return Status::OK(); +} + +void BucketedAggSinkLocalState::_emplace_into_hash_table(AggregateDataPtr* places, + ColumnRawPtrs& key_columns, + uint32_t num_rows) { + auto& p = Base::_parent->template cast(); + + // 
Use bucket 0's method to compute keys and hash values for all rows. + // All 256 buckets use the same hash key type, so the hash function is identical. + std::visit( + Overload {[&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + }, + [&](auto& agg_method) -> void { + using HashMethodType = std::decay_t; + using AggState = typename HashMethodType::State; + AggState state(key_columns); + + { + SCOPED_TIMER(_hash_table_compute_timer); + agg_method.init_serialized_keys(key_columns, num_rows); + } + + const bool use_simple_count = Base::_shared_state->use_simple_count; + + auto creator = [this, &p, use_simple_count](const auto& ctor, auto& key, + auto& origin) { + HashMethodType::try_presis_key_and_origin(key, origin, *_arena); + if (use_simple_count) { + AggregateDataPtr mapped = nullptr; + ctor(key, mapped); + } else { + auto mapped = + _arena->aligned_alloc(p._total_size_of_aggregate_states, + p._align_aggregate_states); + auto st = _create_agg_status(mapped); + if (!st) { + throw Exception(st.code(), st.to_string()); + } + ctor(key, mapped); + } + }; + + auto creator_for_null_key = [this, &p, use_simple_count](auto& mapped) { + if (use_simple_count) { + mapped = nullptr; + } else { + mapped = _arena->aligned_alloc(p._total_size_of_aggregate_states, + p._align_aggregate_states); + auto st = _create_agg_status(mapped); + if (!st) { + throw Exception(st.code(), st.to_string()); + } + } + }; + + { + SCOPED_TIMER(_hash_table_emplace_timer); + + // Phase 1: Pre-group rows by bucket index. + // Single pass over hash_values to partition row indices + // into 256 bucket groups. This converts random 256-way scatter + // into sequential per-bucket batches with good cache locality. 
+ for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + _bucket_row_indices[b].clear(); + } + for (uint32_t i = 0; i < num_rows; ++i) { + int bucket = static_cast((agg_method.hash_values[i] >> 24) & + 0xFF); + _bucket_row_indices[bucket].push_back(i); + } + + // Phase 2: Process each bucket's rows as a batch. + // All rows in a bucket hit the same hash table, enabling + // effective prefetch and cache-friendly access patterns. + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + auto& indices = _bucket_row_indices[b]; + if (indices.empty()) { + continue; + } + + auto& bucket_method = std::get( + _bucket_agg_data[b]->method_variant); + auto& bucket_ht = *bucket_method.hash_table; + + // Batch emplace with prefetch into this bucket's hash table. + for (size_t j = 0; j < indices.size(); ++j) { + // Prefetch ahead within this bucket's hash table. + if (j + HASH_MAP_PREFETCH_DIST < indices.size()) { + uint32_t prefetch_row = + indices[j + HASH_MAP_PREFETCH_DIST]; + bucket_ht.template prefetch( + agg_method.keys[prefetch_row], + agg_method.hash_values[prefetch_row]); + } + uint32_t row = indices[j]; + auto* mapped = state.lazy_emplace_key( + bucket_ht, row, agg_method.keys[row], + agg_method.hash_values[row], creator, + creator_for_null_key); + if (use_simple_count) { + ++reinterpret_cast(*mapped); + } else { + places[row] = *mapped; + } + } + } + } + + COUNTER_UPDATE(_hash_table_input_counter, num_rows); + }}, + _bucket_agg_data[0]->method_variant); +} + +size_t BucketedAggSinkLocalState::_get_hash_table_size() const { + size_t total = 0; + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + total += std::visit(Overload {[&](std::monostate& arg) -> size_t { return 0; }, + [&](auto& agg_method) -> size_t { + return agg_method.hash_table->size(); + }}, + _bucket_agg_data[b]->method_variant); + } + return total; +} + +Status BucketedAggSinkLocalState::_init_hash_method(const VExprContextSPtrs& probe_exprs) { + // Initialize all 256 bucket hash tables with the same 
hash key type. + auto data_types = get_data_types(probe_exprs); + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + RETURN_IF_ERROR(init_hash_method(_bucket_agg_data[b], data_types, + true /* is_first_phase */)); + } + return Status::OK(); +} + +void BucketedAggSinkLocalState::_update_memusage() { + int64_t hash_table_memory_usage = 0; + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + std::visit(Overload {[&](std::monostate& arg) -> void {}, + [&](auto& agg_method) -> void { + hash_table_memory_usage += + agg_method.hash_table->get_buffer_size_in_bytes(); + }}, + _bucket_agg_data[b]->method_variant); + } + int64_t arena_memory_usage = _arena->size(); + + COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); + COUNTER_SET(_memory_usage_arena, arena_memory_usage); + COUNTER_SET(_memory_used_counter, hash_table_memory_usage + arena_memory_usage); +} + +size_t BucketedAggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) const { + // Estimate the memory needed for the next batch across all 256 bucket hash tables. + // Each bucket's hash table may need to resize, so we take the max estimate across + // all buckets and multiply conservatively. In practice, data is distributed across + // buckets, so the per-bucket growth is batch_size/256 on average, but we use the + // full batch_size estimate from the largest bucket for safety. + size_t size_to_reserve = 0; + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + size_to_reserve += std::visit( + Overload {[&](std::monostate& arg) -> size_t { return 0; }, + [&](auto& agg_method) -> size_t { + // estimate_memory returns the delta needed if we insert + // num_elem more entries. Since rows are distributed across + // 256 buckets, each bucket sees ~batch_size/256 rows on average. + // However, skew is possible, so we use a conservative per-bucket + // estimate of batch_size/128 (2x average). 
+ auto per_bucket_rows = std::max( + 1, state->batch_size() / (BUCKETED_AGG_NUM_BUCKETS / 2)); + return agg_method.hash_table->estimate_memory(per_bucket_rows); + }}, + _bucket_agg_data[b]->method_variant); + } + size_to_reserve += _memory_usage_last_executing; + return size_to_reserve; +} + +Status BucketedAggSinkLocalState::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_close_timer); + if (Base::_closed) { + return Status::OK(); + } + PODArray tmp_places; + _places.swap(tmp_places); + return Base::close(state, exec_status); +} + +// ============ BucketedAggSinkOperatorX ============ + +BucketedAggSinkOperatorX::BucketedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, + const TPlanNode& tnode, + const DescriptorTbl& descs) + : DataSinkOperatorX(operator_id, tnode, dest_id), + _intermediate_tuple_id(tnode.bucketed_agg_node.intermediate_tuple_id), + _output_tuple_id(tnode.bucketed_agg_node.output_tuple_id), + _pool(pool) {} + +Status BucketedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); + + RETURN_IF_ERROR( + VExpr::create_expr_trees(tnode.bucketed_agg_node.grouping_exprs, _probe_expr_ctxs)); + + _aggregate_evaluators.reserve(tnode.bucketed_agg_node.aggregate_functions.size()); + TSortInfo dummy; + for (int i = 0; i < tnode.bucketed_agg_node.aggregate_functions.size(); ++i) { + AggFnEvaluator* evaluator = nullptr; + RETURN_IF_ERROR(AggFnEvaluator::create( + _pool, tnode.bucketed_agg_node.aggregate_functions[i], dummy, + tnode.bucketed_agg_node.grouping_exprs.empty(), false, &evaluator)); + _aggregate_evaluators.push_back(evaluator); + } + + return Status::OK(); +} + +Status BucketedAggSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::prepare(state)); + + _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); + _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); + + RETURN_IF_ERROR( + VExpr::prepare(_probe_expr_ctxs, state, + DataSinkOperatorX::_child->row_desc())); + RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); + + size_t j = _probe_expr_ctxs.size(); + for (size_t i = 0; i < j; ++i) { + auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable(); + auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable(); + if (nullable_output != nullable_input) { + DCHECK(nullable_output); + _make_nullable_keys.emplace_back(i); + } + } + + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i, ++j) { + SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; + SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; + RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( + state, DataSinkOperatorX::_child->row_desc(), + intermediate_slot_desc, output_slot_desc)); + _aggregate_evaluators[i]->set_version(state->be_exec_version()); + } + + for (auto& evaluator : _aggregate_evaluators) { + RETURN_IF_ERROR(evaluator->open(state)); + } + + // Compute aggregate state layout. 
+ _offsets_of_aggregate_states.resize(_aggregate_evaluators.size()); + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states; + const auto& agg_function = _aggregate_evaluators[i]->function(); + _align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data()); + _total_size_of_aggregate_states += agg_function->size_of_data(); + if (i + 1 < _aggregate_evaluators.size()) { + size_t alignment_of_next_state = + _aggregate_evaluators[i + 1]->function()->align_of_data(); + if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) { + return Status::RuntimeError("Logical error: align_of_data is not 2^N"); + } + _total_size_of_aggregate_states = + (_total_size_of_aggregate_states + alignment_of_next_state - 1) / + alignment_of_next_state * alignment_of_next_state; + } + } + + return Status::OK(); +} + +Status BucketedAggSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool eos) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._shared_state->input_num_rows += in_block->rows(); + + if (in_block->rows() > 0) { + RETURN_IF_ERROR(local_state._execute_with_serialized_key(in_block)); + local_state._update_memusage(); + COUNTER_SET(local_state._hash_table_size_counter, + (int64_t)local_state._get_hash_table_size()); + } + + if (eos) { + // Mark this sink instance as finished and try to become the merge target. + // The first sink to finish becomes the merge target — its bucket hash tables + // are the destination for all other sinks' data during source-side merge. + auto* ss = local_state._shared_state; + int expected = -1; + ss->merge_target_instance.compare_exchange_strong(expected, local_state._instance_idx); + // Mark this instance's data as safe to read by source. 
+ ss->sink_finished[local_state._instance_idx].store(true, std::memory_order_release); + // Increment finished count, bump state generation, and unblock all source instances. + // Every sink completion triggers source wakeup so sources can merge + // newly-available data immediately, rather than waiting for all sinks. + ss->num_sinks_finished.fetch_add(1, std::memory_order_release); + ss->state_generation.fetch_add(1, std::memory_order_release); + for (int i = 0; i < static_cast(ss->source_deps.size()); i++) { + local_state._dependency->set_ready_to_read(i); + } + } + return Status::OK(); +} + +size_t BucketedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { + auto& local_state = get_local_state(state); + return local_state.get_reserve_mem_size(state, eos); +} + +} // namespace doris diff --git a/be/src/exec/operator/bucketed_aggregation_sink_operator.h b/be/src/exec/operator/bucketed_aggregation_sink_operator.h new file mode 100644 index 00000000000000..a4f3b907902cf3 --- /dev/null +++ b/be/src/exec/operator/bucketed_aggregation_sink_operator.h @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include "exec/operator/operator.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_profile.h" + +namespace doris { +#include "common/compile_check_begin.h" + +class BucketedAggSinkOperatorX; + +/// Sink-side local state for bucketed hash aggregation. +/// Each pipeline instance builds 256 per-bucket hash tables (two-level hash table). +/// No locking: each instance writes to per_instance_data[_instance_idx]. +class BucketedAggSinkLocalState : public PipelineXSinkLocalState { +public: + ENABLE_FACTORY_CREATOR(BucketedAggSinkLocalState); + using Base = PipelineXSinkLocalState; + BucketedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); + ~BucketedAggSinkLocalState() override = default; + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; + Status close(RuntimeState* state, Status exec_status) override; + + size_t get_reserve_mem_size(RuntimeState* state, bool eos) const; + +private: + friend class BucketedAggSinkOperatorX; + + Status _execute_with_serialized_key(Block* block); + void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, + uint32_t num_rows); + Status _create_agg_status(AggregateDataPtr data); + Status _destroy_agg_status(AggregateDataPtr data); + Status _init_hash_method(const VExprContextSPtrs& probe_exprs); + size_t _get_hash_table_size() const; + void _update_memusage(); + + int _instance_idx = 0; + /// Pointers into shared_state->per_instance_data[_instance_idx]. + std::vector _bucket_agg_data; // [256] + Arena* _arena = nullptr; + + /// Per-instance clones of probe expression contexts. Required because + /// VExprContext::execute() mutates _last_result_column_id and FunctionContext + /// internal state, causing data races when multiple sink instances share the + /// same VExprContext and call execute() concurrently. 
+ VExprContextSPtrs _probe_expr_ctxs; + + /// Per-instance clones of aggregate evaluators. Required because + /// AggFnEvaluator::_calc_argument_columns() mutates internal state + /// (_agg_columns), which causes data races when multiple sink instances + /// share the same evaluator and call execute_batch_add() concurrently. + std::vector _aggregate_evaluators; + + PODArray _places; + + /// Pre-grouped row indices by bucket, reused across blocks. + /// _bucket_row_indices[b] holds row indices that map to bucket b. + DorisVector _bucket_row_indices[BUCKETED_AGG_NUM_BUCKETS]; + + RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; + RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; + RuntimeProfile::Counter* _hash_table_input_counter = nullptr; + RuntimeProfile::Counter* _build_timer = nullptr; + RuntimeProfile::Counter* _expr_timer = nullptr; + RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; + RuntimeProfile::Counter* _hash_table_size_counter = nullptr; + RuntimeProfile::Counter* _memory_usage_arena = nullptr; + + /// Peak memory consumed during the last _execute_with_serialized_key call. + /// Used by get_reserve_mem_size() so the pipeline scheduler can apply + /// back-pressure before OOM. + int64_t _memory_usage_last_executing = 0; +}; + +/// Bucketed hash aggregation sink operator. +/// Fuses local + global aggregation for single-BE deployments. +/// Each pipeline instance builds 256 per-bucket hash tables from raw input. +/// The source operator then merges across instances per-bucket (second-phase agg). 
+class BucketedAggSinkOperatorX final : public DataSinkOperatorX { +public: + BucketedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs); + ~BucketedAggSinkOperatorX() override = default; + + Status init(const TDataSink& tsink) override { + return Status::InternalError("{} should not init with TDataSink", + DataSinkOperatorX::_name); + } + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + Status sink(RuntimeState* state, Block* in_block, bool eos) override; + + // No local exchange needed — each instance builds its own hash tables independently. + DataDistribution required_data_distribution(RuntimeState* state) const override { + return DataDistribution(ExchangeType::NOOP); + } + + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; + + using DataSinkOperatorX::node_id; + using DataSinkOperatorX::operator_id; + using DataSinkOperatorX::get_local_state; + +private: + friend class BucketedAggSinkLocalState; + + std::vector _aggregate_evaluators; + + TupleId _intermediate_tuple_id; + TupleDescriptor* _intermediate_tuple_desc = nullptr; + TupleId _output_tuple_id; + TupleDescriptor* _output_tuple_desc = nullptr; + + size_t _align_aggregate_states = 1; + Sizes _offsets_of_aggregate_states; + size_t _total_size_of_aggregate_states = 0; + + VExprContextSPtrs _probe_expr_ctxs; + ObjectPool* _pool = nullptr; + std::vector _make_nullable_keys; +}; + +} // namespace doris +#include "common/compile_check_end.h" diff --git a/be/src/exec/operator/bucketed_aggregation_source_operator.cpp b/be/src/exec/operator/bucketed_aggregation_source_operator.cpp new file mode 100644 index 00000000000000..0479fc581a57d5 --- /dev/null +++ b/be/src/exec/operator/bucketed_aggregation_source_operator.cpp @@ -0,0 +1,729 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/operator/bucketed_aggregation_source_operator.h" + +#include +#include + +#include "common/exception.h" +#include "core/column/column_vector.h" +#include "exec/common/hash_table/hash.h" +#include "exec/common/util.hpp" +#include "exec/operator/operator.h" +#include "exprs/vectorized_agg_fn.h" +#include "runtime/runtime_profile.h" +#include "runtime/thread_context.h" + +namespace doris { +#include "common/compile_check_begin.h" + +// Helper to set/get null key data on hash tables that support it (DataWithNullKey). +// For hash tables without nullable key support (PHHashMap), these are no-ops. +// This is needed because in nested std::visit lambdas, the outer hash table type is already +// resolved and doesn't depend on the inner template parameter, so `if constexpr` inside the +// inner lambda cannot suppress compilation of code that accesses has_null_key_data() on the +// outer (non-dependent) type. 
+template +constexpr bool has_nullable_key_v = + std::is_assignable_v().has_null_key_data()), bool>; + +template +void set_null_key_flag(HashTable& ht, bool val) { + if constexpr (has_nullable_key_v) { + ht.has_null_key_data() = val; + } +} + +template +bool get_null_key_flag(const HashTable& ht) { + if constexpr (has_nullable_key_v) { + return ht.has_null_key_data(); + } else { + return false; + } +} + +template +AggregateDataPtr get_null_key_agg_data(HashTable& ht) { + if constexpr (has_nullable_key_v) { + return ht.template get_null_key_data(); + } else { + return nullptr; + } +} + +template +void set_null_key_agg_data(HashTable& ht, AggregateDataPtr val) { + if constexpr (has_nullable_key_v) { + ht.template get_null_key_data() = val; + } +} + +// Returns a REFERENCE to the null key's AggregateDataPtr slot. +// Critical for simple_count merge: writing through a copy would lose the update (Bug #30). +template +AggregateDataPtr& get_null_key_agg_data_ref(HashTable& ht) { + static_assert(has_nullable_key_v, + "get_null_key_agg_data_ref requires a nullable hash table"); + return ht.template get_null_key_data(); +} + +// Helper for emplace that works with PHHashMap (3-arg). +template +auto hash_table_emplace(HashTable& ht, const Key& key, typename HashTable::LookupResult& it, + bool& inserted) -> decltype(ht.emplace(key, it, inserted), void()) { + ht.emplace(key, it, inserted); +} + +/// Merge src aggregate state into dst_ref (a reference to the mapped slot). +/// For simple_count, adds UInt64 counters directly via the reference. +/// For regular aggregates, calls merge() on each function then destroys src state. +/// After return, src is consumed and must not be used. +static void merge_agg_states(AggregateDataPtr& dst_ref, AggregateDataPtr src, bool use_simple_count, + const std::vector& evaluators, const Sizes& offsets, + Arena& arena) { + if (use_simple_count) { + // simple_count: mapped slots hold UInt64 counters. MUST use reference + // to write back correctly. 
+ reinterpret_cast(dst_ref) += reinterpret_cast(src); + } else { + const size_t num_fns = evaluators.size(); + for (size_t i = 0; i < num_fns; ++i) { + evaluators[i]->function()->merge(dst_ref + offsets[i], src + offsets[i], arena); + } + for (size_t i = 0; i < num_fns; ++i) { + evaluators[i]->function()->destroy(src + offsets[i]); + } + } +} + +/// Merge a source null key into a destination null key slot. Handles three cases: +/// 1. Dst has no null key yet: move src's null key to dst (no merge needed). +/// 2. Dst already has a null key: merge src into dst using merge_agg_states. +/// 3. Src has no null key: no-op. +/// After merge, clears the src null key slot. +template +static void merge_null_key(HashTable& dst_data, HashTable& src_data, bool use_simple_count, + const std::vector& evaluators, const Sizes& offsets, + Arena& arena) { + if constexpr (has_nullable_key_v) { + if (!get_null_key_flag(src_data)) { + return; + } + auto src_null = get_null_key_agg_data(src_data); + if (!src_null) { + return; + } + if (!get_null_key_flag(dst_data)) { + // Dst has no null key yet — move src's null key to dst. + set_null_key_flag(dst_data, true); + set_null_key_agg_data(dst_data, src_null); + } else { + // Both have null keys — merge src into dst. 
+ auto& dst_null_ref = get_null_key_agg_data_ref(dst_data); + merge_agg_states(dst_null_ref, src_null, use_simple_count, evaluators, offsets, arena); + } + set_null_key_agg_data(src_data, nullptr); + set_null_key_flag(src_data, false); + } +} + +BucketedAggLocalState::BucketedAggLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + +Status BucketedAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + + _task_idx = info.task_idx; + + _get_results_timer = ADD_TIMER(custom_profile(), "GetResultsTime"); + _hash_table_iterate_timer = ADD_TIMER(custom_profile(), "HashTableIterateTime"); + _insert_keys_to_column_timer = ADD_TIMER(custom_profile(), "InsertKeysToColumnTime"); + _insert_values_to_column_timer = ADD_TIMER(custom_profile(), "InsertValuesToColumnTime"); + _merge_timer = ADD_TIMER(custom_profile(), "MergeTime"); + + return Status::OK(); +} + +Status BucketedAggLocalState::close(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_close_timer); + if (_closed) { + return Status::OK(); + } + + // Release any held per-bucket CAS lock. This can happen when the source + // is closed prematurely (e.g., LIMIT reached via reached_limit() while + // we were mid-output on a bucket). Without this, the other source instance + // would spin forever trying to acquire this bucket's lock. 
+ if (_current_output_bucket >= 0) { + auto& bs = _shared_state->bucket_states[_current_output_bucket]; + bs.output_done.store(true, std::memory_order_release); + bs.merge_in_progress.store(false, std::memory_order_release); + _current_output_bucket = -1; + _shared_state->state_generation.fetch_add(1, std::memory_order_release); + _wake_up_other_sources(); + } + + return Base::close(state); +} + +void BucketedAggLocalState::_make_nullable_output_key(Block* block) { + if (block->rows() != 0) { + for (auto cid : _shared_state->make_nullable_keys) { + block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column); + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } + } +} + +void BucketedAggLocalState::_wake_up_other_sources() { + auto& shared_state = *_shared_state; + for (int i = 0; i < static_cast(shared_state.source_deps.size()); ++i) { + shared_state.source_deps[i]->set_ready(); + } +} + +int BucketedAggLocalState::_merge_bucket(int bucket, int merge_target) { + SCOPED_TIMER(_merge_timer); + auto& shared_state = *_shared_state; + auto& bs = shared_state.bucket_states[bucket]; + + // Merge target's bucket is the destination. + auto& dst_agg_data = *shared_state.per_instance_data[merge_target].bucket_agg_data[bucket]; + int merged_count = 0; + + std::visit( + Overload { + [&](std::monostate& arg) -> void { + // uninited — no data to merge + }, + [&](auto& dst_method) -> void { + using AggMethodType = std::decay_t; + auto& dst_data = *dst_method.hash_table; + + // Merge all finished sink instances (except merge_target itself) + // into the merge target's bucket. + for (int inst_idx = 0; inst_idx < shared_state.num_sink_instances; + ++inst_idx) { + if (inst_idx == merge_target) { + continue; + } + // Skip instances already merged for this bucket. + if (bs.merged_instances[inst_idx]) { + continue; + } + // Only merge sinks that have finished. 
+ if (!shared_state.sink_finished[inst_idx].load( + std::memory_order_acquire)) { + continue; + } + + auto& src_inst = shared_state.per_instance_data[inst_idx]; + auto& src_agg_data = *src_inst.bucket_agg_data[bucket]; + + std::visit( + Overload { + [&](std::monostate& arg) -> void { + // Mark as merged even if monostate (no data). + bs.merged_instances[inst_idx] = true; + }, + [&](auto& src_method) -> void { + using SrcMethodType = + std::decay_t; + if constexpr (std::is_same_v) { + auto& src_data = *src_method.hash_table; + + ++merged_count; + + // Direct merge: iterate source hash table + // entries, emplace into destination, and null + // out source entries in one pass. This avoids + // allocating intermediate vectors (keys, + // mappeds, hashes) and eliminates the separate + // null-out traversal. + const bool use_simple_count = + shared_state.use_simple_count; + src_data.for_each([&](const auto& key, + auto& mapped) { + if (!mapped) { + return; + } + auto src_mapped = mapped; + mapped = nullptr; + + typename std::remove_reference_t< + decltype(dst_data)>::LookupResult + dst_it; + bool inserted = false; + hash_table_emplace(dst_data, key, dst_it, + inserted); + + if (inserted) { + *::lookup_result_get_mapped(dst_it) = + src_mapped; + } else { + auto& dst_mapped = + *::lookup_result_get_mapped( + dst_it); + merge_agg_states( + dst_mapped, src_mapped, + use_simple_count, + shared_state + .aggregate_evaluators, + shared_state + .offsets_of_aggregate_states, + *src_inst.arena); + } + }); + + merge_null_key( + dst_data, src_data, use_simple_count, + shared_state.aggregate_evaluators, + shared_state + .offsets_of_aggregate_states, + *src_inst.arena); + + // Mark this instance as merged for this bucket. 
+ bs.merged_instances[inst_idx] = true; + } + }}, + src_agg_data.method_variant); + } + }}, + dst_agg_data.method_variant); + + return merged_count; +} + +void BucketedAggLocalState::_build_output_block(Block* block, MutableColumns& key_columns, + const std::vector& values, + uint32_t num_rows, bool mem_reuse) { + SCOPED_TIMER(_insert_values_to_column_timer); + auto& shared_state = *_shared_state; + auto& p = _parent->template cast(); + size_t key_size = shared_state.probe_expr_ctxs.size(); + size_t agg_size = shared_state.aggregate_evaluators.size(); + + if (p._needs_finalize) { + auto columns_with_schema = + VectorizedUtils::create_columns_with_type_and_name(p.row_descriptor()); + MutableColumns value_columns; + for (size_t i = key_size; i < columns_with_schema.size(); ++i) { + if (mem_reuse) { + value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); + } else { + value_columns.emplace_back(columns_with_schema[i].type->create_column()); + } + } + + if (shared_state.use_simple_count) { + // simple_count: mapped slots hold UInt64 counters directly. + // Write them to ColumnInt64. 
+ DCHECK_EQ(agg_size, 1); + auto* col = assert_cast(value_columns[0].get()); + for (uint32_t r = 0; r < num_rows; ++r) { + col->insert_value(static_cast(reinterpret_cast(values[r]))); + } + } else { + for (size_t i = 0; i < agg_size; ++i) { + shared_state.aggregate_evaluators[i]->insert_result_info_vec( + values, shared_state.offsets_of_aggregate_states[i], value_columns[i].get(), + num_rows); + } + } + + if (!mem_reuse) { + ColumnsWithTypeAndName result_columns; + for (size_t i = 0; i < key_size; ++i) { + result_columns.emplace_back(std::move(key_columns[i]), + shared_state.probe_expr_ctxs[i]->root()->data_type(), + shared_state.probe_expr_ctxs[i]->root()->expr_name()); + } + for (size_t i = 0; i < agg_size; ++i) { + result_columns.emplace_back(std::move(value_columns[i]), + columns_with_schema[key_size + i].type, ""); + } + *block = Block(result_columns); + } + } else { + // Serialize path. simple_count should always finalize. + DCHECK(!shared_state.use_simple_count); + MutableColumns value_columns(agg_size); + DataTypes value_data_types(agg_size); + + for (size_t i = 0; i < agg_size; ++i) { + value_data_types[i] = + shared_state.aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns[i] = std::move(*block->get_by_position(key_size + i).column).mutate(); + } else { + value_columns[i] = + shared_state.aggregate_evaluators[i]->function()->create_serialize_column(); + } + shared_state.aggregate_evaluators[i]->function()->serialize_to_column( + values, shared_state.offsets_of_aggregate_states[i], value_columns[i], + num_rows); + } + + if (!mem_reuse) { + ColumnsWithTypeAndName result_columns; + for (size_t i = 0; i < key_size; ++i) { + result_columns.emplace_back(std::move(key_columns[i]), + shared_state.probe_expr_ctxs[i]->root()->data_type(), + shared_state.probe_expr_ctxs[i]->root()->expr_name()); + } + for (size_t i = 0; i < agg_size; ++i) { + result_columns.emplace_back(std::move(value_columns[i]), value_data_types[i], ""); 
+ } + *block = Block(result_columns); + } + } +} + +Status BucketedAggLocalState::_output_bucket(RuntimeState* state, Block* block, int bucket, + int merge_target, uint32_t* rows_output) { + auto& shared_state = *_shared_state; + size_t key_size = shared_state.probe_expr_ctxs.size(); + + auto& bucket_data = *shared_state.per_instance_data[merge_target].bucket_agg_data[bucket]; + + return std::visit( + Overload {[&](std::monostate& arg) -> Status { + *rows_output = 0; + return Status::OK(); + }, + [&](auto& agg_method) -> Status { + bool mem_reuse = + shared_state.make_nullable_keys.empty() && block->mem_reuse(); + + // Initialize iterator (once per bucket — idempotent). + agg_method.init_iterator(); + auto& it = agg_method.begin; + auto& it_end = agg_method.end; + + uint32_t batch_size = state->batch_size(); + auto table_size = agg_method.hash_table->size(); + auto alloc_size = std::min(static_cast(batch_size), + static_cast(table_size)); + + // Reuse member buffers to avoid per-call heap allocation. + _output_values.resize(alloc_size); + agg_method.output_keys.resize(alloc_size); + + uint32_t num_rows = 0; + { + SCOPED_TIMER(_hash_table_iterate_timer); + while (it != it_end && num_rows < batch_size) { + auto key = it.get_first(); + auto mapped = it.get_second(); + ++it; + if (mapped) { + agg_method.output_keys[num_rows] = key; + _output_values[num_rows] = mapped; + ++num_rows; + } + } + } + + if (num_rows == 0) { + *rows_output = 0; + return Status::OK(); + } + + // Build key columns. 
+ MutableColumns key_columns; + for (size_t i = 0; i < key_size; ++i) { + if (mem_reuse) { + key_columns.emplace_back( + std::move(*block->get_by_position(i).column).mutate()); + } else { + key_columns.emplace_back(shared_state.probe_expr_ctxs[i] + ->root() + ->data_type() + ->create_column()); + } + } + + { + SCOPED_TIMER(_insert_keys_to_column_timer); + agg_method.insert_keys_into_columns(agg_method.output_keys, + key_columns, num_rows); + } + + _build_output_block(block, key_columns, _output_values, num_rows, + mem_reuse); + + *rows_output = num_rows; + return Status::OK(); + }}, + bucket_data.method_variant); +} + +Status BucketedAggLocalState::_merge_and_output_null_keys(RuntimeState* state, Block* block) { + auto& shared_state = *_shared_state; + size_t key_size = shared_state.probe_expr_ctxs.size(); + int merge_target = shared_state.merge_target_instance.load(std::memory_order_relaxed); + + // Merge null keys from all 256 buckets (in merge target) into bucket 0. + // After per-bucket merge, each bucket in merge target may have its own null key data. + // We merge them all into bucket 0's null key. + auto& target_inst = shared_state.per_instance_data[merge_target]; + + // We need to visit bucket 0's method variant to know the type. + std::visit(Overload {[&](std::monostate& arg) -> void { + // No data + }, + [&](auto& dst_method) -> void { + using AggMethodType = std::decay_t; + auto& dst_data = *dst_method.hash_table; + + if constexpr (has_nullable_key_v< + std::remove_reference_t>) { + const bool use_simple_count = shared_state.use_simple_count; + // Merge null keys from buckets [1..255] into bucket 0. 
+ for (int b = 1; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + auto& src_method = std::get( + target_inst.bucket_agg_data[b]->method_variant); + auto& src_data = *src_method.hash_table; + + merge_null_key(dst_data, src_data, use_simple_count, + shared_state.aggregate_evaluators, + shared_state.offsets_of_aggregate_states, + *target_inst.arena); + } + } + }}, + target_inst.bucket_agg_data[0]->method_variant); + + // Now output the merged null key from bucket 0 (if any). + return std::visit( + Overload { + [&](std::monostate& arg) -> Status { return Status::OK(); }, + [&](auto& agg_method) -> Status { + auto& data = *agg_method.hash_table; + + if constexpr (has_nullable_key_v>) { + if (!get_null_key_flag(data)) { + return Status::OK(); + } + auto null_data = get_null_key_agg_data(data); + if (!null_data) { + return Status::OK(); + } + + bool mem_reuse = + shared_state.make_nullable_keys.empty() && block->mem_reuse(); + + MutableColumns key_columns; + for (size_t i = 0; i < key_size; ++i) { + if (mem_reuse) { + key_columns.emplace_back( + std::move(*block->get_by_position(i).column).mutate()); + } else { + key_columns.emplace_back(shared_state.probe_expr_ctxs[i] + ->root() + ->data_type() + ->create_column()); + } + } + + DCHECK_EQ(key_columns.size(), 1); + DCHECK(key_columns[0]->is_nullable()); + key_columns[0]->insert_data(nullptr, 0); + + _output_values.resize(1); + _output_values[0] = null_data; + _build_output_block(block, key_columns, _output_values, 1, mem_reuse); + } + return Status::OK(); + }}, + target_inst.bucket_agg_data[0]->method_variant); +} + +Status BucketedAggLocalState::_get_results(RuntimeState* state, Block* block, bool* eos) { + SCOPED_TIMER(_get_results_timer); + auto& shared_state = *_shared_state; + + // If merge_target is not yet set, no sink has finished yet — shouldn't happen + // because source dependency is only set_ready after first sink finishes. 
+ int merge_target = shared_state.merge_target_instance.load(std::memory_order_acquire); + DCHECK_GE(merge_target, 0); + + // If we have a bucket in progress (output didn't finish in previous get_block), + // continue outputting from it. The per-bucket CAS lock (merge_in_progress) is + // still held from the previous call. + if (_current_output_bucket >= 0) { + uint32_t rows_output = 0; + RETURN_IF_ERROR( + _output_bucket(state, block, _current_output_bucket, merge_target, &rows_output)); + if (rows_output > 0) { + return Status::OK(); + } + // This bucket is fully output. Release the CAS lock and wake other sources. + auto& bs = shared_state.bucket_states[_current_output_bucket]; + bs.output_done.store(true, std::memory_order_release); + bs.merge_in_progress.store(false, std::memory_order_release); + _current_output_bucket = -1; + shared_state.state_generation.fetch_add(1, std::memory_order_release); + _wake_up_other_sources(); + } + + // Snapshot the state generation BEFORE scanning — used to detect races with block(). + // Any state change (bucket release, sink finish) bumps this counter. If it changes + // between our snapshot and the re-check after block(), we know we might have missed + // work and must unblock immediately. This eliminates the TOCTOU gap where the + // per-bucket scan in the re-check could miss a bucket that was released and + // immediately re-acquired by another source. + uint64_t gen_before = shared_state.state_generation.load(std::memory_order_acquire); + bool all_sinks_done = shared_state.num_sinks_finished.load(std::memory_order_acquire) == + shared_state.num_sink_instances; + bool did_work = false; + + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + auto& bs = shared_state.bucket_states[b]; + + // Skip buckets already fully output. + if (bs.output_done.load(std::memory_order_acquire)) { + continue; + } + + // Try to acquire the per-bucket CAS lock. 
+ bool expected = false; + if (!bs.merge_in_progress.compare_exchange_strong(expected, true, + std::memory_order_acquire)) { + // Another source is working on this bucket — skip. + continue; + } + + // Under the lock: merge all finished sink instances into merge target's bucket. + _merge_bucket(b, merge_target); + + if (all_sinks_done) { + // All sinks are done — this bucket is fully merged. Output it. + // Keep the lock while outputting to prevent other sources from + // touching this bucket. + uint32_t rows_output = 0; + auto st = _output_bucket(state, block, b, merge_target, &rows_output); + if (!st.ok()) { + bs.merge_in_progress.store(false, std::memory_order_release); + shared_state.state_generation.fetch_add(1, std::memory_order_release); + _wake_up_other_sources(); + return st; + } + + if (rows_output > 0) { + // Bucket has more data than one batch — keep the CAS lock held + // and remember where we left off. We'll resume on the next call. + _current_output_bucket = b; + return Status::OK(); + } + + // Bucket fully output in one batch. Release lock and wake others. + bs.output_done.store(true, std::memory_order_release); + bs.merge_in_progress.store(false, std::memory_order_release); + shared_state.state_generation.fetch_add(1, std::memory_order_release); + _wake_up_other_sources(); + did_work = true; + } else { + // Not all sinks done yet — release the lock. We'll come back later + // when more sinks finish. + bs.merge_in_progress.store(false, std::memory_order_release); + shared_state.state_generation.fetch_add(1, std::memory_order_release); + _wake_up_other_sources(); + did_work = true; + } + } + + // Check if all 256 buckets have been output. + { + int actually_done = 0; + for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) { + if (shared_state.bucket_states[b].output_done.load(std::memory_order_acquire)) { + ++actually_done; + } + } + if (actually_done == BUCKETED_AGG_NUM_BUCKETS) { + // All buckets output. Handle null key. 
+ bool expected = false; + if (shared_state.null_key_output_claimed.compare_exchange_strong(expected, true)) { + RETURN_IF_ERROR(_merge_and_output_null_keys(state, block)); + if (block->rows() > 0) { + return Status::OK(); + } + } + // No more data. + *eos = true; + return Status::OK(); + } + } + + // No work done in this call. Some buckets are still pending — either locked + // by another source instance or waiting for more sinks to finish. + // Block ourselves so we don't spin at 100% CPU. We will be woken up when: + // - A sink finishes (bumps state_generation and calls set_ready_to_read), OR + // - Another source releases a bucket lock (bumps state_generation and calls + // _wake_up_other_sources). + if (!did_work) { + _dependency->block(); + // Re-check for missed wakeups using the generation counter. If any state + // change occurred since our scan started (gen_before), unblock immediately. + // This is infallible: unlike scanning for unlocked buckets (which has a + // TOCTOU gap where a bucket can be released and re-acquired between the + // block() and the scan), the generation counter monotonically increases + // and captures ALL state changes. + uint64_t gen_after = shared_state.state_generation.load(std::memory_order_acquire); + if (gen_after != gen_before) { + _dependency->set_ready(); + } + } + + // Return empty block — scheduler will re-run us when dependency becomes ready. 
+ return Status::OK(); +} + +// ============ BucketedAggSourceOperatorX ============ + +BucketedAggSourceOperatorX::BucketedAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + int operator_id, const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _needs_finalize(tnode.bucketed_agg_node.need_finalize) {} + +Status BucketedAggSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* eos) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + + RETURN_IF_ERROR(local_state._get_results(state, block, eos)); + + local_state._make_nullable_output_key(block); + + // Apply HAVING clause conjuncts (if any). + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); + + // Apply LIMIT. + if (!*eos) { + local_state.reached_limit(block, eos); + } + + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/operator/bucketed_aggregation_source_operator.h b/be/src/exec/operator/bucketed_aggregation_source_operator.h new file mode 100644 index 00000000000000..9057e5bb89d2db --- /dev/null +++ b/be/src/exec/operator/bucketed_aggregation_source_operator.h @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include + +#include "common/status.h" +#include "exec/operator/operator.h" + +namespace doris { +#include "common/compile_check_begin.h" + +class BucketedAggSourceOperatorX; + +/// Source-side local state for bucketed hash aggregation. +/// +/// Pipelined merge model: source instances are woken up each time a sink instance +/// finishes. They scan buckets [0..255], acquire per-bucket CAS locks, and merge +/// data from finished sink instances into the merge target's bucket hash table. +/// When all sinks have finished and a bucket is fully merged, it is output. +/// After all 256 buckets are output, one source instance handles null key output. +class BucketedAggLocalState final : public PipelineXLocalState { +public: + using Base = PipelineXLocalState; + ENABLE_FACTORY_CREATOR(BucketedAggLocalState); + BucketedAggLocalState(RuntimeState* state, OperatorXBase* parent); + ~BucketedAggLocalState() override = default; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; + +private: + friend class BucketedAggSourceOperatorX; + + /// Main output function. Scans buckets, merges available data, outputs when ready. + /// Returns with block filled if data is available, or with eos=true when done. + /// If no work is available (sinks still running, no unprocessed buckets), blocks + /// the source dependency and returns an empty block. + Status _get_results(RuntimeState* state, Block* block, bool* eos); + + /// Merge finished sink instances' bucket B data into the merge target's bucket B. + /// Called under the per-bucket CAS lock. Returns the number of sink instances + /// that were actually merged in this call. + int _merge_bucket(int bucket, int merge_target); + + /// Output entries from a merged bucket's hash table. Returns the number of rows output. + /// Resumes from the current iterator position if a previous get_block call + /// didn't finish the bucket. 
+ Status _output_bucket(RuntimeState* state, Block* block, int bucket, int merge_target, + uint32_t* rows_output); + + /// Merge null keys from all 256 buckets (in merge target) into one, and output. + Status _merge_and_output_null_keys(RuntimeState* state, Block* block); + + /// Build value columns (finalize or serialize), combine with key_columns, + /// and write the result into *block. Shared by _output_bucket and + /// _merge_and_output_null_keys to avoid duplicating the finalize/serialize + /// and Block assembly logic. + void _build_output_block(Block* block, MutableColumns& key_columns, + const std::vector& values, uint32_t num_rows, + bool mem_reuse); + + void _make_nullable_output_key(Block* block); + + /// Wake up all source instances (including self) by setting their dependencies ready. + /// Called when this source releases a bucket CAS lock, so that blocked + /// source instances can re-check for available work. + void _wake_up_other_sources(); + + // Bucket currently being output (-1 means no active bucket). + // If a bucket has too many rows for one batch, we resume output here. + int _current_output_bucket = -1; + + // This source instance's task index [0, num_instances). + // Used to wake up other source instances when a bucket lock is released. + int _task_idx = 0; + + /// Reusable buffer for _output_bucket() to avoid per-call heap allocation. + /// Holds AggregateDataPtr entries for the current batch. 
+ std::vector _output_values; + + RuntimeProfile::Counter* _get_results_timer = nullptr; + RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr; + RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; + RuntimeProfile::Counter* _insert_values_to_column_timer = nullptr; + RuntimeProfile::Counter* _merge_timer = nullptr; +}; + +class BucketedAggSourceOperatorX : public OperatorX { +public: + using Base = OperatorX; + BucketedAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); + ~BucketedAggSourceOperatorX() override = default; + + Status get_block(RuntimeState* state, Block* block, bool* eos) override; + + bool is_source() const override { return true; } + +private: + friend class BucketedAggLocalState; + + bool _needs_finalize; +}; + +} // namespace doris +#include "common/compile_check_end.h" diff --git a/be/src/exec/operator/operator.cpp b/be/src/exec/operator/operator.cpp index 2f813e3d00cce8..9c5ca925057a34 100644 --- a/be/src/exec/operator/operator.cpp +++ b/be/src/exec/operator/operator.cpp @@ -27,6 +27,8 @@ #include "exec/operator/analytic_source_operator.h" #include "exec/operator/assert_num_rows_operator.h" #include "exec/operator/blackhole_sink_operator.h" +#include "exec/operator/bucketed_aggregation_sink_operator.h" +#include "exec/operator/bucketed_aggregation_source_operator.h" #include "exec/operator/cache_sink_operator.h" #include "exec/operator/cache_source_operator.h" #include "exec/operator/datagen_operator.h" @@ -821,6 +823,7 @@ DECLARE_OPERATOR(SortSinkLocalState) DECLARE_OPERATOR(SpillSortSinkLocalState) DECLARE_OPERATOR(LocalExchangeSinkLocalState) DECLARE_OPERATOR(AggSinkLocalState) +DECLARE_OPERATOR(BucketedAggSinkLocalState) DECLARE_OPERATOR(PartitionedAggSinkLocalState) DECLARE_OPERATOR(ExchangeSinkLocalState) DECLARE_OPERATOR(NestedLoopJoinBuildSinkLocalState) @@ -852,6 +855,7 @@ DECLARE_OPERATOR(SortLocalState) DECLARE_OPERATOR(SpillSortLocalState) 
DECLARE_OPERATOR(LocalMergeSortLocalState) DECLARE_OPERATOR(AggLocalState) +DECLARE_OPERATOR(BucketedAggLocalState) DECLARE_OPERATOR(PartitionedAggLocalState) DECLARE_OPERATOR(TableFunctionLocalState) DECLARE_OPERATOR(ExchangeLocalState) @@ -898,6 +902,7 @@ template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; +template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; @@ -916,6 +921,7 @@ template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; +template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; diff --git a/be/src/exec/pipeline/dependency.cpp b/be/src/exec/pipeline/dependency.cpp index 6697352eaeed80..9b20115e1414bc 100644 --- a/be/src/exec/pipeline/dependency.cpp +++ b/be/src/exec/pipeline/dependency.cpp @@ -348,6 +348,13 @@ void AggSharedState::_destroy_agg_status(AggregateDataPtr data) { } } +void BucketedAggSharedState::_destroy_agg_status(AggregateDataPtr data) { + DCHECK(!use_simple_count) << "should not call _destroy_agg_status when use_simple_count"; + for (int i = 0; i < aggregate_evaluators.size(); ++i) { + aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]); + } +} + LocalExchangeSharedState::~LocalExchangeSharedState() = default; Status SetSharedState::update_build_not_ignore_null(const VExprContextSPtrs& ctxs) { diff --git a/be/src/exec/pipeline/dependency.h b/be/src/exec/pipeline/dependency.h index 33c2ebfa2f3a33..fbdbbdbdbe92f0 100644 --- a/be/src/exec/pipeline/dependency.h +++ b/be/src/exec/pipeline/dependency.h @@ -419,6 +419,192 @@ struct AggSharedState : public BasicSharedState { void _destroy_agg_status(AggregateDataPtr data); }; 
+static constexpr int BUCKETED_AGG_NUM_BUCKETS = 256; + +/// Shared state for BucketedAggSinkOperatorX / BucketedAggSourceOperatorX. +/// +/// Each sink pipeline instance owns 256 per-bucket hash tables (two-level hash table +/// approach, inspired by ClickHouse). During sink, each row is routed to bucket +/// (hash >> 24) & 0xFF. +/// +/// Source-side merge is pipelined with sink completion: as each sink instance finishes, +/// it unblocks all source dependencies. Source instances scan buckets and merge data +/// from finished sink instances into the merge target (the first sink to finish). +/// Each bucket has a CAS lock so only one source works on a bucket at a time. +/// After all sinks finish and all buckets are merged + output, one source handles +/// null key merge and the pipeline completes. +/// +/// Thread safety model: +/// - Sink phase: each instance writes only to its own per_instance_data[task_idx]. No locking. +/// - Source phase: per-bucket CAS lock (merge_in_progress). Under the lock, a source +/// scans all finished sink instances and merges their bucket data into the merge +/// target's bucket. Already-merged entries are nulled out to prevent re-processing. +/// Output is only done when all sinks have finished and the bucket is fully merged. +struct BucketedAggSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(BucketedAggSharedState) +public: + BucketedAggSharedState() = default; + ~BucketedAggSharedState() override { _close(); } + + /// Per-instance data. One per sink pipeline instance. + /// Each instance has 256 bucket hash tables + 1 shared arena. + struct PerInstanceData { + /// 256 per-bucket hash tables. Each bucket has its own BucketedAggDataVariants. + /// Uses PHHashMap for string keys instead of StringHashMap. 
+ std::vector<std::unique_ptr<BucketedAggDataVariants>> bucket_agg_data; + ArenaUPtr arena; + + PerInstanceData() : arena(std::make_unique<Arena>()) { + bucket_agg_data.resize(BUCKETED_AGG_NUM_BUCKETS); + for (auto& p : bucket_agg_data) { + p = std::make_unique<BucketedAggDataVariants>(); + } + } + }; + + /// Per-bucket merge state for pipelined source-side processing. + struct BucketMergeState { + /// CAS lock: only one source instance can merge/output this bucket at a time. + std::atomic<bool> merge_in_progress {false}; + /// Set to true once the bucket is fully merged and all rows have been output. + std::atomic<bool> output_done {false}; + /// Tracks which sink instances have been merged into the merge target + /// for this bucket. Accessed only under merge_in_progress CAS lock. + /// Element i is true when instance i's data for this bucket has been merged. + /// Sized to num_sink_instances in init_instances(). + std::vector<bool> merged_instances; + }; + + std::vector<PerInstanceData> per_instance_data; + int num_sink_instances = 0; + + /// Tracks how many sinks have finished. Incremented by each sink on EOS. + std::atomic<int> num_sinks_finished = 0; + + /// Per-sink completion flags. Set to true when each sink instance finishes. + /// Source instances read these to know which sinks' data is safe to merge. + std::unique_ptr<std::atomic<bool>[]> sink_finished; + + /// Index of the first sink instance to finish. Its bucket hash tables serve + /// as the merge target — all other sinks' data is merged into it. + /// Initialized to -1; the first sink to finish CAS-sets it to its instance idx. + std::atomic<int> merge_target_instance = -1; + + /// Per-bucket merge state. Indexed by bucket id [0, 256). + std::array<BucketMergeState, BUCKETED_AGG_NUM_BUCKETS> bucket_states; + + // Aggregate function metadata (shared, read-only after init). 
+ std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators; + VExprContextSPtrs probe_expr_ctxs; + size_t total_size_of_aggregate_states = 0; + size_t align_aggregate_states = 1; + Sizes offsets_of_aggregate_states; + std::vector<size_t> make_nullable_keys; + + std::atomic<int64_t> input_num_rows {0}; + + /// When true, the aggregate has exactly one COUNT(*) function with no args. + /// In this case, mapped values in the hash table store a UInt64 counter + /// directly (reinterpret_cast) instead of a pointer to + /// allocated aggregate state. This eliminates create/merge/destroy overhead. + bool use_simple_count = false; + + // ---- Source-side fields ---- + + // Null key handling: null keys are stored separately (not in any bucket). + // After all buckets are processed, one source instance merges and outputs + // all null key data. This atomic ensures exactly one source instance does it. + std::atomic<bool> null_key_output_claimed {false}; + + /// Monotonically increasing counter bumped on every state change (bucket lock + /// release, sink finish). Used by source instances to detect missed wakeups: + /// if the generation changed between scan start and post-block() re-check, + /// something happened and the source should unblock immediately. + std::atomic<uint64_t> state_generation {0}; + + /// Initialize per-instance data and optionally run a metadata init callback. + /// The callback runs exactly once (under std::call_once) and should populate + /// shared metadata like probe_expr_ctxs, aggregate_evaluators, etc. 
+ template <typename Func> + void init_instances(int num_instances, Func&& metadata_init) { + std::call_once(_init_once, [&]() { + num_sink_instances = num_instances; + per_instance_data.resize(num_instances); + sink_finished = std::make_unique<std::atomic<bool>[]>(num_instances); + for (int i = 0; i < num_instances; ++i) { + sink_finished[i].store(false, std::memory_order_relaxed); + } + for (auto& bs : bucket_states) { + bs.merged_instances.resize(num_instances, false); + } + std::forward<Func>(metadata_init)(); + }); + } + +private: + std::once_flag _init_once; + + void _close() { + for (auto& inst : per_instance_data) { + for (auto& bucket_data : inst.bucket_agg_data) { + _close_one_agg_data(*bucket_data); + } + } + } + + void _close_one_agg_data(BucketedAggDataVariants& agg_data) { + std::visit( + Overload {[&](std::monostate& arg) -> void { + // Do nothing + }, + [&](auto& agg_method) -> void { + if (use_simple_count) { + // simple_count: mapped slots hold UInt64 counters, + // not real agg state pointers. Skip destroy. + return; + } + auto& data = *agg_method.hash_table; + data.for_each_mapped([&](auto& mapped) { + if (mapped) { + _destroy_agg_status(mapped); + mapped = nullptr; + } + }); + if constexpr (std::is_assignable_v) { + if (data.has_null_key_data()) { + _destroy_agg_status( + data.template get_null_key_data<AggregateDataPtr>()); + } + } + }}, + agg_data.method_variant); + } + + void _destroy_agg_status(AggregateDataPtr data); +}; + +struct BasicSpillSharedState { + virtual ~BasicSpillSharedState() = default; + + // These two counters are shared with spill source operators as the initial value + // of 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount'. 
+ + // Total bytes of spill data written to disk file (after serialization) + RuntimeProfile::Counter* _spill_write_file_total_size = nullptr; + RuntimeProfile::Counter* _spill_file_total_count = nullptr; + + void setup_shared_profile(RuntimeProfile* sink_profile) { + _spill_file_total_count = + ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1); + _spill_write_file_total_size = + ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileBytes", TUnit::BYTES, 1); + } + + virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) = 0; +}; + +struct AggSpillPartition; struct PartitionedAggSharedState : public BasicSharedState, public std::enable_shared_from_this<PartitionedAggSharedState> { ENABLE_FACTORY_CREATOR(PartitionedAggSharedState) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index b667a4b7274ac6..9495d805fa0c85 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -49,6 +49,8 @@ #include "exec/operator/analytic_source_operator.h" #include "exec/operator/assert_num_rows_operator.h" #include "exec/operator/blackhole_sink_operator.h" +#include "exec/operator/bucketed_aggregation_sink_operator.h" +#include "exec/operator/bucketed_aggregation_source_operator.h" #include "exec/operator/cache_sink_operator.h" #include "exec/operator/cache_source_operator.h" #include "exec/operator/datagen_operator.h" @@ -1423,6 +1425,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } break; } + case TPlanNodeType::BUCKETED_AGGREGATION_NODE: { + if (tnode.bucketed_agg_node.grouping_exprs.empty()) { + return Status::InternalError( + "Bucketed aggregation node {} should not be used without group by keys", + tnode.node_id); + } + + // Create source operator (goes on the current / downstream pipeline). 
+ op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + + // Create a new pipeline for the sink side. + const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + + // Create sink operator. + sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>( + pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); + + // Pre-register a single shared state for ALL instances so that every + // sink instance writes its per-instance hash table into the same + // BucketedAggSharedState and every source instance can merge across + // all of them. + { + auto shared_state = BucketedAggSharedState::create_shared(); + shared_state->id = op->operator_id(); + shared_state->related_op_ids.insert(op->operator_id()); + + for (int i = 0; i < _num_instances; i++) { + auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(), + "BUCKETED_AGG_SINK_DEPENDENCY"); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_deps.push_back(sink_dep); + } + shared_state->create_source_dependencies(_num_instances, op->operator_id(), + op->node_id(), "BUCKETED_AGG_SOURCE"); + _op_id_to_shared_state.insert( + {op->operator_id(), {shared_state, shared_state->sink_deps}}); + } + break; + } case TPlanNodeType::HASH_JOIN_NODE: { const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join && tnode.hash_join_node.is_broadcast_join; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java index e942e71d064e64..76ffdcf4909308 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.PlanNodeAndHash; import org.apache.doris.nereids.trees.plans.algebra.OlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; @@ -389,6 +390,18 @@ public Cost visitPhysicalHashAggregate( } } + @Override + public Cost visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate aggregate, PlanContext context) { + // Bucketed agg is similar to one-phase agg: all computation on a single BE, + // but avoids exchange overhead. Cost is comparable to one-phase agg. 
+ Statistics inputStatistics = context.getChildStatistics(0); + double exprCost = expressionTreeCost(aggregate.getExpressions()); + return Cost.of(context.getSessionVariable(), + exprCost / 100 + inputStatistics.getRowCount(), + inputStatistics.getRowCount(), 0); + } + @Override public Cost visitPhysicalHashJoin( PhysicalHashJoin physicalHashJoin, PlanContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 4100fab5c23327..ddb870724b60da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -120,6 +120,7 @@ import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; @@ -188,6 +189,7 @@ import org.apache.doris.planner.AssertNumRowsNode; import org.apache.doris.planner.BackendPartitionedSchemaScanNode; import org.apache.doris.planner.BlackholeSink; +import org.apache.doris.planner.BucketedAggregationNode; import org.apache.doris.planner.CTEScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataStreamSink; @@ -1191,14 +1193,7 @@ public PlanFragment visitPhysicalHashAggregate( // 1. 
generate slot reference for each group expression List groupSlots = collectGroupBySlots(groupByExpressions, outputExpressions); - ArrayList execGroupingExpressions = Lists.newArrayListWithCapacity(groupByExpressions.size()); - for (Expression e : groupByExpressions) { - Expr result = ExpressionTranslator.translate(e, context); - if (result == null) { - throw new RuntimeException("translate " + e + " failed"); - } - execGroupingExpressions.add(result); - } + ArrayList execGroupingExpressions = translateGroupByExprs(groupByExpressions, context); // 2. collect agg expressions and generate agg function to slot reference map List aggFunctionOutput = Lists.newArrayList(); ArrayList execAggregateFunctions = Lists.newArrayListWithCapacity(outputExpressions.size()); @@ -1246,20 +1241,10 @@ public PlanFragment visitPhysicalHashAggregate( boolean isPartial = hasPartialInAggFunc.get(); // 3. generate output tuple - List slotList = Lists.newArrayList(); - TupleDescriptor outputTupleDesc; - slotList.addAll(groupSlots); - slotList.addAll(aggFunctionOutput); - outputTupleDesc = generateTupleDesc(slotList, null, context); - - List aggFunOutputIds = ImmutableList.of(); - if (!aggFunctionOutput.isEmpty()) { - aggFunOutputIds = Lists.newArrayListWithCapacity(outputTupleDesc.getSlots().size() - groupSlots.size()); - ArrayList slots = outputTupleDesc.getSlots(); - for (int i = groupSlots.size(); i < slots.size(); i++) { - aggFunOutputIds.add(slots.get(i).getId().asInt()); - } - } + Pair> tupleAndIds = + buildAggOutputTuple(groupSlots, aggFunctionOutput, context); + TupleDescriptor outputTupleDesc = tupleAndIds.first; + List aggFunOutputIds = tupleAndIds.second; AggregateInfo aggInfo = AggregateInfo.create(execGroupingExpressions, execAggregateFunctions, aggFunOutputIds, isPartial, outputTupleDesc, aggregate.getAggPhase().toExec()); AggregationNode aggregationNode = new AggregationNode(context.nextPlanNodeId(), @@ -1336,6 +1321,77 @@ public PlanFragment visitPhysicalHashAggregate( return 
inputPlanFragment; } + @Override + public PlanFragment visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate aggregate, + PlanTranslatorContext context) { + + PlanFragment inputPlanFragment = aggregate.child(0).accept(this, context); + + List groupByExpressions = aggregate.getGroupByExpressions(); + List outputExpressions = aggregate.getOutputExpressions(); + + // 1. generate slot reference for each group expression + List groupSlots = collectGroupBySlots(groupByExpressions, outputExpressions); + ArrayList execGroupingExpressions = translateGroupByExprs(groupByExpressions, context); + + // 2. collect agg expressions + List aggFunctionOutput = Lists.newArrayList(); + ArrayList execAggregateFunctions = Lists.newArrayListWithCapacity(outputExpressions.size()); + Set processedAggregateExpressions = Sets.newIdentityHashSet(); + for (NamedExpression o : outputExpressions) { + if (o.containsType(AggregateExpression.class)) { + aggFunctionOutput.add(o.toSlot()); + + o.foreach(c -> { + if (c instanceof AggregateExpression) { + AggregateExpression aggregateExpression = (AggregateExpression) c; + if (processedAggregateExpressions.add(aggregateExpression)) { + execAggregateFunctions.add( + (FunctionCallExpr) ExpressionTranslator.translate(aggregateExpression, context) + ); + } + return true; + } + return false; + }); + } + } + + // 3. generate output tuple + Pair> tupleAndIds = + buildAggOutputTuple(groupSlots, aggFunctionOutput, context); + TupleDescriptor outputTupleDesc = tupleAndIds.first; + List aggFunOutputIds = tupleAndIds.second; + + // Bucketed agg uses AggPhase.FIRST (update semantics): raw input -> final result. + // Not partial — always needsFinalize. 
+ AggregateInfo aggInfo = AggregateInfo.create(execGroupingExpressions, execAggregateFunctions, + aggFunOutputIds, false /* isPartial */, outputTupleDesc, + AggregateInfo.AggPhase.FIRST); + + BucketedAggregationNode bucketedAggNode = new BucketedAggregationNode( + context.nextPlanNodeId(), inputPlanFragment.getPlanRoot(), aggInfo, true /* needsFinalize */); + + bucketedAggNode.setNereidsId(aggregate.getId()); + context.getNereidsIdToPlanNodeIdMap().put(aggregate.getId(), bucketedAggNode.getId()); + + // Bucketed agg runs entirely within a single fragment. No exchange needed. + // Do NOT set hasColocatePlanNode — bucketed agg does not require colocate + // semantics (one-instance-per-bucket). Bucket assignment is done at the BE + // level via hash partitioning. Leaving this unset allows the fragment to + // route through UnassignedScanSingleOlapTableJob, which respects + // parallel_pipeline_task_num as an upper bound on parallelism. + + setPlanRoot(inputPlanFragment, bucketedAggNode, aggregate); + if (aggregate.getStats() != null) { + bucketedAggNode.setCardinality((long) aggregate.getStats().getRowCount()); + } + updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), aggregate); + + return inputPlanFragment; + } + @Override public PlanFragment visitPhysicalStorageLayerAggregate( PhysicalStorageLayerAggregate storageLayerAggregate, PlanTranslatorContext context) { @@ -3045,6 +3101,48 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef return leftFragment; } + /** + * Translate group-by expressions from Nereids Expression to legacy Expr. + * Shared by visitPhysicalHashAggregate and visitPhysicalBucketedHashAggregate. 
+ */ + private ArrayList translateGroupByExprs(List groupByExpressions, + PlanTranslatorContext context) { + ArrayList execGroupingExpressions = Lists.newArrayListWithCapacity(groupByExpressions.size()); + for (Expression e : groupByExpressions) { + Expr result = ExpressionTranslator.translate(e, context); + if (result == null) { + throw new RuntimeException("translate " + e + " failed"); + } + execGroupingExpressions.add(result); + } + return execGroupingExpressions; + } + + /** + * Build output tuple descriptor and aggregate function output slot IDs. + * Returns Pair(outputTupleDesc, aggFunOutputIds). + * Shared by visitPhysicalHashAggregate and visitPhysicalBucketedHashAggregate. + */ + private Pair> buildAggOutputTuple( + List groupSlots, List aggFunctionOutput, + PlanTranslatorContext context) { + List slotList = Lists.newArrayList(); + slotList.addAll(groupSlots); + slotList.addAll(aggFunctionOutput); + TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, null, context); + + List aggFunOutputIds = ImmutableList.of(); + if (!aggFunctionOutput.isEmpty()) { + aggFunOutputIds = Lists.newArrayListWithCapacity( + outputTupleDesc.getSlots().size() - groupSlots.size()); + ArrayList slots = outputTupleDesc.getSlots(); + for (int i = groupSlots.size(); i < slots.size(); i++) { + aggFunOutputIds.add(slots.get(i).getId().asInt()); + } + } + return Pair.of(outputTupleDesc, aggFunOutputIds); + } + private List collectGroupBySlots(List groupByExpressions, List outputExpressions) { List groupSlots = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java index 1f3ed8c0be7448..c9bb5c6a7c87db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; @@ -56,9 +57,26 @@ public class ProjectAggregateExpressionsForCse extends PlanPostProcessor { @Override public Plan visitPhysicalHashAggregate(PhysicalHashAggregate aggregate, CascadesContext ctx) { - aggregate = (PhysicalHashAggregate) super.visit(aggregate, ctx); + return projectAggregateCse( + (PhysicalHashAggregate) super.visit(aggregate, ctx)); + } + + @Override + public Plan visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate aggregate, CascadesContext ctx) { + return projectAggregateCse( + (PhysicalBucketedHashAggregate) super.visit(aggregate, ctx)); + } - // for multi-phases aggregate, only process the 1st phase aggregate + /** + * Shared CSE projection logic for both PhysicalHashAggregate and + * PhysicalBucketedHashAggregate. Extracts common sub-expressions from + * aggregate function arguments into a project node beneath the aggregate. + */ + private > + Plan projectAggregateCse(T aggregate) { + // For multi-phase aggregates, only process the 1st phase. + // Bucketed agg is always single-phase, but keep the same guard for safety. 
if (aggregate.child() instanceof PhysicalDistribute || aggregate.child() instanceof Aggregate) { return aggregate; } @@ -75,7 +93,6 @@ public Plan visitPhysicalHashAggregate(PhysicalHashAggregate agg inputSlots.addAll(expr.getInputSlots()); } if (cseCandidates.isEmpty()) { - // no opportunity to generate cse return aggregate; } CommonSubExpressionCollector collector = new CommonSubExpressionCollector(); @@ -83,7 +100,6 @@ public Plan visitPhysicalHashAggregate(PhysicalHashAggregate agg collector.collect(expr); } if (collector.commonExprByDepth.isEmpty()) { - // no opportunity to generate cse return aggregate; } if (aggregate.child() instanceof PhysicalProject) { @@ -151,8 +167,7 @@ public Plan visitPhysicalHashAggregate(PhysicalHashAggregate agg PhysicalProperties projectPhysicalProperties = ChildOutputPropertyDeriver.computeProjectOutputProperties( project.getProjects(), ((PhysicalPlan) project.child()).getPhysicalProperties()); project = project.withPhysicalPropertiesAndStats(projectPhysicalProperties, project.getStats()); - aggregate = (PhysicalHashAggregate) aggregate - .withAggOutput(aggOutputReplaced) + return (Plan) aggregate.withAggOutput(aggOutputReplaced) .withChildren(project); } else { List projections = new ArrayList<>(); @@ -174,11 +189,9 @@ public Plan visitPhysicalHashAggregate(PhysicalHashAggregate agg projectPhysicalProperties, child.getStats(), aggregate.child()); - aggregate = (PhysicalHashAggregate) aggregate - .withAggOutput(aggOutputReplaced) + return (Plan) aggregate.withAggOutput(aggOutputReplaced) .withChildren(project); } - return aggregate; } private void getCseCandidatesFromAggregateFunction(Expression expr, Map result, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java index 594c35966fbdc2..02de73b9d0d7b8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; @@ -256,14 +257,29 @@ public PhysicalAssertNumRows visitPhysicalAssertNumRows(PhysicalAssertNumRows aggregate, CascadesContext context) { + return propagateEffectiveSrc(aggregate, context); + } + + @Override + public PhysicalBucketedHashAggregate visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate aggregate, CascadesContext context) { + return propagateEffectiveSrc(aggregate, context); + } + + /** + * Visit child and propagate effective source type if applicable. + * Shared by visitPhysicalHashAggregate and visitPhysicalBucketedHashAggregate. + * + * Note: agg is not regarded as an effective source itself. For example: + * q1: A join (select x, sum(y) as z from B group by x) T on A.a = T.x + * q2: A join (select x, sum(y) as z from B group by x) T on A.a = T.z + * RF on q1 is not effective, but RF on q2 is. Let RF judge by ndv. + */ + private T propagateEffectiveSrc(T aggregate, CascadesContext context) { RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - aggregate.child().accept(this, context); - // q1: A join (select x, sum(y) as z from B group by x) T on A.a = T.x - // q2: A join (select x, sum(y) as z from B group by x) T on A.a = T.z - // RF on q1 is not effective, but RF on q2 is. 
But q1 is a more generous pattern, and hence agg is not - // regarded as an effective source. Let this RF judge by ndv. + aggregate.child(0).accept(this, context); if (ctx.isEffectiveSrcNode(aggregate.child(0))) { - RuntimeFilterContext.EffectiveSrcType childType = ctx.getEffectiveSrcType(aggregate.child()); + RuntimeFilterContext.EffectiveSrcType childType = ctx.getEffectiveSrcType(aggregate.child(0)); ctx.addEffectiveSrcNode(aggregate, childType); } return aggregate; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java index c7b89519f1c353..a8d6a54c347c74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; @@ -193,6 +194,12 @@ public Plan visitPhysicalHashAggregate(PhysicalHashAggregate agg return aggregate; } + @Override + public Plan visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate aggregate, Context context) { + return aggregate; + } + @Override public Plan visitPhysicalCTEConsumer(PhysicalCTEConsumer cteConsumer, Context context) { return cteConsumer; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 06c3080b9427b1..1de4f5e9a82e31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; @@ -223,6 +224,14 @@ public PhysicalProperties visitPhysicalHashAggregate( } } + @Override + public PhysicalProperties visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate agg, PlanContext context) { + Preconditions.checkState(childrenOutputProperties.size() == 1); + PhysicalProperties childOutputProperty = childrenOutputProperties.get(0); + return new PhysicalProperties(childOutputProperty.getDistributionSpec()); + } + @Override public PhysicalProperties visitPhysicalAssertNumRows(PhysicalAssertNumRows assertNumRows, PlanContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index d79d4590fa51af..fb242415befd47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import 
org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink; @@ -504,6 +505,14 @@ public Void visitPhysicalHashAggregate(PhysicalHashAggregate agg return null; } + @Override + public Void visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate agg, PlanContext context) { + // Bucketed agg runs entirely on a single BE — no exchange needed. + addRequestPropertyToChildren(PhysicalProperties.ANY); + return null; + } + private boolean shouldUseParent(List parentHashExprIds, PhysicalHashAggregate agg, PlanContext context) { if (!context.getConnectContext().getSessionVariable().aggShuffleUseParentKey) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java index de9526005d0983..112e4b68cf6cfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java @@ -17,6 +17,13 @@ package org.apache.doris.nereids.rules.implementation; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.Group; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; import 
org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.AggregateExpression; @@ -24,26 +31,36 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SessionVarGuardExpr; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase; +import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction; import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor; import org.apache.doris.nereids.trees.plans.AggMode; import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.util.AggregateUtils; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /**SplitAgg * only process agg without distinct 
    /**
     * Produce candidate physical plans for an aggregate without distinct functions,
     * driven by the agg_phase session variable:
     * 1 = one-phase only, 2 = two-phase only, anything else = both.
     * A bucketed-hash-agg candidate is additionally appended when eligible
     * (see {@link #implementBucketedPhase}); the cost model picks the winner.
     */
    private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
        ImmutableList.Builder<Plan> candidates = ImmutableList.builder();
        switch (ctx.getSessionVariable().aggPhase) {
            case 1:
                candidates.addAll(implementOnePhase(aggregate));
                break;
            case 2:
                candidates.addAll(splitTwoPhase(aggregate));
                break;
            default:
                // 0 (auto) or any other value: offer both shapes and let the optimizer cost them.
                candidates.addAll(implementOnePhase(aggregate));
                candidates.addAll(splitTwoPhase(aggregate));
                break;
        }
        // Add bucketed agg candidate when enabled and on single BE with GROUP BY keys
        candidates.addAll(implementBucketedPhase(aggregate, ctx));
        return candidates.build();
    }

    /**
     * Generate a PhysicalBucketedHashAggregate candidate when the aggregate is eligible,
     * or an empty list otherwise.
     *
     * <p>Eligibility is a sequence of cheap structural gates followed by statistics-based
     * gates; each failing gate returns early with no candidate. The checks are ordered
     * from cheapest to most expensive.
     *
     * @param aggregate the logical aggregate being implemented
     * @param ctx connect context supplying session variables and cluster info
     * @return a single-element list with the bucketed candidate, or an empty list
     */
    private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
        if (!ctx.getSessionVariable().enableBucketedHashAgg) {
            return ImmutableList.of();
        }
        // Only for single-BE deployments
        int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
        if (beNumber != 1) {
            return ImmutableList.of();
        }
        // Without-key aggregation not supported in initial version
        if (aggregate.getGroupByExpressions().isEmpty()) {
            return ImmutableList.of();
        }
        // Must support two-phase execution (same check as splitTwoPhase)
        if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
            return ImmutableList.of();
        }
        // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
        // These are produced by DistinctAggregateRewriter as the bottom dedup phase.
        if (aggregate.getAggregateFunctions().isEmpty()) {
            return ImmutableList.of();
        }
        // Skip aggregates containing multi-distinct functions (e.g., multi_distinct_count,
        // multi_distinct_sum). These are semantically distinct aggregations rewritten by
        // DistinctAggregateRewriter — they embed deduplication in the BE-level function.
        // The bucketed agg cost model does not account for deduplication overhead, which
        // causes the base-table bucketed path to appear artificially cheap compared to
        // materialized views using pre-aggregated bitmap_union/hll_union.
        for (AggregateFunction func : aggregate.getAggregateFunctions()) {
            if (func instanceof MultiDistinction) {
                return ImmutableList.of();
            }
        }
        // Skip aggregates whose child group contains a LogicalAggregate.
        // This detects the top aggregate in a DISTINCT decomposition (e.g.,
        // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
        // on top of GROUP BY a,b dedup). Bucketed agg does not support
        // DISTINCT aggregation in the initial version.
        if (childGroupContainsAggregate(aggregate)) {
            return ImmutableList.of();
        }
        // Skip when sortByGroupKey optimization applies. This is detected by
        // checking if the aggregate's owner group has a LogicalTopN parent
        // whose order key expressions equal the group-by keys (produced by
        // LimitAggToTopNAgg rewrite). PhysicalBucketedHashAggregate does not
        // support sortByGroupKey, so we yield to the regular hash-agg plan.
        if (hasSortByGroupKeyTopN(aggregate)) {
            return ImmutableList.of();
        }
        // Skip when data is already distributed by the GROUP BY keys
        // (e.g., table bucketed by UserID, query GROUP BY UserID).
        // In this case the two-phase plan needs no exchange and is strictly
        // better than bucketed agg (no 256-bucket overhead, no merge phase).
        if (groupByKeysSatisfyDistribution(aggregate)) {
            return ImmutableList.of();
        }
        // Data-volume-based checks: control bucketed agg eligibility based on
        // estimated data scale, similar to ClickHouse's group_by_two_level_threshold
        // and group_by_two_level_threshold_bytes. This reduces reliance on
        // column-level statistics which may be inaccurate or missing.
        //
        // When statistics are unavailable (groupExpression absent or childStats null),
        // conservatively skip bucketed agg — without data volume information we cannot
        // make an informed decision, and the risk of choosing bucketed agg in a
        // high-cardinality scenario outweighs the potential benefit.
        if (!aggregate.getGroupExpression().isPresent()) {
            return ImmutableList.of();
        }
        GroupExpression ge = aggregate.getGroupExpression().get();
        Statistics childStats = ge.childStatistics(0);
        if (childStats == null) {
            return ImmutableList.of();
        }
        double rows = childStats.getRowCount();
        long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
        long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;

        // Gate: minimum input rows.
        // When input data is too small, the overhead of initializing 256
        // per-bucket hash tables and the pipelined merge phase outweighs
        // the benefit of eliminating exchange. Skip bucketed agg.
        if (minInputRows > 0 && rows < minInputRows) {
            return ImmutableList.of();
        }

        // Gate: maximum estimated group keys (similar to ClickHouse's
        // group_by_two_level_threshold). When the number of distinct groups
        // is too large, the source-side merge must combine too many keys
        // across instances, and the merge cost dominates. Skip bucketed agg.
        Statistics aggStats = ge.getOwnerGroup().getStatistics();
        if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() > maxGroupKeys) {
            return ImmutableList.of();
        }

        // High-cardinality ratio checks (existing logic).
        // These complement the absolute thresholds above with relative checks:
        // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows * threshold,
        //    the combined NDV is at least that high.
        // 2. Aggregation ratio check: if estimated output rows > rows * threshold,
        //    merge cost dominates.
        double highCardThreshold = 0.3;
        for (Expression groupByKey : aggregate.getGroupByExpressions()) {
            ColumnStatistic colStat = childStats.findColumnStatistics(groupByKey);
            if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows * highCardThreshold) {
                return ImmutableList.of();
            }
        }
        if (aggStats != null && aggStats.getRowCount() > rows * highCardThreshold) {
            return ImmutableList.of();
        }
        // Build output expressions: rewrite AggregateFunction -> AggregateExpression with GLOBAL_RESULT param
        // (same as one-phase aggregation — raw input directly produces final result).
        List<NamedExpression> aggOutput = ExpressionUtils.rewriteDownShortCircuit(
                aggregate.getOutputExpressions(), expr -> {
                    if (!(expr instanceof AggregateFunction)) {
                        return expr;
                    }
                    return new AggregateExpression((AggregateFunction) expr, AggregateParam.GLOBAL_RESULT);
                }
        );
        return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
                aggregate.getGroupByExpressions(), aggOutput,
                aggregate.getLogicalProperties(), aggregate.child()));
    }
    /**
     * Check if the child group of this aggregate contains a LogicalAggregate.
     * This is used to detect aggregates produced by DISTINCT decomposition rewrites
     * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where the original
     * DISTINCT aggregate is split into a top non-distinct aggregate over a bottom dedup aggregate.
     */
    private boolean childGroupContainsAggregate(LogicalAggregate<? extends Plan> aggregate) {
        // Without a memo binding we cannot inspect the child group; assume no nested agg.
        if (!aggregate.getGroupExpression().isPresent()) {
            return false;
        }
        GroupExpression groupExpr = aggregate.getGroupExpression().get();
        if (groupExpr.arity() == 0) {
            return false;
        }
        Group childGroup = groupExpr.child(0);
        // Any logical alternative in the child group being an aggregate is enough.
        for (GroupExpression childGroupExpr : childGroup.getLogicalExpressions()) {
            if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
                return true;
            }
        }
        return false;
    }

    /**
     * Check if a LogicalTopN parent exists whose order keys are identical to
     * the aggregate's group-by keys. This means PushTopnToAgg will later set
     * sortByGroupKey on PhysicalHashAggregate; bucketed agg doesn't support
     * that optimization so we skip it.
     *
     * Handles both TopN->Agg and TopN->Project->Agg patterns.
     */
    private boolean hasSortByGroupKeyTopN(LogicalAggregate<? extends Plan> aggregate) {
        if (!aggregate.getGroupExpression().isPresent()) {
            return false;
        }
        List<Expression> groupByKeys = aggregate.getGroupByExpressions();
        Group ownerGroup = aggregate.getGroupExpression().get().getOwnerGroup();
        for (GroupExpression parentGE : ownerGroup.getParentGroupExpressions()) {
            Plan parentPlan = parentGE.getPlan();
            // Direct TopN -> Agg pattern.
            if (parentPlan instanceof LogicalTopN
                    && orderKeysMatchGroupKeys((LogicalTopN<?>) parentPlan, groupByKeys)) {
                return true;
            }
            // TopN -> Project -> Agg pattern: look one level further up.
            if (parentPlan instanceof LogicalProject && parentGE.getOwnerGroup() != null) {
                for (GroupExpression gpGE : parentGE.getOwnerGroup().getParentGroupExpressions()) {
                    if (gpGE.getPlan() instanceof LogicalTopN
                            && orderKeysMatchGroupKeys((LogicalTopN<?>) gpGE.getPlan(), groupByKeys)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    /**
     * Return true when the TopN's order-key expressions equal the group-by keys
     * pairwise and in the same order (positional comparison, not set comparison).
     */
    private boolean orderKeysMatchGroupKeys(LogicalTopN<?> topN, List<Expression> groupByKeys) {
        List<OrderKey> orderKeys = topN.getOrderKeys();
        if (orderKeys.size() != groupByKeys.size()) {
            return false;
        }
        for (int i = 0; i < groupByKeys.size(); i++) {
            if (!groupByKeys.get(i).equals(orderKeys.get(i).getExpr())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Check if the GROUP BY keys of this aggregate are a superset of (or equal to)
     * the underlying OlapTable's hash distribution columns. When this is true, the
     * data is already correctly partitioned for the aggregation, so the two-phase
     * plan (local + global) requires no exchange and is strictly better than
     * bucketed agg (no 256-bucket overhead, no merge phase).
     *
     * Traverses the child group in the Memo to find a LogicalOlapScan,
     * walking through LogicalProject and LogicalFilter transparently.
     */
    private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? extends Plan> aggregate) {
        if (!aggregate.getGroupExpression().isPresent()) {
            return false;
        }
        GroupExpression groupExpr = aggregate.getGroupExpression().get();
        if (groupExpr.arity() == 0) {
            return false;
        }
        // Depth limit 5 bounds the Project/Filter walk below the aggregate.
        OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5);
        if (table == null) {
            return false;
        }
        DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
        // Only hash distribution gives a usable partitioning guarantee.
        if (!(distributionInfo instanceof HashDistributionInfo)) {
            return false;
        }
        List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
        if (distributionColumns.isEmpty()) {
            return false;
        }
        // Collect GROUP BY column names (only direct SlotReference with original column info);
        // computed expressions over columns are conservatively ignored.
        Set<String> groupByColumnNames = new HashSet<>();
        for (Expression expr : aggregate.getGroupByExpressions()) {
            if (expr instanceof SlotReference) {
                SlotReference slot = (SlotReference) expr;
                if (slot.getOriginalColumn().isPresent()) {
                    groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
                }
            }
        }
        // All distribution columns must appear in the GROUP BY keys
        for (Column column : distributionColumns) {
            if (!groupByColumnNames.contains(column.getName().toLowerCase())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Recursively search through a Memo Group to find a LogicalOlapScan,
     * walking through LogicalProject and LogicalFilter nodes.
     * Returns the OlapTable if found, null otherwise.
     * maxDepth prevents infinite recursion.
     */
    private OlapTable findOlapTableInGroup(Group group, int maxDepth) {
        if (maxDepth <= 0) {
            return null;
        }
        for (GroupExpression ge : group.getLogicalExpressions()) {
            Plan plan = ge.getPlan();
            if (plan instanceof LogicalOlapScan) {
                return ((LogicalOlapScan) plan).getTable();
            }
            // Only Project/Filter are transparent; any other node ends this branch.
            if ((plan instanceof LogicalProject || plan instanceof LogicalFilter) && ge.arity() > 0) {
                OlapTable result = findOlapTableInGroup(ge.child(0), maxDepth - 1);
                if (result != null) {
                    return result;
                }
            }
        }
        return null;
    }
groupExpression.childStatistics(0)); + } + @Override public Statistics visitPhysicalRepeat(PhysicalRepeat repeat, Void context) { return computeRepeat(repeat, groupExpression.childStatistics(0)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 486d23b92169f2..a3657b3219b7b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -130,6 +130,7 @@ public enum PlanType { // physical others PHYSICAL_HASH_AGGREGATE, + PHYSICAL_BUCKETED_HASH_AGGREGATE, PHYSICAL_ASSERT_NUM_ROWS, PHYSICAL_CTE_PRODUCER, PHYSICAL_CTE_ANCHOR, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java new file mode 100644 index 00000000000000..949d54d282eb41 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Aggregate; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Physical bucketed hash aggregation plan node. + * + * Fuses two-phase aggregation (local + global) into a single operator for single-BE deployments. + * The sink side builds per-instance hash tables from raw input (first-phase agg). + * The source side merges across instances per-bucket using direct in-memory merge + * (no serialization/deserialization) and outputs the final result. + * + * This node replaces the pattern: GlobalAgg -> PhysicalDistribute -> LocalAgg + * with a single fused operator, eliminating exchange overhead entirely. 
/**
 * Physical bucketed hash aggregation plan node.
 *
 * <p>Fuses two-phase aggregation (local + global) into a single operator for single-BE
 * deployments. The sink side builds per-instance hash tables from raw input (first-phase
 * agg). The source side merges across instances per-bucket using direct in-memory merge
 * (no serialization/deserialization) and outputs the final result.
 *
 * <p>This node replaces the pattern: GlobalAgg -> PhysicalDistribute -> LocalAgg
 * with a single fused operator, eliminating exchange overhead entirely.
 */
public class PhysicalBucketedHashAggregate<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE>
        implements Aggregate<CHILD_TYPE> {

    // GROUP BY key expressions; immutable after construction.
    private final List<Expression> groupByExpressions;
    // Final output expressions (group-by slots and AggregateExpression wrappers).
    private final List<NamedExpression> outputExpressions;

    /** Constructor without memo binding; used when first creating the candidate. */
    public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
            List<NamedExpression> outputExpressions,
            LogicalProperties logicalProperties, CHILD_TYPE child) {
        this(groupByExpressions, outputExpressions, Optional.empty(), logicalProperties, child);
    }

    /** Constructor with an optional memo group expression. */
    public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
            List<NamedExpression> outputExpressions,
            Optional<GroupExpression> groupExpression,
            LogicalProperties logicalProperties, CHILD_TYPE child) {
        super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression, logicalProperties, child);
        this.groupByExpressions = ImmutableList.copyOf(
                Objects.requireNonNull(groupByExpressions, "groupByExpressions cannot be null"));
        this.outputExpressions = ImmutableList.copyOf(
                Objects.requireNonNull(outputExpressions, "outputExpressions cannot be null"));
    }

    /**
     * Constructor with group expression, logical properties, physical properties, and statistics.
     */
    public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
            List<NamedExpression> outputExpressions,
            Optional<GroupExpression> groupExpression,
            LogicalProperties logicalProperties,
            PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
        super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression, logicalProperties,
                physicalProperties, statistics, child);
        this.groupByExpressions = ImmutableList.copyOf(
                Objects.requireNonNull(groupByExpressions, "groupByExpressions cannot be null"));
        this.outputExpressions = ImmutableList.copyOf(
                Objects.requireNonNull(outputExpressions, "outputExpressions cannot be null"));
    }

    @Override
    public List<Expression> getGroupByExpressions() {
        return groupByExpressions;
    }

    @Override
    public List<NamedExpression> getOutputExpressions() {
        return outputExpressions;
    }

    @Override
    public List<NamedExpression> getOutputs() {
        return outputExpressions;
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalBucketedHashAggregate(this, context);
    }

    @Override
    public List<? extends Expression> getExpressions() {
        // Group-by keys first, then outputs — order mirrors PhysicalHashAggregate.
        return new ImmutableList.Builder<Expression>()
                .addAll(groupByExpressions)
                .addAll(outputExpressions)
                .build();
    }

    @Override
    public String toString() {
        return Utils.toSqlString("PhysicalBucketedHashAggregate[" + id.asInt() + "]" + getGroupIdWithPrefix(),
                "stats", statistics,
                "groupByExpr", groupByExpressions,
                "outputExpr", outputExpressions
        );
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Equality is defined by the two expression lists only, matching hashCode below.
        PhysicalBucketedHashAggregate<?> that = (PhysicalBucketedHashAggregate<?>) o;
        return Objects.equals(groupByExpressions, that.groupByExpressions)
                && Objects.equals(outputExpressions, that.outputExpressions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupByExpressions, outputExpressions);
    }

    @Override
    public PhysicalBucketedHashAggregate<Plan> withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1);
        return new PhysicalBucketedHashAggregate<>(groupByExpressions, outputExpressions,
                groupExpression, getLogicalProperties(),
                physicalProperties, statistics, children.get(0));
    }

    @Override
    public PhysicalBucketedHashAggregate<CHILD_TYPE> withGroupExpression(
            Optional<GroupExpression> groupExpression) {
        return new PhysicalBucketedHashAggregate<>(groupByExpressions, outputExpressions,
                groupExpression, getLogicalProperties(), child());
    }

    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1);
        return new PhysicalBucketedHashAggregate<>(groupByExpressions, outputExpressions,
                groupExpression, logicalProperties.get(), children.get(0));
    }

    @Override
    public PhysicalBucketedHashAggregate<CHILD_TYPE> withPhysicalPropertiesAndStats(
            PhysicalProperties physicalProperties, Statistics statistics) {
        return new PhysicalBucketedHashAggregate<>(groupByExpressions, outputExpressions,
                groupExpression, getLogicalProperties(),
                physicalProperties, statistics, child());
    }

    @Override
    public PhysicalBucketedHashAggregate<CHILD_TYPE> withAggOutput(List<NamedExpression> newOutput) {
        // New output invalidates the old memo binding, so groupExpression is reset.
        return new PhysicalBucketedHashAggregate<>(groupByExpressions, newOutput,
                Optional.empty(), getLogicalProperties(),
                physicalProperties, statistics, child());
    }

    @Override
    public String shapeInfo() {
        StringBuilder builder = new StringBuilder("bucketedHashAgg[");
        builder.append("BUCKETED");
        builder.append(']');
        return builder.toString();
    }

    @Override
    public List<Slot> computeOutput() {
        return outputExpressions.stream()
                .map(NamedExpression::toSlot)
                .collect(ImmutableList.toImmutableList());
    }

    @Override
    public PhysicalBucketedHashAggregate<CHILD_TYPE> resetLogicalProperties() {
        // NOTE(review): passes null logical properties — presumably recomputed by the
        // framework afterwards, mirroring other physical nodes; confirm against
        // PhysicalHashAggregate.resetLogicalProperties.
        return new PhysicalBucketedHashAggregate<>(groupByExpressions, outputExpressions,
                groupExpression, null,
                physicalProperties, statistics, child());
    }

    @Override
    public void computeUnique(DataTrait.Builder builder) {
        DataTrait childFd = child(0).getLogicalProperties().getTrait();

        // Flatten group-by expressions to the slots they reference.
        ImmutableSet.Builder<Slot> groupByKeysBuilder = ImmutableSet.builder();
        for (Expression expr : groupByExpressions) {
            groupByKeysBuilder.addAll(expr.getInputSlots());
        }
        ImmutableSet<Slot> groupByKeys = groupByKeysBuilder.build();

        // Scalar agg or uniform-and-not-null keys produce at most one row per key
        // combination, so every output slot is unique.
        if (groupByExpressions.isEmpty() || childFd.isUniformAndNotNull(groupByKeys)) {
            getOutput().forEach(builder::addUniqueSlot);
            return;
        }

        builder.addUniqueSlot(childFd);
        // The group-by key set as a whole is unique in the output.
        builder.addUniqueSlot(groupByKeys);
    }

    @Override
    public void computeUniform(DataTrait.Builder builder) {
        DataTrait childFd = child(0).getLogicalProperties().getTrait();
        builder.addUniformSlot(childFd);

        ImmutableSet.Builder<Slot> groupByKeysBuilder = ImmutableSet.builder();
        for (Expression expr : groupByExpressions) {
            groupByKeysBuilder.addAll(expr.getInputSlots());
        }
        ImmutableSet<Slot> groupByKeys = groupByKeysBuilder.build();

        // A single-row result makes every output slot trivially uniform.
        if (groupByExpressions.isEmpty() || childFd.isUniformAndNotNull(groupByKeys)) {
            getOutput().forEach(builder::addUniformSlot);
        }
    }

    @Override
    public void computeEqualSet(DataTrait.Builder builder) {
        // Equal-sets pass through from the child unchanged.
        builder.addEqualSet(child().getLogicalProperties().getTrait());
    }

    @Override
    public void computeFd(DataTrait.Builder builder) {
        // Functional dependencies pass through from the child unchanged.
        builder.addFuncDepsDG(child().getLogicalProperties().getTrait());
    }
}
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; +import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; @@ -332,6 +333,11 @@ public R visitPhysicalHashAggregate(PhysicalHashAggregate agg, C return visit(agg, context); } + public R visitPhysicalBucketedHashAggregate( + PhysicalBucketedHashAggregate agg, C context) { + return visit(agg, context); + } + public R visitPhysicalStorageLayerAggregate(PhysicalStorageLayerAggregate storageLayerAggregate, C context) { return storageLayerAggregate.getRelation().accept(this, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java new file mode 100644 index 00000000000000..8ba43f80323c56 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.AggregateInfo; +import org.apache.doris.analysis.ExprToThriftVisitor; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.thrift.TBucketedAggregationNode; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Bucketed hash aggregation node. + * + * Fuses two-phase aggregation (local + global) into a single BE operator for single-BE deployments. + * Produces a BUCKETED_AGGREGATION_NODE in the Thrift plan, which the BE maps to + * BucketedAggSinkOperatorX / BucketedAggSourceOperatorX. + */ +public class BucketedAggregationNode extends PlanNode { + private final AggregateInfo aggInfo; + private final boolean needsFinalize; + + public BucketedAggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo, + boolean needsFinalize) { + super(id, aggInfo.getOutputTupleId().asList(), "BUCKETED AGGREGATE"); + this.aggInfo = aggInfo; + this.needsFinalize = needsFinalize; + this.children.add(input); + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.BUCKETED_AGGREGATION_NODE; + + List aggregateFunctions = Lists.newArrayList(); + for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) { + aggregateFunctions.add(ExprToThriftVisitor.treeToThrift(e)); + } + + List groupingExprs = Lists.newArrayList(); + if (aggInfo.getGroupingExprs() != null) { + groupingExprs = ExprToThriftVisitor.treesToThrift(aggInfo.getGroupingExprs()); + } + + TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode(); + bucketedAggNode.setGroupingExprs(groupingExprs); + bucketedAggNode.setAggregateFunctions(aggregateFunctions); + bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt()); + 
bucketedAggNode.setOutputTupleId(aggInfo.getOutputTupleId().asInt()); + bucketedAggNode.setNeedFinalize(needsFinalize); + + msg.bucketed_agg_node = bucketedAggNode; + } + + @Override + public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(detailPrefix).append("BUCKETED").append("\n"); + + if (detailLevel == TExplainLevel.BRIEF) { + output.append(detailPrefix).append(String.format("cardinality=%,d", cardinality)).append("\n"); + return output.toString(); + } + + if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) { + output.append(detailPrefix).append("output: ") + .append(getExplainString(aggInfo.getMaterializedAggregateExprs())).append("\n"); + } + output.append(detailPrefix).append("group by: ") + .append(getExplainString(aggInfo.getGroupingExprs())) + .append("\n"); + if (!conjuncts.isEmpty()) { + output.append(detailPrefix).append("having: ").append(getExplainString(conjuncts)).append("\n"); + } + output.append(detailPrefix).append(String.format("cardinality=%,d", cardinality)).append("\n"); + return output.toString(); + } + + @Override + public boolean isSerialOperator() { + // Bucketed agg handles group-by keys only (no without-key in initial version), + // so it's never a serial operator. 
+ return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2d24753db7bfa4..808e646797d71d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -977,6 +977,9 @@ public static double getHotValueThreshold() { public static final String MULTI_DISTINCT_STRATEGY = "multi_distinct_strategy"; public static final String AGG_PHASE = "agg_phase"; + public static final String ENABLE_BUCKETED_HASH_AGG = "enable_bucketed_hash_agg"; + public static final String BUCKETED_AGG_MIN_INPUT_ROWS = "bucketed_agg_min_input_rows"; + public static final String BUCKETED_AGG_MAX_GROUP_KEYS = "bucketed_agg_max_group_keys"; public static final String MERGE_IO_READ_SLICE_SIZE_BYTES = "merge_io_read_slice_size_bytes"; @@ -2929,6 +2932,32 @@ public void setSkewRewriteAggBucketNum(int num) { checker = "checkAggPhase") public int aggPhase = 0; + @VariableMgr.VarAttr(name = ENABLE_BUCKETED_HASH_AGG, needForward = true, description = { + "是否启用 bucketed hash aggregation 优化。该优化在单 BE 场景下将两阶段聚合融合为单个算子," + + "消除 Exchange 开销和序列化/反序列化成本。默认开启。", + "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase " + + "aggregation into a single operator on single-BE deployments, eliminating exchange overhead " + + "and serialization/deserialization costs. Enabled by default."}) + public boolean enableBucketedHashAgg = true; + + @VariableMgr.VarAttr(name = BUCKETED_AGG_MIN_INPUT_ROWS, fuzzy = true, needForward = true, description = { + "bucketed hash aggregation 要求的最小输入行数。当估算输入行数小于此阈值时," + + "数据量太小,256-bucket two-level hash table 的初始化和 merge 开销大于收益," + + "不生成 bucketed agg 候选计划。设为 0 表示不限制。默认 100000。", + "Minimum estimated input rows required for bucketed hash aggregation. 
When estimated input " + + "rows are below this threshold, the data volume is too small for the 256-bucket two-level " + + "hash table overhead to be worthwhile. Set to 0 to disable this check. Default 100000."}) + public long bucketedAggMinInputRows = 100000; + + @VariableMgr.VarAttr(name = BUCKETED_AGG_MAX_GROUP_KEYS, needForward = true, description = { + "bucketed hash aggregation 允许的最大估算分组数(key 数量)。当估算分组数超过此阈值时," + + "merge 阶段需要合并大量 key,开销会超过 bucketed agg 带来的收益。" + + "类似于 ClickHouse 的 group_by_two_level_threshold。设为 0 表示不限制。默认 0", + "Maximum estimated number of group keys for bucketed hash aggregation. When the estimated " + + "number of groups exceeds this threshold, the merge phase cost of combining large numbers " + + "of keys outweighs the benefit. Similar to ClickHouse's group_by_two_level_threshold. " + + "Set to 0 to disable this check. Default 0."}) + public long bucketedAggMaxGroupKeys = 0; @VariableMgr.VarAttr(name = MERGE_IO_READ_SLICE_SIZE_BYTES, description = { "调整 READ_SLICE_SIZE 大小,降低 Merge IO 读放大影响", @@ -3623,6 +3652,7 @@ public void initFuzzyModeVariables() { this.rewriteOrToInPredicateThreshold = 100000; this.enableFunctionPushdown = false; this.enableSyncRuntimeFilterSize = true; + this.bucketedAggMinInputRows = 0; } else { this.rewriteOrToInPredicateThreshold = 2; this.enableFunctionPushdown = true; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 64ad52267b3418..85d334098d62a6 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -62,7 +62,8 @@ enum TPlanNodeType { GROUP_COMMIT_SCAN_NODE = 33, MATERIALIZATION_NODE = 34, REC_CTE_NODE = 35, - REC_CTE_SCAN_NODE = 36 + REC_CTE_SCAN_NODE = 36, + BUCKETED_AGGREGATION_NODE = 37 } struct TKeyRange { @@ -1074,6 +1075,14 @@ struct TAggregationNode { 10: optional TSortInfo agg_sort_info_by_group_key } +struct TBucketedAggregationNode { + 1: optional list grouping_exprs + 2: optional list aggregate_functions + 3: optional Types.TTupleId 
intermediate_tuple_id + 4: optional Types.TTupleId output_tuple_id + 5: optional bool need_finalize +} + struct TRepeatNode { // Tulple id used for output, it has new slots. 1: required Types.TTupleId output_tuple_id @@ -1522,6 +1531,7 @@ struct TPlanNode { 50: optional list> distribute_expr_lists 51: optional bool is_serial_operator 52: optional TRecCTEScanNode rec_cte_scan_node + 53: optional TBucketedAggregationNode bucketed_agg_node // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list projections diff --git a/regression-test/data/mv_p0/ut/testBucketedAggSyncMV/testBucketedAggSyncMV.out b/regression-test/data/mv_p0/ut/testBucketedAggSyncMV/testBucketedAggSyncMV.out new file mode 100644 index 00000000000000..a9d049206b61c2 --- /dev/null +++ b/regression-test/data/mv_p0/ut/testBucketedAggSyncMV/testBucketedAggSyncMV.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_mv -- +2024-07-01 a 1 +2024-07-02 a 1 +2024-07-02 b 1 +2024-07-03 b 2 + +-- !select_mv_no_bucketed -- +2024-07-01 a 1 +2024-07-02 a 1 +2024-07-02 b 1 +2024-07-03 b 2 + diff --git a/regression-test/suites/mv_p0/ut/testBucketedAggSyncMV/testBucketedAggSyncMV.groovy b/regression-test/suites/mv_p0/ut/testBucketedAggSyncMV/testBucketedAggSyncMV.groovy new file mode 100644 index 00000000000000..a85219d8d12eb8 --- /dev/null +++ b/regression-test/suites/mv_p0/ut/testBucketedAggSyncMV/testBucketedAggSyncMV.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite ("testBucketedAggSyncMV") { + // Regression test: when enable_bucketed_hash_agg=true on a single-BE cluster, + // the PhysicalBucketedHashAggregate plan for the base table with multi_distinct_count + // could appear artificially cheap (bypasses one-phase agg ban, requests ANY properties), + // beating the sync MV path on cost. The fix bans bucketed agg for MultiDistinction + // functions in SplitAggWithoutDistinct.implementBucketedPhase(). + + sql "set pre_materialized_view_rewrite_strategy = TRY_IN_RBO" + sql """set enable_nereids_planner=true;""" + sql "set disable_nereids_rules='DISTINCT_AGGREGATE_SPLIT';" + sql "set enable_bucketed_hash_agg = true;" + + sql """ DROP TABLE IF EXISTS bucketed_agg_mv_test; """ + + sql """ + CREATE TABLE bucketed_agg_mv_test ( + time_col date NOT NULL, + advertiser varchar(10), + dt date NOT NULL, + channel varchar(10), + user_id int) + DUPLICATE KEY(`time_col`, `advertiser`) + PARTITION BY RANGE (dt)( + FROM ("2024-07-01") TO ("2024-07-05") INTERVAL 1 DAY) + DISTRIBUTED BY hash(time_col) BUCKETS 3 + PROPERTIES('replication_num' = '1'); + """ + + sql """insert into bucketed_agg_mv_test values("2024-07-01",'a',"2024-07-01",'x',1);""" + sql """insert into bucketed_agg_mv_test values("2024-07-01",'a',"2024-07-01",'x',1);""" + sql """insert into bucketed_agg_mv_test values("2024-07-02",'a',"2024-07-02",'y',2);""" + sql """insert into bucketed_agg_mv_test values("2024-07-02",'b',"2024-07-02",'x',1);""" + sql """insert into bucketed_agg_mv_test values("2024-07-03",'b',"2024-07-03",'y',3);""" + + createMV(""" + 
CREATE MATERIALIZED VIEW bucketed_agg_uv_mv AS + SELECT advertiser AS a1, + channel AS a2, + dt AS a3, + bitmap_union(to_bitmap(user_id)) AS a4 + FROM bucketed_agg_mv_test + GROUP BY advertiser, + channel, + dt; + """) + + sql """insert into bucketed_agg_mv_test values("2024-07-03",'b',"2024-07-03",'y',4);""" + + sql "analyze table bucketed_agg_mv_test with sync;" + sql """alter table bucketed_agg_mv_test modify column time_col set stats ('row_count'='6');""" + + // Core assertion: with enable_bucketed_hash_agg=true, the sync MV must still be chosen. + // Before the fix, PhysicalBucketedHashAggregate(multi_distinct_count) would win on cost. + mv_rewrite_success( + "SELECT dt, advertiser, count(DISTINCT user_id) FROM bucketed_agg_mv_test GROUP BY dt, advertiser;", + "bucketed_agg_uv_mv") + + // Verify correctness of results + order_qt_select_mv """ + SELECT dt, advertiser, count(DISTINCT user_id) + FROM bucketed_agg_mv_test + GROUP BY dt, advertiser + ORDER BY dt, advertiser; + """ + + // Also verify the MV is chosen for bitmap_union_count form + mv_rewrite_success( + "SELECT dt, advertiser, bitmap_union_count(to_bitmap(user_id)) FROM bucketed_agg_mv_test GROUP BY dt, advertiser;", + "bucketed_agg_uv_mv") + + // Verify it still works when bucketed agg is disabled (baseline) + sql "set enable_bucketed_hash_agg = false;" + mv_rewrite_success( + "SELECT dt, advertiser, count(DISTINCT user_id) FROM bucketed_agg_mv_test GROUP BY dt, advertiser;", + "bucketed_agg_uv_mv") + + order_qt_select_mv_no_bucketed """ + SELECT dt, advertiser, count(DISTINCT user_id) + FROM bucketed_agg_mv_test + GROUP BY dt, advertiser + ORDER BY dt, advertiser; + """ +} diff --git a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy index 72ee1b92efb415..a8ca39b64e4ef3 100644 --- a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy +++ 
b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy @@ -21,6 +21,7 @@ suite("agg_strategy") { sql "set global enable_auto_analyze=false" sql "set runtime_filter_mode=OFF" sql "set be_number_for_test=1;" + sql "set enable_bucketed_hash_agg = false;" for (int i = 0; i < 2; i++) { if (i == 0) { diff --git a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy index 953c233c0d517c..bfb38dcf31fa82 100644 --- a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy +++ b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy @@ -21,6 +21,7 @@ suite("distinct_agg_rewriter") { set runtime_filter_mode=OFF; set enable_parallel_result_sink=false; set be_number_for_test=1; + set enable_bucketed_hash_agg = false; """ multi_sql """ analyze table t1000_2 with sync; diff --git a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy index a565f55a0bfedd..8ed45af847f793 100644 --- a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy +++ b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.groovy @@ -21,6 +21,7 @@ suite("distinct_agg_strategy_selector") { set runtime_filter_mode=OFF; set enable_parallel_result_sink=false; set be_number_for_test=1; + set enable_bucketed_hash_agg = false; """ multi_sql """ analyze table t1000 with sync;