Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion src/paimon/common/data/columnar/columnar_batch_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

#pragma once

#include <cstdint>
#include <memory>
#include <string_view>
#include <vector>

#include "arrow/array/array_base.h"
#include "arrow/util/bit_util.h"

namespace arrow {
class StructArray;
Expand All @@ -28,12 +31,105 @@ class StructArray;
namespace paimon {
class MemoryPool;

/// Pre-cached column metadata for fast access without virtual function calls or checked_cast.
struct CachedColumnMeta {
/// Null bitmap pointer. nullptr means no nulls (all valid).
const uint8_t* null_bitmap = nullptr;
/// Arrow array offset (for sliced arrays).
int64_t array_offset = 0;
/// For fixed-width types: raw pointer to values buffer (buffer[1]).
/// For variable-length types (STRING/BINARY): raw pointer to data buffer (buffer[2]).
const uint8_t* values_data = nullptr;
/// For variable-length types (STRING/BINARY): raw pointer to offsets buffer (buffer[1]).
const int32_t* offsets = nullptr;

/// Fast null check: directly reads validity bitmap bit.
inline bool IsNull(int64_t row_id) const {
return null_bitmap != nullptr &&
!arrow::bit_util::GetBit(null_bitmap, array_offset + row_id);
}

/// Fast fixed-width value access (INT8/INT16/INT32/INT64/FLOAT/DOUBLE/DATE32/TIMESTAMP).
template <typename T>
inline T GetFixed(int64_t row_id) const {
return reinterpret_cast<const T*>(values_data)[array_offset + row_id];
}

/// Fast boolean value access (bit-packed in values buffer).
inline bool GetBool(int64_t row_id) const {
return arrow::bit_util::GetBit(values_data, array_offset + row_id);
}

/// Fast string_view access for non-dictionary STRING/BINARY columns.
inline std::string_view GetVarLenView(int64_t row_id) const {
int64_t idx = array_offset + row_id;
int32_t start = offsets[idx];
int32_t length = offsets[idx + 1] - start;
return {reinterpret_cast<const char*>(values_data) + start, static_cast<size_t>(length)};
}
};

struct ColumnarBatchContext {
ColumnarBatchContext(const arrow::ArrayVector& array_vec_in,
const std::shared_ptr<MemoryPool>& pool_in)
: pool(pool_in), array_vec(array_vec_in) {}
: pool(pool_in), array_vec(array_vec_in) {
BuildCachedMeta();
}

std::shared_ptr<MemoryPool> pool;
arrow::ArrayVector array_vec;
/// Pre-cached metadata per column for fast access.
std::vector<CachedColumnMeta> cached_meta;
Comment thread
lxy-9602 marked this conversation as resolved.

private:
void BuildCachedMeta() {
cached_meta.resize(array_vec.size());
for (size_t i = 0; i < array_vec.size(); i++) {
const auto* array = array_vec[i].get();
auto& meta = cached_meta[i];
meta.array_offset = array->offset();

// Cache null bitmap
if (array->null_count() != 0) {
meta.null_bitmap = array->null_bitmap_data();
}

// Cache data pointers based on type
const auto& array_data = array->data();
switch (array->type_id()) {
case arrow::Type::BOOL:
case arrow::Type::INT8:
case arrow::Type::INT16:
case arrow::Type::INT32:
case arrow::Type::DATE32:
case arrow::Type::INT64:
case arrow::Type::FLOAT:
case arrow::Type::DOUBLE:
case arrow::Type::TIMESTAMP:
case arrow::Type::DECIMAL128:
// Fixed-width: values in buffer[1]
if (array_data->buffers.size() > 1 && array_data->buffers[1]) {
meta.values_data = array_data->buffers[1]->data();
}
break;
case arrow::Type::STRING:
case arrow::Type::BINARY:
// Variable-length: offsets in buffer[1], data in buffer[2]
if (array_data->buffers.size() > 2) {
if (array_data->buffers[1]) {
meta.offsets =
reinterpret_cast<const int32_t*>(array_data->buffers[1]->data());
}
if (array_data->buffers[2]) {
meta.values_data = array_data->buffers[2]->data();
}
}
break;
default:
// DICTIONARY, LIST, MAP, STRUCT — not cached, use array_vec fallback
break;
}
}
}
};
} // namespace paimon
35 changes: 17 additions & 18 deletions src/paimon/common/data/columnar/columnar_row_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ namespace paimon {
class Bytes;

/// Columnar row view which shares batch-level context to reduce per-row overhead.
/// Uses pre-cached column metadata for fast field access without virtual function calls.
class ColumnarRowRef : public InternalRow {
public:
ColumnarRowRef(std::shared_ptr<ColumnarBatchContext> ctx, int64_t row_id)
: ctx_(std::move(ctx)), row_id_(row_id) {}
: ctx_(std::move(ctx)), cached_meta_ptr_(ctx_->cached_meta.data()), row_id_(row_id) {}

Result<const RowKind*> GetRowKind() const override {
return row_kind_;
Expand All @@ -55,47 +56,39 @@ class ColumnarRowRef : public InternalRow {
}

bool IsNullAt(int32_t pos) const override {
return ctx_->array_vec[pos]->IsNull(row_id_);
return cached_meta_ptr_[pos].IsNull(row_id_);
}

bool GetBoolean(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_vec[pos].get(),
row_id_);
return cached_meta_ptr_[pos].GetBool(row_id_);
}

char GetByte(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_vec[pos].get(),
row_id_);
return static_cast<char>(cached_meta_ptr_[pos].GetFixed<int8_t>(row_id_));
}

int16_t GetShort(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_vec[pos].get(),
row_id_);
return cached_meta_ptr_[pos].GetFixed<int16_t>(row_id_);
}

int32_t GetInt(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_vec[pos].get(),
row_id_);
return cached_meta_ptr_[pos].GetFixed<int32_t>(row_id_);
}

