Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4a249b9
introduce parquet page filter
liangjie3138 Apr 14, 2026
434dd99
page level prebuffer
liangjie3138 Apr 14, 2026
5118f9e
remove trace
liangjie3138 Apr 14, 2026
63b5f1c
BucketSelectConverter support timestamp tyope & add ut
liangjie3138 Apr 15, 2026
27277b0
fix SetupCxxFlags.cmake
liangjie3138 Apr 15, 2026
efc80b1
Merge remote-tracking branch 'origin/main' into dev_parquet
liangjie3138 Apr 15, 2026
d889f1d
fix code style
liangjie3138 Apr 16, 2026
c729b3b
Merge branch 'main' into dev_parquet
liangjie3138 Apr 16, 2026
6343a61
remove bucket selector
liangjie3138 Apr 20, 2026
90a42fc
fix review
liangjie3138 Apr 21, 2026
d6a8499
add itcase
liangjie3138 Apr 21, 2026
1e93976
Merge remote-tracking branch 'origin/main' into dev_parquet
liangjie3138 Apr 23, 2026
5078e1f
fix bucket
liangjie3138 Apr 24, 2026
246ea68
fix style
liangjie3138 Apr 27, 2026
9521467
Merge branch 'main' into dev_parquet
liangjie3138 Apr 27, 2026
318f1b8
Merge branch 'main' into dev_parquet
lucasfang Apr 28, 2026
b44bedf
revert pending_futures_mutex_ in arrow input stream
liangjie3138 May 8, 2026
54cabd5
fix review
liangjie3138 May 8, 2026
7e74147
not copy chunks for page-filtered reader
liangjie3138 May 8, 2026
b6c6f8e
Merge remote-tracking branch 'origin/main' into dev_parquet
liangjie3138 May 8, 2026
c43ff66
perf: skip redundant PreBufferRanges when no page-filtered row groups
liangjie3138 May 8, 2026
adb0d76
refactor: drop per-RG metadata cache for page-filtered row groups
liangjie3138 May 8, 2026
58f277e
fix style & add ut
liangjie3138 May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions cmake_modules/arrow.diff
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,193 @@ index 4d3acb491e..3906ff3c59 100644
int64_t pagesize_;
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;

--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -210,6 +210,17 @@
::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const;

+ /// Pre-buffer arbitrary byte ranges (e.g., page-level ranges from OffsetIndex).
+ /// Unlike PreBuffer(), this does NOT set the column bitmap, so
+ /// GetColumnPageReader will use CachedInputStream (page-level cache path).
+ void PreBufferRanges(const std::vector<::arrow::io::ReadRange>& ranges,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options);
+
+ /// Wait for arbitrary byte ranges to be pre-buffered.
+ ::arrow::Future<> WhenBufferedRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges) const;
+
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;

--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -207,6 +207,100 @@
return {col_start, col_length};
}

+// CachedInputStream: InputStream adapter that reads through ReadRangeCache with
+// zero-cost skip for non-cached pages. Used for page-level caching where only
+// specific pages are pre-buffered.
+//
+// Key behavior:
+// - Read(): On cache hit, returns cached data. On cache miss, returns zero-filled
+// buffer (zero I/O). This makes InputStream::Advance() (which calls Read() and
+// discards) effectively free for skipped pages.
+// - Peek(): Always falls back to source on cache miss, because PageReader uses
+// Peek() to read Thrift page headers (~30 bytes) which must have real data.
+class CachedInputStream : public ::arrow::io::InputStream {
+ public:
+ CachedInputStream(
+ std::shared_ptr<::arrow::io::internal::ReadRangeCache> cache,
+ std::shared_ptr<ArrowInputFile> source,
+ int64_t offset, int64_t length)
+ : cache_(std::move(cache)),
+ source_(std::move(source)),
+ base_offset_(offset),
+ length_(length) {}
+
+ ::arrow::Status Close() override {
+ closed_ = true;
+ return ::arrow::Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ ::arrow::Result<int64_t> Tell() const override { return position_; }
+
+ ::arrow::Result<std::string_view> Peek(int64_t nbytes) override {
+ int64_t to_read = std::min(nbytes, length_ - position_);
+ if (to_read <= 0) {
+ return std::string_view();
+ }
+ ::arrow::io::ReadRange range{base_offset_ + position_, to_read};
+ auto result = cache_->Read(range);
+ if (result.ok()) {
+ peek_buffer_ = *result;
+ } else {
+ // Peek is used for Thrift page headers (~30 bytes) — must read real data
+ ARROW_ASSIGN_OR_RAISE(peek_buffer_,
+ source_->ReadAt(range.offset, range.length));
+ }
+ return std::string_view(
+ reinterpret_cast<const char*>(peek_buffer_->data()),
+ static_cast<size_t>(peek_buffer_->size()));
+ }
+
+ ::arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ int64_t to_read = std::min(nbytes, length_ - position_);
+ if (to_read <= 0) return 0;
+ ::arrow::io::ReadRange range{base_offset_ + position_, to_read};
+ auto result = cache_->Read(range);
+ if (result.ok()) {
+ auto& buf = *result;
+ memcpy(out, buf->data(), static_cast<size_t>(buf->size()));
+ position_ += buf->size();
+ return buf->size();
+ }
+ // Cache miss: fall back to real I/O from source
+ ARROW_ASSIGN_OR_RAISE(auto buf, source_->ReadAt(range.offset, range.length));
+ memcpy(out, buf->data(), static_cast<size_t>(buf->size()));
+ position_ += buf->size();
+ return buf->size();
+ }
+
+ ::arrow::Result<std::shared_ptr<::arrow::Buffer>> Read(int64_t nbytes) override {
+ int64_t to_read = std::min(nbytes, length_ - position_);
+ if (to_read <= 0) {
+ return std::make_shared<::arrow::Buffer>(nullptr, 0);
+ }
+ ::arrow::io::ReadRange range{base_offset_ + position_, to_read};
+ auto result = cache_->Read(range);
+ if (result.ok()) {
+ position_ += (*result)->size();
+ return *result;
+ }
+ // Cache miss: fall back to real I/O from source
+ ARROW_ASSIGN_OR_RAISE(auto buf, source_->ReadAt(range.offset, range.length));
+ position_ += buf->size();
+ return std::shared_ptr<::arrow::Buffer>(std::move(buf));
+ }
+
+ private:
+ std::shared_ptr<::arrow::io::internal::ReadRangeCache> cache_;
+ std::shared_ptr<ArrowInputFile> source_;
+ int64_t base_offset_;
+ int64_t length_;
+ int64_t position_ = 0;
+ bool closed_ = false;
+ std::shared_ptr<::arrow::Buffer> peek_buffer_;
+};
+
// RowGroupReader::Contents implementation for the Parquet file specification
class SerializedRowGroup : public RowGroupReader::Contents {
public:
@@ -242,6 +336,11 @@
// segments.
PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
stream = std::make_shared<::arrow::io::BufferReader>(buffer);
+ } else if (cached_source_) {
+ // Page-level caching: read through cache with fallback to source.
+ // Advance() is zero-cost for skipped pages via data_page_filter.
+ stream = std::make_shared<CachedInputStream>(
+ cached_source_, source_, col_range.offset, col_range.length);
} else {
stream = properties_.GetStream(source_, col_range.offset, col_range.length);
}
@@ -417,6 +516,26 @@
return cached_source_->WaitFor(ranges);
}

