Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ set(ICEBERG_SOURCES
partition_summary.cc
puffin/file_metadata.cc
puffin/puffin_format.cc
puffin/puffin_reader.cc
puffin/puffin_writer.cc
puffin/json_serde.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ iceberg_sources = files(
'puffin/file_metadata.cc',
'puffin/json_serde.cc',
'puffin/puffin_format.cc',
'puffin/puffin_reader.cc',
'puffin/puffin_writer.cc',
'row/arrow_array_wrapper.cc',
'row/manifest_wrapper.cc',
'row/partition_values.cc',
Expand Down
8 changes: 7 additions & 1 deletion src/iceberg/puffin/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
# under the License.

install_headers(
['file_metadata.h', 'puffin_format.h', 'type_fwd.h'],
[
'file_metadata.h',
'puffin_format.h',
'puffin_reader.h',
'puffin_writer.h',
'type_fwd.h',
],
subdir: 'iceberg/puffin',
)
24 changes: 12 additions & 12 deletions src/iceberg/puffin/puffin_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
std::unreachable();
}

} // namespace
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason to move both Result<std::vector<std::byte>> Compress and Result<std::vector<std::byte>> Decompress out of the namespace?


bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
return (flags[byte_num] & (1 << bit_num)) != 0;
}

void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
flags[byte_num] |= (1 << bit_num);
}

// TODO(zhaoxuan1994): Move compression logic to a unified codec interface.
Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
std::span<const std::byte> input) {
Expand Down Expand Up @@ -63,16 +75,4 @@ Result<std::vector<std::byte>> Decompress(PuffinCompressionCodec codec,
std::unreachable();
}

} // namespace

bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
return (flags[byte_num] & (1 << bit_num)) != 0;
}

void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
flags[byte_num] |= (1 << bit_num);
}

} // namespace iceberg::puffin
10 changes: 10 additions & 0 deletions src/iceberg/puffin/puffin_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
/// Puffin file format constants and utilities.

#include <array>
#include <cstddef>
#include <cstdint>
#include <span>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/puffin/file_metadata.h"
Expand Down Expand Up @@ -66,4 +68,12 @@ ICEBERG_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag
/// \brief Set a flag in the flags bytes.
ICEBERG_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);

/// \brief Compress data using the specified codec.
ICEBERG_EXPORT Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
std::span<const std::byte> input);

/// \brief Decompress data using the specified codec.
ICEBERG_EXPORT Result<std::vector<std::byte>> Decompress(
PuffinCompressionCodec codec, std::span<const std::byte> input);

} // namespace iceberg::puffin
150 changes: 150 additions & 0 deletions src/iceberg/puffin/puffin_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/puffin/puffin_reader.h"

#include <algorithm>
#include <array>
#include <cstring>
#include <string_view>

#include "iceberg/puffin/json_serde_internal.h"
#include "iceberg/puffin/puffin_format.h"
#include "iceberg/util/endian.h"
#include "iceberg/util/macros.h"

namespace iceberg::puffin {

namespace {

// Validate magic bytes at the given offset.
Status CheckMagic(std::span<const std::byte> data, int64_t offset) {
if (offset < 0 ||
offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) {
return Invalid("Invalid file: cannot read magic at offset {}", offset);
}
auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset);
if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) {
return Invalid("Invalid file: expected magic at offset {}", offset);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please also save the value of incorrect magic number into Invalid for easy debugging.

}
return {};
}

} // namespace

PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {}

Result<FileMetadata> PuffinReader::ReadFileMetadata() {
auto file_size = static_cast<int64_t>(data_.size());

if (file_size < PuffinFormat::kFooterStructLength) {
return Invalid("Invalid file: file length {} is less than minimal footer size {}",
file_size, PuffinFormat::kFooterStructLength);
}

// Read footer struct from end of file
auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength;

// Validate footer end magic
ICEBERG_RETURN_UNEXPECTED(
CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset));

// Read payload size from footer struct
auto payload_size = ReadLittleEndian<int32_t>(
data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset);

if (payload_size < 0) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should payload_size == 0 also be considered as an error.

return Invalid("Invalid file: negative payload size {}", payload_size);
}

// Calculate total footer size and validate
int64_t footer_size = PuffinFormat::kFooterStartMagicLength +
static_cast<int64_t>(payload_size) +
PuffinFormat::kFooterStructLength;
auto footer_offset = file_size - footer_size;
if (footer_offset < 0) {
return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size,
file_size);
}

