Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions be/src/agent/be_exec_version_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,22 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers
*
* 7: start from doris 3.0.2
* a. window funnel logic change
* b. support const column in serialize/deserialize function: PR #41175
* b. support const column in serialize/deserialize function: PR #41175
*/

// /////////////////////////////////////////////////////////////////////////////
// ATTN: !!! BE EXEC VERSION IS A VERY SENSITIVE COMPATIBILITY FIELD !!!
// 1. We should avoid abusing be_exec_version, especially not using it to handle
//    compatibility issues of functions (use function aliases for that instead).
// 2. Do not fork versions in past releases; all new be exec versions should
//    first go into master before entering new release versions.
// !!! DO NOT CHANGE IT UNLESS YOU ARE 100% SURE WHAT YOU ARE DOING !!!
// /////////////////////////////////////////////////////////////////////////////

// 10: start from doris 4.0.3
//  a. use new fixed object serialization way.

// Static data members of BeExecVersionManager. max_be_exec_version must be
// defined exactly once; the stale `= 8` definition is removed so that only the
// current value (10) remains.
const int BeExecVersionManager::max_be_exec_version = 10;
const int BeExecVersionManager::min_be_exec_version = 0;
std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {};
std::set<std::string> BeExecVersionManager::_function_restrict_map;
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ constexpr inline int AGGREGATION_2_1_VERSION =
6; // some aggregation changed the data format after this version
constexpr inline int USE_CONST_SERDE =
8; // support const column in serialize/deserialize function: PR #41175
// be exec version 10 (start from doris 4.0.3): use new fixed object serialization way.
constexpr inline int USE_NEW_FIXED_OBJECT_SERIALIZATION_VERSION = 10;

class BeExecVersionManager {
public:
Expand Down
13 changes: 1 addition & 12 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,6 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
int col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
}

size_t buffer_size =
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() *
Expand Down Expand Up @@ -354,10 +351,6 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
Base::_shared_state->aggregate_evaluators[i]);
}
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())
->get_nested_column_ptr();
}

size_t buffer_size = Base::_shared_state->aggregate_evaluators[i]
->function()
Expand Down Expand Up @@ -412,9 +405,6 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
int col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
}

SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]
Expand Down Expand Up @@ -731,8 +721,7 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_i
_is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
_pool(pool),
_limit(tnode.limit),
_have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
(tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
_have_conjuncts(tnode.__isset.conjuncts && !tnode.conjuncts.empty()),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}

Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,6 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
auto col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
}

size_t buffer_size =
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() * rows;
Expand Down
15 changes: 9 additions & 6 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ std::string OperatorXBase::debug_string(RuntimeState* state, int indentation_lev
return state->get_local_state(operator_id())->debug_string(indentation_level);
}

Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) {
std::string node_name = print_plan_node_type(tnode.node_type);
_nereids_id = tnode.nereids_id;
if (!tnode.intermediate_output_tuple_id_list.empty()) {
Expand All @@ -194,19 +194,16 @@ Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
_op_name = substr + "_OPERATOR";

if (tnode.__isset.vconjunct) {
vectorized::VExprContextSPtr context;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, context));
_conjuncts.emplace_back(context);
return Status::InternalError("vconjunct is not supported yet");
} else if (tnode.__isset.conjuncts) {
for (auto& conjunct : tnode.conjuncts) {
for (const auto& conjunct : tnode.conjuncts) {
vectorized::VExprContextSPtr context;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, context));
_conjuncts.emplace_back(context);
}
}

// create the projections expr

if (tnode.__isset.projections) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
Expand All @@ -227,6 +224,12 @@ Status OperatorXBase::prepare(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
if (state->enable_adjust_conjunct_order_by_cost()) {
std::ranges::sort(_conjuncts, [](const auto& a, const auto& b) {
return a->execute_cost() < b->execute_cost();
});
};

for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
Expand Down
12 changes: 9 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,14 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
int& arrived_rf_num) {
// Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads
std::unique_lock lock(_conjuncts_lock);
return _helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(),
arrived_rf_num, _conjuncts);
RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(),
arrived_rf_num, _conjuncts));
if (state->enable_adjust_conjunct_order_by_cost()) {
std::ranges::sort(_conjuncts, [](const auto& a, const auto& b) {
return a->execute_cost() < b->execute_cost();
});
};
return Status::OK();
}

