Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/paimon/data/timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class PAIMON_EXPORT Timestamp {
nano_of_millisecond_ == other.nano_of_millisecond_;
}

bool operator!=(const Timestamp& other) const {
return !(*this == other);
}

bool operator<(const Timestamp& other) const {
if (millisecond_ == other.millisecond_) {
return nano_of_millisecond_ < other.nano_of_millisecond_;
Expand Down
38 changes: 38 additions & 0 deletions include/paimon/disk/io_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <memory>
#include <string>

#include "paimon/result.h"
#include "paimon/visibility.h"

namespace paimon {
/// The facade for the provided disk I/O services.
class PAIMON_EXPORT IOManager {
public:
virtual ~IOManager() = default;
static std::unique_ptr<IOManager> Create(const std::string& tmp_dir);

/// @return Temp directory path.
virtual const std::string& GetTempDir() const = 0;

virtual Result<std::string> GenerateTempFilePath(const std::string& prefix) const = 0;
};
} // namespace paimon
2 changes: 1 addition & 1 deletion include/paimon/reader/file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader {
using BatchReader::NextBatchWithBitmap;

/// Get the row number of the first row in the previously read batch.
virtual uint64_t GetPreviousBatchFirstRowNumber() const = 0;
virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0;

/// Get the number of rows in the file.
virtual Result<uint64_t> GetNumberOfRows() const = 0;
Expand Down
6 changes: 6 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ set(PAIMON_COMMON_SRCS
common/utils/string_utils.cpp)

set(PAIMON_CORE_SRCS
core/disk/io_manager.cpp
core/append/append_only_writer.cpp
core/append/bucketed_append_compact_manager.cpp
core/casting/binary_to_string_cast_executor.cpp
Expand Down Expand Up @@ -233,6 +234,8 @@ set(PAIMON_CORE_SRCS
core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp
core/mergetree/compact/sort_merge_reader_with_min_heap.cpp
core/mergetree/merge_tree_writer.cpp
core/mergetree/levels.cpp
core/mergetree/lookup_levels.cpp
core/migrate/file_meta_utils.cpp
core/operation/data_evolution_file_store_scan.cpp
core/operation/data_evolution_split_read.cpp
Expand Down Expand Up @@ -537,6 +540,9 @@ if(PAIMON_BUILD_TESTS)
core/manifest/partition_entry_test.cpp
core/manifest/file_entry_test.cpp
core/manifest/index_manifest_entry_serializer_test.cpp
core/mergetree/levels_test.cpp
core/mergetree/lookup_file_test.cpp
core/mergetree/lookup_levels_test.cpp
core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp
core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp
core/mergetree/compact/aggregate/field_bool_agg_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/timestamp_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ TEST_F(TimestampTest, EqualityOperator) {
Timestamp ts3(1622547800000, 654321);
ASSERT_EQ(ts1, ts1);
ASSERT_EQ(ts1, ts2);
ASSERT_FALSE(ts1 == ts3);
ASSERT_NE(ts1, ts3);
}

TEST_F(TimestampTest, LessThanOperator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "arrow/c/helpers.h"
#include "paimon/common/reader/reader_utils.h"
#include "paimon/file_index/bitmap_index_result.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/reader/file_batch_reader.h"
#include "paimon/result.h"
#include "paimon/status.h"
Expand All @@ -34,7 +33,7 @@
namespace paimon {
class Metrics;

class ApplyBitmapIndexBatchReader : public BatchReader {
class ApplyBitmapIndexBatchReader : public FileBatchReader {
public:
ApplyBitmapIndexBatchReader(std::unique_ptr<FileBatchReader>&& reader, RoaringBitmap32&& bitmap)
: reader_(std::move(reader)), bitmap_(std::move(bitmap)) {
Expand Down Expand Up @@ -72,10 +71,31 @@ class ApplyBitmapIndexBatchReader : public BatchReader {
return reader_->GetReaderMetrics();
}

Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override {
return reader_->GetFileSchema();
}

Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
const std::optional<RoaringBitmap32>& selection_bitmap) override {
return Status::Invalid("ApplyBitmapIndexBatchReader does not support SetReadSchema");
}

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
return reader_->GetPreviousBatchFirstRowNumber();
}

Result<uint64_t> GetNumberOfRows() const override {
return reader_->GetNumberOfRows();
}

bool SupportPreciseBitmapSelection() const override {
return reader_->SupportPreciseBitmapSelection();
}

private:
Result<RoaringBitmap32> Filter(int32_t batch_size) const {
RoaringBitmap32 is_valid;
int32_t start_pos = reader_->GetPreviousBatchFirstRowNumber();
PAIMON_ASSIGN_OR_RAISE(int32_t start_pos, reader_->GetPreviousBatchFirstRowNumber());
int32_t length = batch_size;
for (auto iter = bitmap_.EqualOrLarger(start_pos);
iter != bitmap_.End() && *iter < start_pos + length; ++iter) {
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/reader/delegating_prefetch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class DelegatingPrefetchReader : public FileBatchReader {
return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap);
}

uint64_t GetPreviousBatchFirstRowNumber() const override {
Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
return GetReader()->GetPreviousBatchFirstRowNumber();
}

Expand Down
5 changes: 3 additions & 2 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
Status PrefetchFileBatchReaderImpl::HandleReadResult(
size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
ReadBatchWithBitmap&& read_batch_with_bitmap) {
uint64_t first_row_number = readers_[reader_idx]->GetPreviousBatchFirstRowNumber();
PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
auto& prefetch_queue = prefetch_queues_[reader_idx];
if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) {
auto& [read_batch, bitmap] = read_batch_with_bitmap;
Expand Down Expand Up @@ -570,7 +571,7 @@ Result<std::unique_ptr<::ArrowSchema>> PrefetchFileBatchReaderImpl::GetFileSchem
return readers_[0]->GetFileSchema();
}

uint64_t PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
return previous_batch_first_row_num_;
}

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/reader/prefetch_file_batch_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
const std::optional<RoaringBitmap32>& selection_bitmap) override;

Status SeekToRow(uint64_t row_number) override;
uint64_t GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetNumberOfRows() const override;
uint64_t GetNextRowToRead() const override;
void Close() override;
Expand Down
34 changes: 17 additions & 17 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) {
/*enable_adaptive_prefetch_strategy=*/false, executor_,
/*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
CacheConfig(), GetDefaultPool()));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(
reader.get(), /*max simulated data processing time*/ 100));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 101);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
ASSERT_TRUE(result_array->Equals(expected_array));
}
Expand Down Expand Up @@ -396,11 +396,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {
prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
/*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
CacheConfig(), GetDefaultPool()));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(
reader.get(), /*max simulated data processing time*/ 100));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 101);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
ASSERT_TRUE(result_array->Equals(expected_array));
}
Expand All @@ -424,11 +424,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) {
}

