Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 84 additions & 32 deletions be/src/exec/common/agg_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@ using AggregatedDataWithNullableUInt32KeyPhase2 =
using AggregatedDataWithNullableUInt64KeyPhase2 =
DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>;
using AggregatedDataWithNullableShortStringKey = DataWithNullKey<AggregatedDataWithShortStringKey>;

using AggregatedMethodVariants = std::variant<
using AggregatedDataWithNullableStringKey = DataWithNullKey<AggregatedDataWithStringKey>;

/// Parameterized method variant for aggregation hash tables.
/// StringData / NullableStringData control which hash map is used for string keys:
/// - AggregatedDataVariants uses StringHashMap (AggregatedDataWithShortStringKey)
/// - BucketedAggDataVariants uses PHHashMap<StringRef> (AggregatedDataWithStringKey)
/// to avoid StringHashMap's sub-table complexity and unify the emplace interface.
template <typename StringData, typename NullableStringData>
using AggMethodVariantsBase = std::variant<
std::monostate, MethodSerialized<AggregatedDataWithStringKey>,
MethodOneNumber<UInt8, AggData<UInt8>>, MethodOneNumber<UInt16, AggData<UInt16>>,
MethodOneNumber<UInt32, AggData<UInt32>>, MethodOneNumber<UInt64, AggData<UInt64>>,
MethodStringNoCache<AggregatedDataWithShortStringKey>,
MethodOneNumber<UInt128, AggData<UInt128>>, MethodOneNumber<UInt256, AggData<UInt256>>,
MethodStringNoCache<StringData>, MethodOneNumber<UInt128, AggData<UInt128>>,
MethodOneNumber<UInt256, AggData<UInt256>>,
MethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>,
MethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>,
MethodSingleNullableColumn<MethodOneNumber<UInt8, AggDataNullable<UInt8>>>,
Expand All @@ -66,89 +73,134 @@ using AggregatedMethodVariants = std::variant<
MethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>,
MethodSingleNullableColumn<MethodOneNumber<UInt128, AggDataNullable<UInt128>>>,
MethodSingleNullableColumn<MethodOneNumber<UInt256, AggDataNullable<UInt256>>>,
MethodSingleNullableColumn<MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>,
MethodSingleNullableColumn<MethodStringNoCache<NullableStringData>>,
MethodKeysFixed<AggData<UInt64>>, MethodKeysFixed<AggData<UInt72>>,
MethodKeysFixed<AggData<UInt96>>, MethodKeysFixed<AggData<UInt104>>,
MethodKeysFixed<AggData<UInt128>>, MethodKeysFixed<AggData<UInt136>>,
MethodKeysFixed<AggData<UInt256>>>;