int32_t GetDate(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(
ctx_->array_vec[pos].get(), row_id_);
return cached_meta_ptr_[pos].GetFixed<int32_t>(row_id_);
}

int64_t GetLong(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_vec[pos].get(),
row_id_);
return cached_meta_ptr_[pos].GetFixed<int64_t>(row_id_);
}

float GetFloat(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_vec[pos].get(),
row_id_);
return cached_meta_ptr_[pos].GetFixed<float>(row_id_);
}

double GetDouble(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_vec[pos].get(),
row_id_);
return cached_meta_ptr_[pos].GetFixed<double>(row_id_);
}

BinaryString GetString(int32_t pos) const override {
Expand All @@ -105,6 +98,11 @@ class ColumnarRowRef : public InternalRow {
}

std::string_view GetStringView(int32_t pos) const override {
auto& meta = cached_meta_ptr_[pos];
if (meta.values_data && meta.offsets) {
return meta.GetVarLenView(row_id_);
}
// Fallback for dictionary-encoded or uncached types
return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_);
}

Expand All @@ -129,6 +127,7 @@ class ColumnarRowRef : public InternalRow {

private:
std::shared_ptr<ColumnarBatchContext> ctx_;
const CachedColumnMeta* cached_meta_ptr_;
const RowKind* row_kind_ = RowKind::Insert();
int64_t row_id_;
};
Expand Down
66 changes: 31 additions & 35 deletions src/paimon/core/io/async_key_value_producer_and_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ AsyncKeyValueProducerAndConsumer<T, R>::AsyncKeyValueProducerAndConsumer(
pool_(pool),
sort_merge_reader_(std::move(sort_merge_reader)),
create_consumer_(std::move(create_consumer)) {
kv_queue_.set_capacity(batch_size);
kv_queue_.set_capacity(consumer_thread_num * 2);
result_queue_.set_capacity(RESULT_BATCH_COUNT);
}

Expand Down Expand Up @@ -79,8 +79,8 @@ Result<R> AsyncKeyValueProducerAndConsumer<T, R>::NextBatch() {
Result<std::unique_ptr<RowToArrowArrayConverter<T, R>>> consumer = create_consumer_();
PAIMON_RETURN_NOT_OK(consumer.status());
auto async_consumer = std::make_unique<AsyncKeyValueConsumer<T, R>>(
batch_size_ / consumer_thread_num_, std::move(consumer).value(), consume_finished_,
consumer_finished_count_, kv_queue_, result_queue_);
std::move(consumer).value(), consume_finished_, consumer_finished_count_, kv_queue_,
result_queue_);
consumers_.push_back(std::move(async_consumer));
}
}
Expand All @@ -107,24 +107,33 @@ Result<R> AsyncKeyValueProducerAndConsumer<T, R>::NextBatch() {

template <typename T, typename R>
Status AsyncKeyValueProducerAndConsumer<T, R>::ProduceLoop() {
std::vector<KeyValue> batch;
batch.reserve(batch_size_);
while (!consume_finished_) {
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<SortMergeReader::Iterator> iterator,
sort_merge_reader_->NextBatch());
if (iterator == nullptr) {
// all iterator is all visited
kv_queue_.push(std::nullopt);
break;
}
while (!consume_finished_) {
PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext());
if (!has_next) {
// current iterator is all visited
break;
}
std::optional<KeyValue> kv = std::move(iterator->Next());
kv_queue_.push(std::move(kv));
batch.push_back(std::move(iterator->Next()));
if (static_cast<int32_t>(batch.size()) >= batch_size_) {
kv_queue_.push(std::move(batch));
batch = std::vector<KeyValue>();
batch.reserve(batch_size_);
Comment thread
lxy-9602 marked this conversation as resolved.
}
}
}
// Push remaining rows
if (!batch.empty()) {
kv_queue_.push(std::move(batch));
}
// Push empty batch as EOF signal
kv_queue_.push(std::vector<KeyValue>());
return Status::OK();
}

