// REVIEW PREAMBLE (comment only; the patch text below is unchanged).
//
// Purpose: unified diff over seven files that (1) adds CachedColumnMeta and a per-column
// `cached_meta` vector filled by ColumnarBatchContext::BuildCachedMeta(), (2) makes ColumnarRowRef
// read nulls, fixed-width values and string views through `cached_meta_ptr_` instead of
// `ctx_->array_vec[pos]`, (3) changes `kv_queue_` from single optional items to vector batches with
// an empty vector as the EOF marker, and (4) builds reader keys with std::make_shared instead of
// std::make_unique.
//
// NOTE(review): the text looks extraction-damaged: newlines are collapsed and template argument
// lists / include targets are missing (e.g. bare `#include`, bare `template`,
// `reinterpret_cast(values_data)`), so it is not compilable or appliable as shown.
//
// NOTE(review): CachedColumnMeta::GetFixed's doc comment lists TIMESTAMP, but the switch in
// BuildCachedMeta has no TIMESTAMP case and its default-branch comment says TIMESTAMP is "not
// cached". For any type that reaches the default branch `values_data` stays nullptr.
//
// NOTE(review): GetFixed/GetBool index `values_data` without a null check, and the fixed-width
// getters in ColumnarRowRef (GetBoolean .. GetDouble) have no fallback to `ctx_->array_vec`, unlike
// GetStringView which falls back when `values_data`/`offsets` are unset. Confirm every column these
// getters are called on has one of the cached type ids and a non-null buffers[1].
//
// NOTE(review): IsNull returns false whenever `null_bitmap` is nullptr, and the bitmap is cached
// only when null_count() != 0. Presumably this differs from Array::IsNull for array kinds that
// report nulls without a validity bitmap; confirm no such columns reach ColumnarRowRef.
//
// NOTE(review): ColumnarRowRef's constructor now dereferences `ctx_` in the initializer list; this
// relies on `ctx_` being declared before `cached_meta_ptr_` (it is, in the private section) and on
// callers never passing a null context.
//
// NOTE(review): the removed ConsumeLoop set `consume_finished_ = true` on EOF; the new one only
// re-pushes the empty batch and breaks. Code that observes `consume_finished_` is not visible here;
// TODO confirm it relies on `consumer_finished_count_` instead.
//
// NOTE(review): ProduceLoop pushes the remaining batch and the EOF batch after the loops even when
// they exited because `consume_finished_` became true. `kv_queue_` is capacity-bounded
// (`consumer_thread_num * 2`); presumably `push` waits while the queue is full, so confirm a
// consumer or CleanUpQueue always drains it in that case.
//
// NOTE(review): ConsumeLoop polls with try_pop + usleep(100) rather than a waiting pop.
diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h b/src/paimon/common/data/columnar/columnar_batch_context.h index 1bea6d1ec..66d46f3ca 100644 --- a/src/paimon/common/data/columnar/columnar_batch_context.h +++ b/src/paimon/common/data/columnar/columnar_batch_context.h @@ -16,10 +16,13 @@ #pragma once +#include #include +#include #include #include "arrow/array/array_base.h" +#include "arrow/util/bit_util.h" namespace arrow { class StructArray; @@ -28,12 +31,106 @@ class StructArray; namespace paimon { class MemoryPool; +/// Pre-cached column metadata for fast access without virtual function calls or checked_cast. +struct CachedColumnMeta { + /// Null bitmap pointer. nullptr means no nulls (all valid). + const uint8_t* null_bitmap = nullptr; + /// Arrow array offset (for sliced arrays). + int64_t array_offset = 0; + /// For fixed-width types: raw pointer to values buffer (buffer[1]). + /// For variable-length types (STRING/BINARY): raw pointer to data buffer (buffer[2]). + const uint8_t* values_data = nullptr; + /// For variable-length types (STRING/BINARY): raw pointer to offsets buffer (buffer[1]). + const int32_t* offsets = nullptr; + + /// Fast null check: directly reads validity bitmap bit. + inline bool IsNull(int64_t row_id) const { + return null_bitmap != nullptr && + !arrow::bit_util::GetBit(null_bitmap, array_offset + row_id); + } + + /// Fast fixed-width value access (INT8/INT16/INT32/INT64/FLOAT/DOUBLE/DATE32/TIMESTAMP). + template + inline T GetFixed(int64_t row_id) const { + return reinterpret_cast(values_data)[array_offset + row_id]; + } + + /// Fast boolean value access (bit-packed in values buffer). + inline bool GetBool(int64_t row_id) const { + return arrow::bit_util::GetBit(values_data, array_offset + row_id); + } + + /// Fast string_view access for non-dictionary STRING/BINARY columns. 
+ inline std::string_view GetVarLenView(int64_t row_id) const { + int64_t idx = array_offset + row_id; + int32_t start = offsets[idx]; + int32_t length = offsets[idx + 1] - start; + return {reinterpret_cast(values_data) + start, static_cast(length)}; + } +}; + struct ColumnarBatchContext { ColumnarBatchContext(const arrow::ArrayVector& array_vec_in, const std::shared_ptr& pool_in) - : pool(pool_in), array_vec(array_vec_in) {} + : pool(pool_in), array_vec(array_vec_in) { + BuildCachedMeta(); + } std::shared_ptr pool; arrow::ArrayVector array_vec; + /// Pre-cached metadata per column for fast access. + std::vector cached_meta; + + private: + void BuildCachedMeta() { + cached_meta.resize(array_vec.size()); + for (size_t i = 0; i < array_vec.size(); i++) { + const auto* array = array_vec[i].get(); + auto& meta = cached_meta[i]; + meta.array_offset = array->offset(); + + // Cache null bitmap + if (array->null_count() != 0) { + meta.null_bitmap = array->null_bitmap_data(); + } + + // Cache data pointers based on type + const auto& array_data = array->data(); + switch (array->type_id()) { + case arrow::Type::BOOL: + case arrow::Type::INT8: + case arrow::Type::INT16: + case arrow::Type::INT32: + case arrow::Type::DATE32: + case arrow::Type::INT64: + case arrow::Type::FLOAT: + case arrow::Type::DOUBLE: { + // Fixed-width: values in buffer[1] + if (array_data->buffers.size() > 1 && array_data->buffers[1]) { + meta.values_data = array_data->buffers[1]->data(); + } + break; + } + case arrow::Type::STRING: + case arrow::Type::BINARY: { + // Variable-length: offsets in buffer[1], data in buffer[2] + if (array_data->buffers.size() > 2) { + if (array_data->buffers[1]) { + meta.offsets = + reinterpret_cast(array_data->buffers[1]->data()); + } + if (array_data->buffers[2]) { + meta.values_data = array_data->buffers[2]->data(); + } + } + break; + } + default: + // TIMESTAMP, DECIMAL, DICTIONARY, LIST, MAP, STRUCT — not cached, use array_vec + // fallback + break; + } + } + } }; } // 
namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_ref.h b/src/paimon/common/data/columnar/columnar_row_ref.h index 6939a859b..58df49c28 100644 --- a/src/paimon/common/data/columnar/columnar_row_ref.h +++ b/src/paimon/common/data/columnar/columnar_row_ref.h @@ -37,10 +37,11 @@ namespace paimon { class Bytes; /// Columnar row view which shares batch-level context to reduce per-row overhead. +/// Uses pre-cached column metadata for fast field access without virtual function calls. class ColumnarRowRef : public InternalRow { public: ColumnarRowRef(std::shared_ptr ctx, int64_t row_id) - : ctx_(std::move(ctx)), row_id_(row_id) {} + : ctx_(std::move(ctx)), cached_meta_ptr_(ctx_->cached_meta.data()), row_id_(row_id) {} Result GetRowKind() const override { return row_kind_; @@ -55,47 +56,39 @@ class ColumnarRowRef : public InternalRow { } bool IsNullAt(int32_t pos) const override { - return ctx_->array_vec[pos]->IsNull(row_id_); + return cached_meta_ptr_[pos].IsNull(row_id_); } bool GetBoolean(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return cached_meta_ptr_[pos].GetBool(row_id_); } char GetByte(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return static_cast(cached_meta_ptr_[pos].GetFixed(row_id_)); } int16_t GetShort(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return cached_meta_ptr_[pos].GetFixed(row_id_); } int32_t GetInt(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return cached_meta_ptr_[pos].GetFixed(row_id_); } int32_t GetDate(int32_t pos) const override { - return ColumnarUtils::GetGenericValue( - ctx_->array_vec[pos].get(), row_id_); + return cached_meta_ptr_[pos].GetFixed(row_id_); } int64_t GetLong(int32_t pos) const override { - return 
ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return cached_meta_ptr_[pos].GetFixed(row_id_); } float GetFloat(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return cached_meta_ptr_[pos].GetFixed(row_id_); } double GetDouble(int32_t pos) const override { - return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), - row_id_); + return cached_meta_ptr_[pos].GetFixed(row_id_); } BinaryString GetString(int32_t pos) const override { @@ -105,6 +98,11 @@ class ColumnarRowRef : public InternalRow { } std::string_view GetStringView(int32_t pos) const override { + auto& meta = cached_meta_ptr_[pos]; + if (meta.values_data && meta.offsets) { + return meta.GetVarLenView(row_id_); + } + // Fallback for dictionary-encoded or uncached types return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_); } @@ -129,6 +127,7 @@ class ColumnarRowRef : public InternalRow { private: std::shared_ptr ctx_; + const CachedColumnMeta* cached_meta_ptr_; const RowKind* row_kind_ = RowKind::Insert(); int64_t row_id_; }; diff --git a/src/paimon/core/io/async_key_value_producer_and_consumer.cpp b/src/paimon/core/io/async_key_value_producer_and_consumer.cpp index fb516fde7..ea597c006 100644 --- a/src/paimon/core/io/async_key_value_producer_and_consumer.cpp +++ b/src/paimon/core/io/async_key_value_producer_and_consumer.cpp @@ -38,7 +38,7 @@ AsyncKeyValueProducerAndConsumer::AsyncKeyValueProducerAndConsumer( pool_(pool), sort_merge_reader_(std::move(sort_merge_reader)), create_consumer_(std::move(create_consumer)) { - kv_queue_.set_capacity(batch_size); + kv_queue_.set_capacity(consumer_thread_num * 2); result_queue_.set_capacity(RESULT_BATCH_COUNT); } @@ -79,8 +79,8 @@ Result AsyncKeyValueProducerAndConsumer::NextBatch() { Result>> consumer = create_consumer_(); PAIMON_RETURN_NOT_OK(consumer.status()); auto async_consumer = std::make_unique>( - batch_size_ / consumer_thread_num_, 
std::move(consumer).value(), consume_finished_, - consumer_finished_count_, kv_queue_, result_queue_); + std::move(consumer).value(), consume_finished_, consumer_finished_count_, kv_queue_, + result_queue_); consumers_.push_back(std::move(async_consumer)); } } @@ -107,24 +107,33 @@ Result AsyncKeyValueProducerAndConsumer::NextBatch() { template Status AsyncKeyValueProducerAndConsumer::ProduceLoop() { + std::vector batch; + batch.reserve(batch_size_); while (!consume_finished_) { PAIMON_ASSIGN_OR_RAISE(std::unique_ptr iterator, sort_merge_reader_->NextBatch()); if (iterator == nullptr) { - // all iterator is all visited - kv_queue_.push(std::nullopt); break; } while (!consume_finished_) { PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext()); if (!has_next) { - // current iterator is all visited break; } - std::optional kv = std::move(iterator->Next()); - kv_queue_.push(std::move(kv)); + batch.push_back(std::move(iterator->Next())); + if (static_cast(batch.size()) >= batch_size_) { + kv_queue_.push(std::move(batch)); + batch = std::vector(); + batch.reserve(batch_size_); + } } } + // Push remaining rows + if (!batch.empty()) { + kv_queue_.push(std::move(batch)); + } + // Push empty batch as EOF signal + kv_queue_.push(std::vector()); return Status::OK(); } @@ -157,8 +166,8 @@ void AsyncKeyValueProducerAndConsumer::CleanUpQueue() { } } - std::optional kv; - while (kv_queue_.try_pop(kv)) { + std::vector kv_batch; + while (kv_queue_.try_pop(kv_batch)) { } } @@ -167,12 +176,11 @@ template class AsyncKeyValueProducerAndConsumer; template AsyncKeyValueConsumer::AsyncKeyValueConsumer( - int32_t batch_size, std::unique_ptr>&& key_value_consumer, + std::unique_ptr>&& key_value_consumer, std::atomic& consume_finished, std::atomic& consumer_finished_count, - tbb::concurrent_bounded_queue>& kv_queue, + tbb::concurrent_bounded_queue>& kv_queue, tbb::concurrent_bounded_queue& result_queue) - : batch_size_(batch_size), - key_value_consumer_(std::move(key_value_consumer)), + : 
key_value_consumer_(std::move(key_value_consumer)), consume_finished_(consume_finished), consumer_finished_count_(consumer_finished_count), kv_queue_(kv_queue), @@ -195,30 +203,18 @@ Status AsyncKeyValueConsumer::GetStatus() const { template Status AsyncKeyValueConsumer::ConsumeLoop() { while (!consume_finished_) { - int32_t cur_batch_size = 0; std::vector key_value_vec; - key_value_vec.reserve(batch_size_); - while (!consume_finished_) { - std::optional kv; - if (!kv_queue_.try_pop(kv)) { - usleep(1); - continue; - } - if (!kv) { - consume_finished_ = true; - break; - } - key_value_vec.push_back(std::move(kv).value()); - cur_batch_size++; - if (cur_batch_size >= batch_size_) { - break; - } + if (!kv_queue_.try_pop(key_value_vec)) { + usleep(100); + continue; } - - if (cur_batch_size > 0) { - PAIMON_ASSIGN_OR_RAISE(R result, key_value_consumer_->NextBatch(key_value_vec)); - result_queue_.push(std::move(result)); + if (key_value_vec.empty()) { + // Empty batch is EOF signal; re-push for other consumers + kv_queue_.push(std::move(key_value_vec)); + break; } + PAIMON_ASSIGN_OR_RAISE(R result, key_value_consumer_->NextBatch(key_value_vec)); + result_queue_.push(std::move(result)); } consumer_finished_count_++; return Status::OK(); diff --git a/src/paimon/core/io/async_key_value_producer_and_consumer.h b/src/paimon/core/io/async_key_value_producer_and_consumer.h index 6a2fee67f..d60fe65a4 100644 --- a/src/paimon/core/io/async_key_value_producer_and_consumer.h +++ b/src/paimon/core/io/async_key_value_producer_and_consumer.h @@ -92,18 +92,17 @@ class AsyncKeyValueProducerAndConsumer { std::shared_future producer_future_; std::vector>> consumers_; std::atomic consumer_finished_count_ = 0; - tbb::concurrent_bounded_queue> kv_queue_; + tbb::concurrent_bounded_queue> kv_queue_; tbb::concurrent_bounded_queue result_queue_; }; template class AsyncKeyValueConsumer { public: - AsyncKeyValueConsumer(int32_t batch_size, - std::unique_ptr>&& key_value_consumer, + 
AsyncKeyValueConsumer(std::unique_ptr>&& key_value_consumer, std::atomic& consume_finished, std::atomic& consumer_finished_count, - tbb::concurrent_bounded_queue>& kv_queue, + tbb::concurrent_bounded_queue>& kv_queue, tbb::concurrent_bounded_queue& result_queue); ~AsyncKeyValueConsumer() { @@ -117,12 +116,11 @@ class AsyncKeyValueConsumer { Status ConsumeLoop(); private: - int32_t batch_size_; std::unique_ptr> key_value_consumer_; std::shared_future consumer_future_; std::atomic& consume_finished_; std::atomic& consumer_finished_count_; - tbb::concurrent_bounded_queue>& kv_queue_; + tbb::concurrent_bounded_queue>& kv_queue_; tbb::concurrent_bounded_queue& result_queue_; }; diff --git a/src/paimon/core/io/key_value_data_file_record_reader.cpp b/src/paimon/core/io/key_value_data_file_record_reader.cpp index 5a38352e8..eec3215cd 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.cpp +++ b/src/paimon/core/io/key_value_data_file_record_reader.cpp @@ -68,7 +68,7 @@ Result KeyValueDataFileRecordReader::Iterator::HasNext() const { Result KeyValueDataFileRecordReader::Iterator::Next() { // key is only used in merge sort; key context does not hold parent struct array - auto key = std::make_unique(reader_->key_ctx_, cursor_); + std::shared_ptr key = std::make_shared(reader_->key_ctx_, cursor_); // value is used in merge sort and projection (maybe async and multi-thread), so value context // holds parent struct array to ensure data remains valid auto value = std::make_unique(reader_->value_ctx_, cursor_); diff --git a/src/paimon/core/io/key_value_in_memory_record_reader.cpp b/src/paimon/core/io/key_value_in_memory_record_reader.cpp index a78d14578..64648e13d 100644 --- a/src/paimon/core/io/key_value_in_memory_record_reader.cpp +++ b/src/paimon/core/io/key_value_in_memory_record_reader.cpp @@ -45,7 +45,7 @@ Result KeyValueInMemoryRecordReader::Iterator::Next() { } // key must hold value_struct_array as min/max key may be used after projection - auto key = 
std::make_unique(reader_->key_ctx_, index); + std::shared_ptr key = std::make_shared(reader_->key_ctx_, index); auto value = std::make_unique(reader_->value_ctx_, index); return KeyValue(row_kind, reader_->last_sequence_num_ + index, /*level=*/KeyValue::UNKNOWN_LEVEL, std::move(key), std::move(value)); diff --git a/src/paimon/core/mergetree/spill_reader.cpp b/src/paimon/core/mergetree/spill_reader.cpp index 7b78782ee..c56f1da6e 100644 --- a/src/paimon/core/mergetree/spill_reader.cpp +++ b/src/paimon/core/mergetree/spill_reader.cpp @@ -76,7 +76,7 @@ Result SpillReader::Iterator::Next() { PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_))); int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_); - auto key = std::make_unique(reader_->key_ctx_, cursor_); + std::shared_ptr key = std::make_shared(reader_->key_ctx_, cursor_); auto value = std::make_unique(reader_->value_ctx_, cursor_); cursor_++; return KeyValue(row_kind, sequence_number, KeyValue::UNKNOWN_LEVEL, std::move(key),