arrow::ArrayVector result_array_vector;
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap());
auto& [batch, bitmap] = batch_with_bitmap;
ASSERT_EQ(batch.first->length, bitmap.Cardinality());
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 0);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
ASSERT_OK_AND_ASSIGN(auto array, ReadResultCollector::GetArray(std::move(batch)));
result_array_vector.push_back(array);
ASSERT_OK(prefetch_reader->GetReadStatus());
Expand Down Expand Up @@ -469,9 +469,9 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) {
->SetNextBatchStatus(Status::IOError("mock error"));
}

ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
auto batch_result = reader->NextBatchWithBitmap();
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_FALSE(batch_result.ok());
ASSERT_TRUE(batch_result.status().IsIOError());
ASSERT_FALSE(prefetch_reader->is_shutdown_);
Expand All @@ -480,7 +480,7 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) {

// call NextBatch again, will still return error status
auto batch_result2 = reader->NextBatchWithBitmap();
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_FALSE(batch_result2.ok());
ASSERT_TRUE(batch_result2.status().IsIOError());
}
Expand All @@ -497,11 +497,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) {
prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
/*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
CacheConfig(), GetDefaultPool()));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(
reader.get(), /*max simulated data processing time*/ 100));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 0);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
ASSERT_FALSE(result_array);
}