Status ScanLocalStateBase::clone_conjunct_ctxs(vectorized::VExprContextSPtrs& scanner_conjuncts) {
Expand Down Expand Up @@ -323,7 +329,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
message += conjunct->root()->debug_string();
}
}
custom_profile()->add_info_string("RemainedDownPredicates", message);
custom_profile()->add_info_string("RemainedPredicates", message);
}

for (auto& it : _slot_id_to_value_range) {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,6 @@ StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool* pool, int operator_id,
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}

void StreamingAggOperatorX::update_operator(const TPlanNode& tnode,
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ class StreamingAggOperatorX MOCK_REMOVE(final) : public StatefulOperatorX<Stream
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;
std::vector<size_t> _make_nullable_keys;
bool _have_conjuncts;
RowDescriptor _agg_fn_output_row_descriptor;
// For sort limit
bool _do_sort_limit = false;
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class RuntimeState {
: _query_options.mem_limit / 20;
}

// Reports whether the query options ask for conjuncts to be reordered by cost.
// An unset option is treated as "disabled".
bool enable_adjust_conjunct_order_by_cost() const {
    if (!_query_options.__isset.enable_adjust_conjunct_order_by_cost) {
        return false;
    }
    return _query_options.enable_adjust_conjunct_order_by_cost;
}

int32_t max_column_reader_num() const {
return _query_options.__isset.max_column_reader_num ? _query_options.max_column_reader_num
: 20000;
Expand Down Expand Up @@ -575,6 +580,11 @@ class RuntimeState {
return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan;
}

// Reports the value of the enable_aggregate_function_null_v2 query option.
// An unset option is treated as "disabled".
bool enable_aggregate_function_null_v2() const {
    if (!_query_options.__isset.enable_aggregate_function_null_v2) {
        return false;
    }
    return _query_options.enable_aggregate_function_null_v2;
}

bool is_read_csv_empty_line_as_null() const {
return _query_options.__isset.read_csv_empty_line_as_null &&
_query_options.read_csv_empty_line_as_null;
Expand Down
75 changes: 74 additions & 1 deletion be/src/util/hash_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,93 @@
#include "util/hash/city.h"
#include "util/murmur_hash3.h"
#include "util/sse_util.hpp"
#include "vec/common/endian.h"

namespace doris {
#include "common/compile_check_begin.h"
namespace detail {
// Slicing-by-4 table: t[0] is the standard byte-at-a-time table,
// t[1..3] are extended tables for parallel 4-byte processing.
struct CRC32SliceBy4Table {
    // t[0] is the byte-at-a-time CRC32 table; t[k] (k = 1..3) is t[k-1] advanced
    // by one more zero-byte CRC step, as needed for slicing-by-4.
    uint32_t t[4][256] {};

    // Fills all four tables; usable in constant expressions.
    constexpr CRC32SliceBy4Table() {
        // Reflected form of the standard CRC32 polynomial.
        constexpr uint32_t kReflectedPoly = 0xEDB88320U;

        // Base table: run the 8 bit-steps of the reflected CRC for every byte value.
        for (uint32_t byte = 0; byte < 256; ++byte) {
            uint32_t rem = byte;
            for (int bit = 0; bit < 8; ++bit) {
                // mask is all-ones when the low bit is set, zero otherwise.
                const uint32_t mask = 0U - (rem & 1U);
                rem = (rem >> 1) ^ (kReflectedPoly & mask);
            }
            t[0][byte] = rem;
        }

        // Extended tables: each level applies one extra byte-step to the previous level.
        for (int level = 1; level < 4; ++level) {
            for (uint32_t byte = 0; byte < 256; ++byte) {
                const uint32_t prev = t[level - 1][byte];
                t[level][byte] = t[0][prev & 0xFF] ^ (prev >> 8);
            }
        }
    }
};
} // namespace detail

// Utility class to compute hash values.
class HashUtil {
private:
static inline constexpr detail::CRC32SliceBy4Table CRC32_TABLE {};

public:
// Feeds `bytes` bytes starting at `data` into crc32(), using `hash` as the
// running CRC value, and returns the result narrowed to 32 bits.
static uint32_t zlib_crc_hash(const void* data, uint32_t bytes, uint32_t hash) {
    const auto* buffer = static_cast<const unsigned char*>(data);
    return static_cast<uint32_t>(crc32(hash, buffer, bytes));
}

// Inline CRC32 over the object representation of a fixed-size value, using the
// reflected polynomial 0xEDB88320 and the zlib pre/post XOR with 0xFFFFFFFF.
//  - sizeof(T) == 1 or 2: one table lookup per byte.
//  - sizeof(T) == 4 or 8: one or two slicing-by-4 steps (four independent
//    lookups per 4-byte word instead of four dependent ones).
//  - any other size: delegated to crc32().
// Endianness: the reflected algorithm consumes bytes in address order, so each
// 4-byte word must be loaded with byte[0] in the least significant position.
// LittleEndian::Load32 is relied upon to provide that layout on every platform.
template <typename T>
static uint32_t zlib_crc32_fixed(const T& value, uint32_t hash) {
    constexpr auto kWidth = sizeof(T);

    if constexpr (kWidth != 1 && kWidth != 2 && kWidth != 4 && kWidth != 8) {
        // Larger or unusual sizes take the generic crc32() path.
        return static_cast<uint32_t>(
                crc32(hash, reinterpret_cast<const unsigned char*>(&value), sizeof(T)));
    } else {
        const auto* bytes = reinterpret_cast<const uint8_t*>(&value);

        // Advance the CRC state by a single input byte.
        [[maybe_unused]] const auto fold_byte = [](uint32_t state, uint8_t b) -> uint32_t {
            return CRC32_TABLE.t[0][(state ^ b) & 0xFF] ^ (state >> 8);
        };
        // Advance the CRC state by four input bytes at once (slicing-by-4).
        [[maybe_unused]] const auto fold_word = [](uint32_t state,
                                                   const uint8_t* src) -> uint32_t {
            const uint32_t mixed = LittleEndian::Load32(src) ^ state;
            return CRC32_TABLE.t[3][(mixed)&0xFF] ^ CRC32_TABLE.t[2][(mixed >> 8) & 0xFF] ^
                   CRC32_TABLE.t[1][(mixed >> 16) & 0xFF] ^
                   CRC32_TABLE.t[0][(mixed >> 24) & 0xFF];
        };

        // zlib convention: invert the running value on entry and on exit.
        uint32_t state = hash ^ 0xFFFFFFFFU;
        if constexpr (kWidth == 1) {
            state = fold_byte(state, bytes[0]);
        } else if constexpr (kWidth == 2) {
            // Slicing does not help below 4 bytes; do two sequential byte steps.
            state = fold_byte(state, bytes[0]);
            state = fold_byte(state, bytes[1]);
        } else if constexpr (kWidth == 4) {
            state = fold_word(state, bytes);
        } else {
            state = fold_word(state, bytes);
            state = fold_word(state, bytes + 4);
        }
        return state ^ 0xFFFFFFFFU;
    }
}

// Hashes a NULL value into the running CRC `hash`.
// NULL is treated as a 4-byte integer 0 when hashing. Only one return path is
// kept: the inline fixed-size CRC32, which is documented as zlib-compatible, so
// the result matches the former crc32() call over the same four zero bytes.
static uint32_t zlib_crc_hash_null(uint32_t hash) {
    static const int INT_VALUE = 0;
    return zlib_crc32_fixed(INT_VALUE, hash);
}

template <typename T>
Expand Down
Loading
Loading