+ void PreBufferRanges(const std::vector<::arrow::io::ReadRange>& ranges,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options) {
+ cached_source_ =
+ std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
+ // Do NOT set prebuffered_column_chunks_ bitmap — GetColumnPageReader will
+ // use CachedInputStream path instead of full-chunk BufferReader path.
+ prebuffered_column_chunks_.clear();
+ PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
+ }
+
+ ::arrow::Future<> WhenBufferedRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges) const {
+ if (!cached_source_) {
+ return ::arrow::Status::Invalid(
+ "Must call PreBufferRanges before WhenBufferedRanges");
+ }
+ return cached_source_->WaitFor(ranges);
+ }
+
// Metadata/footer parsing. Divided up to separate sync/async paths, and to use
// exceptions for error handling (with the async path converting to Future/Status).

@@ -911,6 +1030,22 @@
return file->WhenBuffered(row_groups, column_indices);
}

+void ParquetFileReader::PreBufferRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options) {
+ SerializedFile* file =
+ ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+ file->PreBufferRanges(ranges, ctx, options);
+}
+
+::arrow::Future<> ParquetFileReader::WhenBufferedRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges) const {
+ SerializedFile* file =
+ ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+ return file->WhenBufferedRanges(ranges);
+}
+
// ----------------------------------------------------------------------
// File metadata helpers

diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"

#include <cstdint>
#include <functional>
#include <utility>

#include "arrow/api.h"
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/operation/key_value_file_store_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Result<std::unique_ptr<KeyValueFileStoreScan>> KeyValueFileStoreScan::Create(
scan->SplitAndSetFilter(table_schema->PartitionKeys(), arrow_schema, scan_filters));
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_pk, table_schema->TrimmedPrimaryKeys());
PAIMON_RETURN_NOT_OK(scan->SplitAndSetKeyValueFilter(trimmed_pk));

return scan;
}

Expand Down
9 changes: 8 additions & 1 deletion src/paimon/format/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@ set(PAIMON_PARQUET_FILE_FORMAT
parquet_field_id_converter.cpp
predicate_converter.cpp
file_reader_wrapper.cpp
page_filtered_row_group_reader.cpp
parquet_timestamp_converter.cpp
parquet_file_batch_reader.cpp
parquet_file_format_factory.cpp
parquet_format_writer.cpp
parquet_schema_util.cpp
parquet_stats_extractor.cpp
parquet_writer_builder.cpp)
parquet_writer_builder.cpp
row_ranges.cpp
column_index_filter.cpp)

add_paimon_lib(paimon_parquet_file_format
SOURCES
${PAIMON_PARQUET_FILE_FORMAT}
DEPENDENCIES
paimon_shared
parquet
PRIVATE_INCLUDES
"${ARROW_SOURCE_DIR}/cpp/src"
STATIC_LINK_LIBS
parquet
arrow
Expand All @@ -46,6 +51,7 @@ if(PAIMON_BUILD_TESTS)
add_paimon_test(parquet_format_test
SOURCES
file_reader_wrapper_test.cpp
page_filtered_row_group_reader_test.cpp
parquet_timestamp_converter_test.cpp
parquet_field_id_converter_test.cpp
parquet_file_batch_reader_test.cpp
Expand All @@ -54,6 +60,7 @@ if(PAIMON_BUILD_TESTS)
parquet_writer_builder_test.cpp
predicate_converter_test.cpp
predicate_pushdown_test.cpp
column_index_filter_test.cpp
STATIC_LINK_LIBS
paimon_shared
test_utils_static
Expand Down
Loading
Loading