Skip to content
Merged
3 changes: 2 additions & 1 deletion include/paimon/data/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class PAIMON_EXPORT Blob {
/// @param metadata A map of key-value metadata to be attached to the field.
/// @return A result containing a unique pointer to the generated `ArrowSchema` or an error.
static Result<std::unique_ptr<::ArrowSchema>> ArrowField(
const std::string& field_name, std::unordered_map<std::string, std::string> metadata = {});
const std::string& field_name, bool nullable = false,
std::unordered_map<std::string, std::string> metadata = {});

private:
class Impl;
Expand Down
2 changes: 1 addition & 1 deletion include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ struct PAIMON_EXPORT Options {
/// "partition.legacy-name" - When enabled, partition names are generated with the legacy
/// `ToString` for all types. If false, values are cast to string for all types. Default value is "true".
static const char PARTITION_GENERATE_LEGACY_NAME[];
/// "blob-as-descriptor" - Read and write blob field using blob descriptor rather than blob
/// "blob-as-descriptor" - Read blob field using blob descriptor rather than blob
/// bytes. Default value is "false".
static const char BLOB_AS_DESCRIPTOR[];
/// "blob-field" - Specifies column names that should be stored as blob type. This is used
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ set(PAIMON_CORE_SRCS
core/bucket/hive_bucket_function.cpp
core/bucket/mod_bucket_function.cpp
core/bucket/bucket_id_calculator.cpp
core/casting/binary_to_blob_cast_executor.cpp
core/casting/binary_to_string_cast_executor.cpp
core/casting/boolean_to_decimal_cast_executor.cpp
core/casting/boolean_to_numeric_cast_executor.cpp
Expand Down Expand Up @@ -222,6 +223,7 @@ set(PAIMON_CORE_SRCS
core/io/key_value_meta_projection_consumer.cpp
core/io/key_value_projection_consumer.cpp
core/io/key_value_projection_reader.cpp
core/io/external_storage_blob_writer.cpp
core/io/multiple_blob_file_writer.cpp
core/io/rolling_blob_file_writer.cpp
core/manifest/file_kind.cpp
Expand Down Expand Up @@ -601,6 +603,7 @@ if(PAIMON_BUILD_TESTS)
core/io/file_index_evaluator_test.cpp
core/io/single_file_writer_test.cpp
core/io/rolling_blob_file_writer_test.cpp
core/io/external_storage_blob_writer_test.cpp
core/global_index/indexed_split_test.cpp
core/manifest/file_source_test.cpp
core/manifest/file_kind_test.cpp
Expand Down
5 changes: 3 additions & 2 deletions src/paimon/common/data/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ Result<PAIMON_UNIQUE_PTR<Bytes>> Blob::ToData(const std::shared_ptr<FileSystem>&
}

Result<std::unique_ptr<ArrowSchema>> Blob::ArrowField(
const std::string& field_name, std::unordered_map<std::string, std::string> metadata) {
auto blob_field = BlobUtils::ToArrowField(field_name, /*nullable=*/false, metadata);
const std::string& field_name, bool nullable,
std::unordered_map<std::string, std::string> metadata) {
auto blob_field = BlobUtils::ToArrowField(field_name, nullable, metadata);
auto field = std::make_unique<::ArrowSchema>();
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportField(*blob_field, field.get()));
return field;
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/blob_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace paimon {
/// | 13 + N | offset | long | 8 |
/// | 21 + N | length | long | 8 |

class BlobDescriptor {
class PAIMON_EXPORT BlobDescriptor {
public:
static Result<std::unique_ptr<BlobDescriptor>> Create(const std::string& uri, int64_t offset,
int64_t length);
Expand Down
14 changes: 5 additions & 9 deletions src/paimon/common/data/blob_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,38 +144,34 @@ TEST_F(BlobTest, TestNewInputStreamWithDynamicLength) {
}

TEST_F(BlobTest, TestArrowField) {
{
// basic: field name, non-nullable by default
ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("my_blob"));
for (bool nullable : {false, true}) {
ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("my_blob", nullable));
ASSERT_NE(schema, nullptr);

// import back to arrow::Field to verify
auto field_result = arrow::ImportField(schema.get());
ASSERT_TRUE(field_result.ok());
auto field = field_result.ValueUnsafe();

ASSERT_EQ(field->name(), "my_blob");
ASSERT_EQ(field->type()->id(), arrow::Type::LARGE_BINARY);
ASSERT_FALSE(field->nullable());
ASSERT_EQ(field->nullable(), nullable);
ASSERT_TRUE(field->HasMetadata());
auto extension_type = field->metadata()->Get("paimon.extension.type");
ASSERT_TRUE(extension_type.ok());
ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob");
}
{
// with custom metadata
std::unordered_map<std::string, std::string> custom_metadata = {
{"custom_key", "custom_value"}};
ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("meta_blob", custom_metadata));
ASSERT_OK_AND_ASSIGN(auto schema,
Blob::ArrowField("meta_blob", /*nullable=*/false, custom_metadata));
auto field = arrow::ImportField(schema.get()).ValueUnsafe();
ASSERT_EQ(field->name(), "meta_blob");
ASSERT_FALSE(field->nullable());
ASSERT_TRUE(field->HasMetadata());
// blob extension metadata should be present
auto extension_type = field->metadata()->Get("paimon.extension.type");
ASSERT_TRUE(extension_type.ok());
ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob");
// custom metadata should also be present
auto custom_val = field->metadata()->Get("custom_key");
ASSERT_TRUE(custom_val.ok());
ASSERT_EQ(custom_val.ValueUnsafe(), "custom_value");
Expand Down
110 changes: 92 additions & 18 deletions src/paimon/common/data/blob_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,77 @@
#include "paimon/common/data/blob_utils.h"

#include <cstddef>
#include <utility>
#include <set>
#include <vector>

#include "arrow/api.h"
#include "arrow/array/array_nested.h"
#include "arrow/type.h"
#include "fmt/format.h"
#include "paimon/common/data/blob_defs.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/arrow/status_utils.h"
Comment thread
lxy-9602 marked this conversation as resolved.
#include "paimon/common/utils/string_utils.h"

namespace arrow {
class Array;
}

namespace paimon {

BlobUtils::SeparatedSchemas BlobUtils::SeparateBlobSchema(
const std::shared_ptr<arrow::Schema>& schema) {
std::vector<std::shared_ptr<arrow::Field>> remaining_fields;
const std::shared_ptr<arrow::Schema>& schema, const std::set<std::string>& inline_fields) {
std::vector<std::shared_ptr<arrow::Field>> main_fields;
std::vector<std::shared_ptr<arrow::Field>> blob_fields;
for (auto i = 0; i < schema->num_fields(); i++) {
for (int32_t i = 0; i < schema->num_fields(); i++) {
auto field = schema->field(i);
Comment thread
lxy-9602 marked this conversation as resolved.
if (IsBlobField(field)) {
if (IsBlobField(field) && inline_fields.count(field->name()) == 0) {
// Non-inline BLOB -> goes to blob file
blob_fields.emplace_back(field);
} else {
remaining_fields.emplace_back(field);
// Non-blob fields OR inline BLOB fields -> stay in main
main_fields.emplace_back(field);
}
}
SeparatedSchemas result;
result.main_schema = arrow::schema(remaining_fields);
result.main_schema = arrow::schema(main_fields);
result.blob_schema = arrow::schema(blob_fields);
return result;
}

Result<BlobUtils::SeparatedStructArrays> BlobUtils::SeparateBlobArray(
const std::shared_ptr<arrow::StructArray>& struct_array) {
const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& inline_fields) {
std::shared_ptr<arrow::StructType> old_type =
std::static_pointer_cast<arrow::StructType>(struct_array->type());
const auto& old_fields = old_type->fields();
const auto& old_arrays = struct_array->fields();

std::vector<std::shared_ptr<arrow::Field>> remaining_fields;
std::vector<std::shared_ptr<arrow::Array>> remaining_arrays;
std::vector<std::shared_ptr<arrow::Field>> blob_fields;
std::vector<std::shared_ptr<arrow::Array>> blob_arrays;
arrow::ArrayVector main_arrays;
arrow::ArrayVector blob_arrays;
arrow::FieldVector main_fields;
arrow::FieldVector blob_fields;

for (size_t i = 0; i < old_fields.size(); i++) {
if (IsBlobField(old_fields[i])) {
if (IsBlobField(old_fields[i]) && inline_fields.count(old_fields[i]->name()) == 0) {
blob_fields.push_back(old_fields[i]);
blob_arrays.push_back(old_arrays[i]);
} else {
remaining_fields.push_back(old_fields[i]);
remaining_arrays.push_back(old_arrays[i]);
main_fields.push_back(old_fields[i]);
main_arrays.push_back(old_arrays[i]);
}
}

if (blob_fields.empty()) {
return Status::Invalid(
"SeparateBlobArray expects at least one non-inline blob field, but got none.");
}
if (main_fields.empty()) {
return Status::Invalid("SeparateBlobArray expects at least one main field, but got none.");
}

Comment thread
lxy-9602 marked this conversation as resolved.
SeparatedStructArrays result;
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.main_array,
arrow::StructArray::Make(remaining_arrays, remaining_fields));
arrow::StructArray::Make(main_arrays, main_fields));
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.blob_array,
arrow::StructArray::Make(blob_arrays, blob_fields));
return result;
Expand Down Expand Up @@ -114,4 +126,66 @@ std::shared_ptr<arrow::Field> BlobUtils::ToArrowField(
return arrow::field(field_name, arrow::large_binary(), nullable,
std::make_shared<arrow::KeyValueMetadata>(metadata));
}

/// Verifies that every non-null value of the named inline blob columns is recognized by
/// `BlobDescriptor::IsBlobDescriptor`.
///
/// @param struct_array Batch whose child columns are looked up by name.
/// @param inline_descriptor_fields Names of the columns to validate. Names for which
///        `GetFieldByName` returns no child are skipped.
/// @return OK when `inline_descriptor_fields` is empty or all inspected values pass.
///         Invalid when `struct_array` is null, when a named column is not a LargeBinary
///         array, or when a non-null value is not a blob descriptor. Errors returned by
///         `BlobDescriptor::IsBlobDescriptor` are propagated.
Status BlobUtils::ValidateInlineBlobDescriptors(
    const std::shared_ptr<arrow::StructArray>& struct_array,
    const std::set<std::string>& inline_descriptor_fields) {
    if (inline_descriptor_fields.empty()) {
        return Status::OK();
    }
    if (!struct_array) {
        return Status::Invalid("array in ValidateInlineBlobDescriptors must be a struct_array");
    }
    for (const auto& field_name : inline_descriptor_fields) {
        auto field_array = struct_array->GetFieldByName(field_name);
        if (!field_array) {
            continue;
        }
        // checked_cast is a plain static_cast in release builds and never yields nullptr,
        // so the physical type must be verified explicitly before down-casting.
        if (field_array->type_id() != arrow::Type::LARGE_BINARY) {
            return Status::Invalid(
                fmt::format("cannot cast array for field {} to LargeBinaryArray", field_name));
        }
        const auto* binary_array =
            arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(field_array.get());
        for (int64_t row = 0; row < binary_array->length(); ++row) {
            // Null entries carry no payload to validate.
            if (binary_array->IsNull(row)) {
                continue;
            }
            auto value = binary_array->GetView(row);
            PAIMON_ASSIGN_OR_RAISE(bool is_descriptor,
                                   BlobDescriptor::IsBlobDescriptor(value.data(), value.size()));
            if (!is_descriptor) {
                return Status::Invalid(fmt::format(
                    "BLOB inline field {} configured by blob-descriptor-field or blob-view-field "
                    "require values to be a BlobDescriptor or BlobViewStruct.",
                    field_name));
            }
        }
    }
    return Status::OK();
}

std::vector<DataField> BlobUtils::ConvertBlobInlineDataFields(
const std::vector<DataField>& data_fields, const std::vector<std::string>& blob_inline_fields) {
if (blob_inline_fields.empty()) {
return data_fields;
}

std::set<std::string> blob_inline_field_set(blob_inline_fields.begin(),
blob_inline_fields.end());
std::vector<DataField> converted_fields;
converted_fields.reserve(data_fields.size());
for (const auto& data_field : data_fields) {
if (blob_inline_field_set.find(data_field.Name()) == blob_inline_field_set.end()) {
converted_fields.push_back(data_field);
continue;
}

auto binary_field = arrow::field(data_field.Name(), arrow::binary(), data_field.Nullable(),
data_field.ArrowField()->metadata());
converted_fields.emplace_back(data_field.Id(), binary_field, data_field.Description());
}
return converted_fields;
}

} // namespace paimon
36 changes: 30 additions & 6 deletions src/paimon/common/data/blob_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#pragma once

#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "paimon/result.h"
#include "paimon/visibility.h"
Expand All @@ -30,6 +32,10 @@ class Schema;
class StructArray;
} // namespace arrow

namespace paimon {
class DataField;
} // namespace paimon

namespace paimon {
/// Utils for blob type.
class PAIMON_EXPORT BlobUtils {
Expand All @@ -38,23 +44,29 @@ class PAIMON_EXPORT BlobUtils {
~BlobUtils() = delete;

struct SeparatedSchemas {
/// Non-blob fields
/// Non-blob fields (includes inline blob fields when inline_fields is provided)
std::shared_ptr<arrow::Schema> main_schema;
/// Blob fields only
/// Blob fields that go to separate .blob files
std::shared_ptr<arrow::Schema> blob_schema;
};

struct SeparatedStructArrays {
/// Non-blob fields
/// Non-blob fields (includes inline blob fields when inline_fields is provided)
std::shared_ptr<arrow::StructArray> main_array;
/// Blob fields only
/// Blob fields that go to separate .blob files
std::shared_ptr<arrow::StructArray> blob_array;
};

static SeparatedSchemas SeparateBlobSchema(const std::shared_ptr<arrow::Schema>& schema);
/// Separates schema with inline field awareness.
/// BLOB fields in inline_fields stay in main_schema; others go to blob_schema.
static SeparatedSchemas SeparateBlobSchema(const std::shared_ptr<arrow::Schema>& schema,
const std::set<std::string>& inline_fields);

/// Separates array with inline field awareness.
/// BLOB fields in inline_fields stay in main_array; others go to blob_array.
static Result<SeparatedStructArrays> SeparateBlobArray(
const std::shared_ptr<arrow::StructArray>& struct_array);
const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& inline_fields);

static bool IsBlobField(const std::shared_ptr<arrow::Field>& field);
static bool IsBlobMetadata(const std::shared_ptr<const arrow::KeyValueMetadata>& metadata);
Expand All @@ -63,6 +75,18 @@ class PAIMON_EXPORT BlobUtils {
static std::shared_ptr<arrow::Field> ToArrowField(
const std::string& field_name, bool nullable = false,
std::unordered_map<std::string, std::string> metadata = {});

static Status ValidateInlineBlobDescriptors(
const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& inline_descriptor_fields);

/// Converts inline blob DataFields from large_binary to binary type.
/// Inline blob fields use large_binary in the table schema (because they are BLOB type),
/// but are stored as binary in data files. This conversion aligns the field type with
/// the actual on-disk storage format for correct reading.
static std::vector<DataField> ConvertBlobInlineDataFields(
const std::vector<DataField>& data_fields,
const std::vector<std::string>& blob_inline_fields);
};

} // namespace paimon
Loading
Loading