Expand Down Expand Up @@ -157,8 +166,8 @@ void AsyncKeyValueProducerAndConsumer<T, R>::CleanUpQueue() {
}
}

std::optional<KeyValue> kv;
while (kv_queue_.try_pop(kv)) {
std::vector<KeyValue> kv_batch;
while (kv_queue_.try_pop(kv_batch)) {
}
}

Expand All @@ -167,12 +176,11 @@ template class AsyncKeyValueProducerAndConsumer<KeyValue, KeyValueBatch>;

template <typename T, typename R>
AsyncKeyValueConsumer<T, R>::AsyncKeyValueConsumer(
int32_t batch_size, std::unique_ptr<RowToArrowArrayConverter<T, R>>&& key_value_consumer,
std::unique_ptr<RowToArrowArrayConverter<T, R>>&& key_value_consumer,
std::atomic<bool>& consume_finished, std::atomic<int32_t>& consumer_finished_count,
tbb::concurrent_bounded_queue<std::optional<KeyValue>>& kv_queue,
tbb::concurrent_bounded_queue<std::vector<KeyValue>>& kv_queue,
tbb::concurrent_bounded_queue<R>& result_queue)
: batch_size_(batch_size),
key_value_consumer_(std::move(key_value_consumer)),
: key_value_consumer_(std::move(key_value_consumer)),
consume_finished_(consume_finished),
consumer_finished_count_(consumer_finished_count),
kv_queue_(kv_queue),
Expand All @@ -195,30 +203,18 @@ Status AsyncKeyValueConsumer<T, R>::GetStatus() const {
template <typename T, typename R>
Status AsyncKeyValueConsumer<T, R>::ConsumeLoop() {
while (!consume_finished_) {
int32_t cur_batch_size = 0;
std::vector<KeyValue> key_value_vec;
key_value_vec.reserve(batch_size_);
while (!consume_finished_) {
std::optional<KeyValue> kv;
if (!kv_queue_.try_pop(kv)) {
usleep(1);
continue;
}
if (!kv) {
consume_finished_ = true;
break;
}
key_value_vec.push_back(std::move(kv).value());
cur_batch_size++;
if (cur_batch_size >= batch_size_) {
break;
}
if (!kv_queue_.try_pop(key_value_vec)) {
usleep(100);
continue;
}

if (cur_batch_size > 0) {
PAIMON_ASSIGN_OR_RAISE(R result, key_value_consumer_->NextBatch(key_value_vec));
result_queue_.push(std::move(result));
if (key_value_vec.empty()) {
// Empty batch is EOF signal; re-push for other consumers
kv_queue_.push(std::move(key_value_vec));
break;
}
PAIMON_ASSIGN_OR_RAISE(R result, key_value_consumer_->NextBatch(key_value_vec));
result_queue_.push(std::move(result));
}
consumer_finished_count_++;
return Status::OK();
Expand Down
10 changes: 4 additions & 6 deletions src/paimon/core/io/async_key_value_producer_and_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,17 @@ class AsyncKeyValueProducerAndConsumer {
std::shared_future<Status> producer_future_;
std::vector<std::unique_ptr<AsyncKeyValueConsumer<T, R>>> consumers_;
std::atomic<int32_t> consumer_finished_count_ = 0;
tbb::concurrent_bounded_queue<std::optional<KeyValue>> kv_queue_;
tbb::concurrent_bounded_queue<std::vector<KeyValue>> kv_queue_;
tbb::concurrent_bounded_queue<R> result_queue_;
};

template <typename T, typename R>
class AsyncKeyValueConsumer {
public:
AsyncKeyValueConsumer(int32_t batch_size,
std::unique_ptr<RowToArrowArrayConverter<T, R>>&& key_value_consumer,
AsyncKeyValueConsumer(std::unique_ptr<RowToArrowArrayConverter<T, R>>&& key_value_consumer,
std::atomic<bool>& consume_finished,
std::atomic<int32_t>& consumer_finished_count,
tbb::concurrent_bounded_queue<std::optional<KeyValue>>& kv_queue,
tbb::concurrent_bounded_queue<std::vector<KeyValue>>& kv_queue,
tbb::concurrent_bounded_queue<R>& result_queue);

~AsyncKeyValueConsumer() {
Expand All @@ -117,12 +116,11 @@ class AsyncKeyValueConsumer {
Status ConsumeLoop();

private:
int32_t batch_size_;
std::unique_ptr<RowToArrowArrayConverter<T, R>> key_value_consumer_;
std::shared_future<Status> consumer_future_;
std::atomic<bool>& consume_finished_;
std::atomic<int32_t>& consumer_finished_count_;
tbb::concurrent_bounded_queue<std::optional<KeyValue>>& kv_queue_;
tbb::concurrent_bounded_queue<std::vector<KeyValue>>& kv_queue_;
tbb::concurrent_bounded_queue<R>& result_queue_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/io/key_value_data_file_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Result<bool> KeyValueDataFileRecordReader::Iterator::HasNext() const {

Result<KeyValue> KeyValueDataFileRecordReader::Iterator::Next() {
// key is only used in merge sort; key context does not hold parent struct array
auto key = std::make_unique<ColumnarRowRef>(reader_->key_ctx_, cursor_);
std::shared_ptr<InternalRow> key = std::make_shared<ColumnarRowRef>(reader_->key_ctx_, cursor_);
// value is used in merge sort and projection (maybe async and multi-thread), so value context
// holds parent struct array to ensure data remains valid
auto value = std::make_unique<ColumnarRowRef>(reader_->value_ctx_, cursor_);
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/io/key_value_in_memory_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Result<KeyValue> KeyValueInMemoryRecordReader::Iterator::Next() {
}

// key must hold value_struct_array as min/max key may be used after projection
auto key = std::make_unique<ColumnarRowRef>(reader_->key_ctx_, index);
std::shared_ptr<InternalRow> key = std::make_shared<ColumnarRowRef>(reader_->key_ctx_, index);
auto value = std::make_unique<ColumnarRowRef>(reader_->value_ctx_, index);
return KeyValue(row_kind, reader_->last_sequence_num_ + index,
/*level=*/KeyValue::UNKNOWN_LEVEL, std::move(key), std::move(value));
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/mergetree/spill_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Result<KeyValue> SpillReader::Iterator::Next() {
PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind,
RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_)));
int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_);
auto key = std::make_unique<ColumnarRowRef>(reader_->key_ctx_, cursor_);
std::shared_ptr<InternalRow> key = std::make_shared<ColumnarRowRef>(reader_->key_ctx_, cursor_);
auto value = std::make_unique<ColumnarRowRef>(reader_->value_ctx_, cursor_);
cursor_++;
return KeyValue(row_kind, sequence_number, KeyValue::UNKNOWN_LEVEL, std::move(key),
Expand Down
Loading