struct AggregatedDataVariants
: public DataVariants<AggregatedMethodVariants, MethodSingleNullableColumn, MethodOneNumber,
DataWithNullKey> {
AggregatedDataWithoutKey without_key = nullptr;

void init(const std::vector<DataTypePtr>& data_types, HashKeyType type) {
/// Method variant set used by AggregatedDataVariants: string keys are stored
/// in the StringHashMap-based short-string map types.
using AggregatedMethodVariants = AggMethodVariantsBase<AggregatedDataWithShortStringKey,
AggregatedDataWithNullableShortStringKey>;

/// Bucketed agg uses PHHashMap<StringRef> for string keys instead of StringHashMap.
/// This avoids StringHashMap's sub-table complexity and unifies the emplace interface
/// (3-arg PHHashMap::emplace), while still using HashMethodString for correct
/// single-column string key extraction.
using BucketedAggMethodVariants =
AggMethodVariantsBase<AggregatedDataWithStringKey, AggregatedDataWithNullableStringKey>;

/// Intermediate base that adds the shared init logic for aggregation data
/// variants. Only the string_key case differs between AggregatedDataVariants
/// and BucketedAggDataVariants; all other key types are identical. The
/// StringData/NullableStringData template parameters control which hash map
/// type is emplaced for string_key.
template <typename MethodVariants, typename StringData, typename NullableStringData>
struct AggDataVariantsBase : public DataVariants<MethodVariants, MethodSingleNullableColumn,
MethodOneNumber, DataWithNullKey> {
void init_agg_data(const std::vector<DataTypePtr>& data_types, HashKeyType type) {
bool nullable = data_types.size() == 1 && data_types[0]->is_nullable();

switch (type) {
case HashKeyType::without_key:
break;
case HashKeyType::serialized:
method_variant.emplace<MethodSerialized<AggregatedDataWithStringKey>>();
this->method_variant.template emplace<MethodSerialized<AggregatedDataWithStringKey>>();
break;
case HashKeyType::int8_key:
emplace_single<UInt8, AggData<UInt8>>(nullable);
this->template emplace_single<UInt8, AggData<UInt8>>(nullable);
break;
case HashKeyType::int16_key:
emplace_single<UInt16, AggData<UInt16>>(nullable);
this->template emplace_single<UInt16, AggData<UInt16>>(nullable);
break;
case HashKeyType::int32_key:
emplace_single<UInt32, AggData<UInt32>>(nullable);
this->template emplace_single<UInt32, AggData<UInt32>>(nullable);
break;
case HashKeyType::int32_key_phase2:
emplace_single<UInt32, AggregatedDataWithUInt32KeyPhase2>(nullable);
this->template emplace_single<UInt32, AggregatedDataWithUInt32KeyPhase2>(nullable);
break;
case HashKeyType::int64_key:
emplace_single<UInt64, AggData<UInt64>>(nullable);
this->template emplace_single<UInt64, AggData<UInt64>>(nullable);
break;
case HashKeyType::int64_key_phase2:
emplace_single<UInt64, AggregatedDataWithUInt64KeyPhase2>(nullable);
this->template emplace_single<UInt64, AggregatedDataWithUInt64KeyPhase2>(nullable);
break;
case HashKeyType::int128_key:
emplace_single<UInt128, AggData<UInt128>>(nullable);
this->template emplace_single<UInt128, AggData<UInt128>>(nullable);
break;
case HashKeyType::int256_key:
emplace_single<UInt256, AggData<UInt256>>(nullable);
this->template emplace_single<UInt256, AggData<UInt256>>(nullable);
break;
case HashKeyType::string_key:
if (nullable) {
method_variant.emplace<MethodSingleNullableColumn<
MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>>();
this->method_variant.template emplace<
MethodSingleNullableColumn<MethodStringNoCache<NullableStringData>>>();
} else {
method_variant.emplace<MethodStringNoCache<AggregatedDataWithShortStringKey>>();
this->method_variant.template emplace<MethodStringNoCache<StringData>>();
}
break;
case HashKeyType::fixed64:
method_variant.emplace<MethodKeysFixed<AggData<UInt64>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt64>>>(
get_key_sizes(data_types));
break;
case HashKeyType::fixed72:
method_variant.emplace<MethodKeysFixed<AggData<UInt72>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt72>>>(
get_key_sizes(data_types));
break;
case HashKeyType::fixed96:
method_variant.emplace<MethodKeysFixed<AggData<UInt96>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt96>>>(
get_key_sizes(data_types));
break;
case HashKeyType::fixed104:
method_variant.emplace<MethodKeysFixed<AggData<UInt104>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt104>>>(
get_key_sizes(data_types));
break;
case HashKeyType::fixed128:
method_variant.emplace<MethodKeysFixed<AggData<UInt128>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt128>>>(
get_key_sizes(data_types));
break;
case HashKeyType::fixed136:
method_variant.emplace<MethodKeysFixed<AggData<UInt136>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt136>>>(
get_key_sizes(data_types));
break;
case HashKeyType::fixed256:
method_variant.emplace<MethodKeysFixed<AggData<UInt256>>>(get_key_sizes(data_types));
this->method_variant.template emplace<MethodKeysFixed<AggData<UInt256>>>(
get_key_sizes(data_types));
break;
default:
throw Exception(ErrorCode::INTERNAL_ERROR,
"AggregatedDataVariants meet invalid key type, type={}", type);
throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid agg key type, type={}", type);
}
}
};

/// Hash-table variants for regular (non-bucketed) aggregation. String keys use
/// the short-string map types handed to AggDataVariantsBase below.
struct AggregatedDataVariants
        : public AggDataVariantsBase<AggregatedMethodVariants, AggregatedDataWithShortStringKey,
                                     AggregatedDataWithNullableShortStringKey> {
    /// State used when aggregating without any key; initialized to nullptr.
    AggregatedDataWithoutKey without_key = nullptr;

    /// Recomputed by init(): false for the without_key, EMPTY, serialized and
    /// string_key key types, true for every other key type.
    bool is_fixed_key = true;

    /// Records whether `type` counts as a fixed key, then constructs the
    /// matching hash method through the shared init_agg_data().
    /// The flag is written before init_agg_data() runs, so it is already
    /// updated if that call throws on an unrecognized key type.
    void init(const std::vector<DataTypePtr>& data_types, HashKeyType type) {
        const bool has_key = type != HashKeyType::without_key && type != HashKeyType::EMPTY;
        const bool is_variable_width =
                type == HashKeyType::serialized || type == HashKeyType::string_key;
        is_fixed_key = has_key && !is_variable_width;
        init_agg_data(data_types, type);
    }
};

/// Uniquely-owning pointer to an AggregatedDataVariants.
using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
/// Uniquely-owning pointer to an Arena.
using ArenaUPtr = std::unique_ptr<Arena>;

/// Hash-table variants for bucketed hash aggregation.
/// Built on BucketedAggMethodVariants, so string keys go through the
/// PHHashMap-based AggregatedDataWithStringKey types rather than StringHashMap.
struct BucketedAggDataVariants
        : public AggDataVariantsBase<BucketedAggMethodVariants, AggregatedDataWithStringKey,
                                     AggregatedDataWithNullableStringKey> {
    /// Thin entry point: forwards both arguments unchanged to the shared
    /// init_agg_data(), which constructs the hash method matching `type`.
    void init(const std::vector<DataTypePtr>& data_types, HashKeyType type) {
        init_agg_data(data_types, type);
    }
};

/// Uniquely-owning pointer to a BucketedAggDataVariants.
using BucketedAggDataVariantsUPtr = std::unique_ptr<BucketedAggDataVariants>;

struct AggregateDataContainer {
public:
AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states)
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/common/hash_table/hash_map_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ struct MethodBaseInner {
Arena arena;
DorisVector<size_t> hash_values;

/// Reusable buffer for source-side output iteration to avoid per-batch
/// heap allocation of std::vector<Key>. Callers use resize() + direct
/// element assignment, so the capacity is retained across batches.
std::vector<Key> output_keys;

// use in join case
DorisVector<uint32_t> bucket_nums;

Expand Down
Loading
Loading