Expand All @@ -517,11 +517,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) {
prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
/*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
CacheConfig(), GetDefaultPool()));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(
reader.get(), /*max simulated data processing time*/ 100));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 10);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10);
auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
ASSERT_TRUE(result_array->Equals(expected_array));

Expand Down Expand Up @@ -623,11 +623,11 @@ TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCom
PreparePrefetchReader(file_format, schema.get(), predicate,
/*selection_bitmap=*/std::nullopt,
/*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(
reader.get(), /*max simulated data processing time*/ 100));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 90);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);

arrow::ArrayVector expected_array_vector;
expected_array_vector.push_back(data_array->Slice(0, 30));
Expand Down Expand Up @@ -659,11 +659,11 @@ TEST_P(PrefetchFileBatchReaderImplTest,
/*selection_bitmap=*/std::nullopt,
/*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode);
ASSERT_OK(reader->RefreshReadRanges());
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), -1);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(
reader.get(), /*max simulated data processing time*/ 100));
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber(), 90);
ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);

arrow::ArrayVector expected_array_vector;
expected_array_vector.push_back(data_array->Slice(0, 20));
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/sst/block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class AlignedBlockReader : public BlockReader {
class UnAlignedBlockReader : public BlockReader {
public:
UnAlignedBlockReader(const std::shared_ptr<MemorySlice>& data,
std::shared_ptr<MemorySlice>& index,
const std::shared_ptr<MemorySlice>& index,
MemorySlice::SliceComparator comparator)
: BlockReader(data, index->Length() / 4, std::move(comparator)), index_(index) {}

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/sst/sst_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ Result<std::shared_ptr<Bytes>> SstFileReader::Lookup(const std::shared_ptr<Bytes
auto key_slice = MemorySlice::Wrap(key);
// seek the index to the block containing the key
auto index_block_iterator = index_block_reader_->Iterator();
PAIMON_ASSIGN_OR_RAISE(bool _, index_block_iterator->SeekTo(key_slice));
PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool _, index_block_iterator->SeekTo(key_slice));
// if indexIterator does not have a next, it means the key does not exist in this iterator
if (index_block_iterator->HasNext()) {
// seek the current iterator to the key
Expand Down
18 changes: 18 additions & 0 deletions src/paimon/common/utils/arrow/arrow_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@ void ArrowUtils::TraverseArray(const std::shared_ptr<arrow::Array>& array) {
}
}

bool ArrowUtils::EqualsIgnoreNullable(const std::shared_ptr<arrow::DataType>& type,
const std::shared_ptr<arrow::DataType>& other_type) {
if (type->id() != other_type->id() || type->num_fields() != other_type->num_fields()) {
return false;
}
for (int32_t i = 0; i < type->num_fields(); ++i) {
const auto& field = type->field(i);
const auto& other_field = other_type->field(i);
if (field->name() != other_field->name()) {
return false;
}
if (!EqualsIgnoreNullable(field->type(), other_field->type())) {
return false;
}
}
return true;
}

Status ArrowUtils::InnerCheckNullabilityMatch(const std::shared_ptr<arrow::Field>& field,
const std::shared_ptr<arrow::Array>& data) {
if (PAIMON_UNLIKELY(!field->nullable() && data->null_count() != 0)) {
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/utils/arrow/arrow_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class PAIMON_EXPORT ArrowUtils {
static Result<std::shared_ptr<arrow::StructArray>> RemoveFieldFromStructArray(
const std::shared_ptr<arrow::StructArray>& struct_array, const std::string& field_name);

static bool EqualsIgnoreNullable(const std::shared_ptr<arrow::DataType>& type,
const std::shared_ptr<arrow::DataType>& other_type);

private:
static Status InnerCheckNullabilityMatch(const std::shared_ptr<arrow::Field>& field,
const std::shared_ptr<arrow::Array>& data);
Expand Down
Loading
Loading