// Validate footer start magic
ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset));

// Check flags for footer compression
std::array<uint8_t, 4> flags{};
std::memcpy(
flags.data(),
data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4);
Comment on lines +91 to +93
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::memcpy(
flags.data(),
data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4);
std::memcpy(
flags.data(),
data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset,
4);


PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone;
if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please validate that all unknown/reserved footer flag bits are unset. Java's PuffinReader rejects unknown flags, and the spec says reserved bits should be 0; silently ignoring them may cause C++ to accept future or invalid files and interpret the footer incorrectly.

footer_compression = PuffinFormat::kDefaultFooterCompressionCodec;
}

// Extract footer payload
auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength;
std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size);
const std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size);

ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes,
Decompress(footer_compression, payload_span));
Comment on lines +69 to +104
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payload_size is already validated above:

if (payload_size < 0) {
  return Invalid("Invalid file: negative payload size {}", payload_size);
}


// Parse JSON
std::string_view json_str(reinterpret_cast<const char*>(payload_bytes.data()),
payload_bytes.size());
ICEBERG_ASSIGN_OR_RAISE(auto file_metadata, FileMetadataFromJsonString(json_str));

// Validate header magic
ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, 0));
Comment on lines +111 to +112
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we move this validation to the beginning of this method?


return file_metadata;
}

Result<std::pair<BlobMetadata, std::vector<std::byte>>> PuffinReader::ReadBlob(
const BlobMetadata& blob_metadata) {
auto file_size = static_cast<int64_t>(data_.size());

if (blob_metadata.offset < 0 || blob_metadata.length < 0 ||
blob_metadata.offset > file_size ||
blob_metadata.length > file_size - blob_metadata.offset) {
return Invalid("Invalid blob: offset {} + length {} exceeds file size {}",
blob_metadata.offset, blob_metadata.length, file_size);
}

std::span<const std::byte> raw_data(data_.data() + blob_metadata.offset,
blob_metadata.length);

// Determine compression codec
ICEBERG_ASSIGN_OR_RAISE(
auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec));
ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data));

return std::pair{blob_metadata, std::move(decompressed)};
}

Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>>
PuffinReader::ReadAll(const std::vector<BlobMetadata>& blobs) {
std::vector<std::pair<BlobMetadata, std::vector<std::byte>>> results;
results.reserve(blobs.size());
for (const auto& blob : blobs) {
ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(blob));
results.push_back(std::move(blob_pair));
}
return results;
}

} // namespace iceberg::puffin
66 changes: 66 additions & 0 deletions src/iceberg/puffin/puffin_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/puffin/puffin_reader.h
/// Puffin file reader.

#include <cstddef>
#include <cstdint>
#include <span>
#include <utility>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/puffin/file_metadata.h"
#include "iceberg/result.h"

namespace iceberg::puffin {

/// \brief Reader for Puffin files.
///
/// Parses a Puffin file from an in-memory buffer. Usage:
/// PuffinReader reader(file_data);
/// auto metadata = reader.ReadFileMetadata();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be the recommended approach unless error handling has been added. Otherwise users may silently ignore the error and call metadata.value() unconsciously.

/// auto blob = reader.ReadBlob(metadata.value().blobs[0]);
class ICEBERG_EXPORT PuffinReader {
public:
/// \brief Construct a reader from file data.
explicit PuffinReader(std::span<const std::byte> data);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue of this ctor is that we lose the capability of seeking into a segment of a puffin file, which is the main use case of v3 deletion vector.


/// \brief Read and return the file metadata from the footer.
Result<FileMetadata> ReadFileMetadata();

/// \brief Read a specific blob's data by its metadata.
/// \param blob_metadata The metadata describing the blob to read.
/// \return A pair of (BlobMetadata, decompressed data), or an error.
Result<std::pair<BlobMetadata, std::vector<std::byte>>> ReadBlob(
const BlobMetadata& blob_metadata);

/// \brief Read all blobs described in the file metadata.
/// \return A vector of (BlobMetadata, decompressed data) pairs, or an error.
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>> ReadAll(
const std::vector<BlobMetadata>& blobs);

private:
std::span<const std::byte> data_;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::span<const std::byte> data_;
const std::span<const std::byte> data_;

};

} // namespace iceberg::puffin
Loading
Loading