Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ set(ARROW_SRCS
extension/bool8.cc
extension/json.cc
extension/parquet_variant.cc
extension/variant/encoding.cc
extension/variant/builder.cc
extension/uuid.cc
pretty_print.cc
record_batch.cc
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/extension/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ add_arrow_test(test
PREFIX
"arrow-canonical-extensions")

add_subdirectory(variant)

arrow_install_all_headers("arrow/extension")
2 changes: 2 additions & 0 deletions cpp/src/arrow/extension/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ exc = executable(
)
test('arrow-canonical-extensions-test', exc)

subdir('variant')

install_headers(
[
'bool8.h',
Expand Down
185 changes: 141 additions & 44 deletions cpp/src/arrow/extension/parquet_variant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,30 @@

#include "arrow/extension/parquet_variant.h"

#include <algorithm>
#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/logging_internal.h"

namespace arrow::extension {

VariantExtensionType::VariantExtensionType(const std::shared_ptr<DataType>& storage_type)
: ExtensionType(storage_type) {
// GH-45948: Shredded variants will need to handle an optional shredded_value as
// well as value_ becoming optional.

// IsSupportedStorageType should have been called already, asserting that both
// metadata and value are present.
if (storage_type->field(0)->name() == "metadata") {
metadata_ = storage_type->field(0);
value_ = storage_type->field(1);
} else {
value_ = storage_type->field(0);
metadata_ = storage_type->field(1);
// IsSupportedStorageType should have been called already, asserting that
// metadata is present and at least one of value / typed_value is present.
for (const auto& field : storage_type->fields()) {
if (field->name() == "metadata") {
metadata_ = field;
} else if (field->name() == "value") {
value_ = field;
} else if (field->name() == "typed_value") {
typed_value_ = field;
}
}
}

Expand All @@ -57,50 +59,148 @@ std::string VariantExtensionType::Serialize() const { return ""; }
std::shared_ptr<Array> VariantExtensionType::MakeArray(
std::shared_ptr<ArrayData> data) const {
DCHECK_EQ(data->type->id(), Type::EXTENSION);
DCHECK_EQ("arrow.parquet.variant",
DCHECK_EQ(kVariantExtensionName,
internal::checked_cast<const ExtensionType&>(*data->type).extension_name());
return std::make_shared<VariantArray>(data);
}

namespace {
bool IsBinaryField(const std::shared_ptr<Field> field) {
return field->type()->storage_id() == Type::BINARY ||
field->type()->storage_id() == Type::LARGE_BINARY;

bool IsSupportedPrimitiveTypedValue(const std::shared_ptr<DataType>& type) {
switch (type->id()) {
case Type::BOOL:
case Type::INT8:
case Type::INT16:
case Type::INT32:
case Type::INT64:
case Type::FLOAT:
case Type::DOUBLE:
case Type::DATE32:
case Type::BINARY:
case Type::LARGE_BINARY:
case Type::BINARY_VIEW:
case Type::STRING:
case Type::LARGE_STRING:
case Type::STRING_VIEW:
return true;
case Type::DECIMAL32:
case Type::DECIMAL64:
case Type::DECIMAL128: {
const auto& decimal = internal::checked_cast<const DecimalType&>(*type);
return decimal.scale() >= 0 && decimal.scale() <= decimal.precision();
}
case Type::TIME64:
return internal::checked_cast<const Time64Type&>(*type).unit() == TimeUnit::MICRO;
case Type::TIMESTAMP: {
const auto unit = internal::checked_cast<const TimestampType&>(*type).unit();
return unit == TimeUnit::MICRO || unit == TimeUnit::NANO;
}
case Type::FIXED_SIZE_BINARY:
return internal::checked_cast<const FixedSizeBinaryType&>(*type).byte_width() == 16;
case Type::EXTENSION: {
const auto& ext_type = internal::checked_cast<const ExtensionType&>(*type);
return ext_type.extension_name() == "arrow.uuid";
}
default:
return false;
}
}

bool IsSupportedTypedValue(const std::shared_ptr<Field>& field);

bool IsVariantFieldGroup(const std::shared_ptr<DataType>& type) {
if (type->id() != Type::STRUCT) {
return false;
}

std::shared_ptr<Field> value;
std::shared_ptr<Field> typed_value;
for (const auto& field : type->fields()) {
if (field->name() == "value") {
if (value != nullptr || !field->nullable() ||
!is_binary_or_binary_view(field->type()->storage_id())) {
return false;
}
value = field;
} else if (field->name() == "typed_value") {
if (typed_value != nullptr || !IsSupportedTypedValue(field)) {
return false;
}
typed_value = field;
} else {
return false;
}
}
return value != nullptr || typed_value != nullptr;
}

bool IsSupportedTypedValue(const std::shared_ptr<Field>& field) {
if (!field->nullable()) {
return false;
}
auto is_variant_field_group = [](const auto& field) {
return !field->nullable() && IsVariantFieldGroup(field->type());
};

switch (field->type()->id()) {
case Type::STRUCT:
return field->type()->num_fields() > 0 &&
std::ranges::all_of(field->type()->fields(), is_variant_field_group);
case Type::LIST:
case Type::LARGE_LIST:
case Type::LIST_VIEW:
case Type::LARGE_LIST_VIEW:
case Type::FIXED_SIZE_LIST:
return is_variant_field_group(field->type()->field(0));
default:
return IsSupportedPrimitiveTypedValue(field->type());
}
}

} // namespace

bool VariantExtensionType::IsSupportedStorageType(
const std::shared_ptr<DataType>& storage_type) {
// For now we only supported unshredded variants. Unshredded variant storage
// type should be a struct with a binary metadata and binary value.
//
// GH-45948: In shredded variants, the binary value field can be replaced
// with one or more of the following: object, array, typed_value, and
// variant_value.
if (storage_type->id() == Type::STRUCT) {
if (storage_type->num_fields() == 2) {
// Ordering of metadata and value fields does not matter, as we will assign
// these to the VariantExtensionType's member shared_ptrs in the constructor.
// Here we just need to check that they are both present.

const auto& field0 = storage_type->field(0);
const auto& field1 = storage_type->field(1);

bool metadata_and_value_present =
(field0->name() == "metadata" && field1->name() == "value") ||
(field1->name() == "metadata" && field0->name() == "value");

if (metadata_and_value_present) {
// Both metadata and value must be non-nullable binary types for unshredded
// variants. This will change in GH-46948, when we will require a Visitor
// to traverse the structure of the variant.
return IsBinaryField(field0) && IsBinaryField(field1) && !field0->nullable() &&
!field1->nullable();
if (storage_type->id() != Type::STRUCT) {
return false;
}

std::shared_ptr<Field> metadata;
std::shared_ptr<Field> value;
std::shared_ptr<Field> typed_value;

for (const auto& field : storage_type->fields()) {
if (field->name() == "metadata") {
if (metadata != nullptr || !is_binary_or_binary_view(field->type()->storage_id()) ||
field->nullable()) {
return false;
}
metadata = field;
} else if (field->name() == "value") {
if (value != nullptr || !is_binary_or_binary_view(field->type()->storage_id())) {
return false;
}
value = field;
} else if (field->name() == "typed_value") {
if (typed_value != nullptr || !IsSupportedTypedValue(field)) {
return false;
}
typed_value = field;
} else {
return false;
}
}

return false;
if (metadata == nullptr || (value == nullptr && typed_value == nullptr)) {
return false;
}
if (value == nullptr) {
return true;
}
if (typed_value == nullptr) {
return !value->nullable();
}
return value->nullable();
}

Result<std::shared_ptr<DataType>> VariantExtensionType::Make(
Expand All @@ -113,9 +213,6 @@ Result<std::shared_ptr<DataType>> VariantExtensionType::Make(
return std::make_shared<VariantExtensionType>(std::move(storage_type));
}

/// NOTE: this is still experimental. GH-45948 will add shredding support, at which point
/// we need to separate this into unshredded_variant and shredded_variant helper
/// functions.
std::shared_ptr<DataType> variant(std::shared_ptr<DataType> storage_type) {
return VariantExtensionType::Make(std::move(storage_type)).ValueOrDie();
}
Expand Down
22 changes: 20 additions & 2 deletions cpp/src/arrow/extension/parquet_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
#pragma once

#include <string>
#include <string_view>

#include "arrow/extension_type.h"
#include "arrow/util/visibility.h"

namespace arrow::extension {

/// \brief The extension name for the Variant extension type.
inline constexpr std::string_view kVariantExtensionName = "arrow.parquet.variant";

class ARROW_EXPORT VariantArray : public ExtensionArray {
public:
using ExtensionArray::ExtensionArray;
Expand All @@ -43,13 +47,25 @@ class ARROW_EXPORT VariantArray : public ExtensionArray {
/// To read more about variant encoding, see the variant encoding spec at
/// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
///
/// Shredded variant representation:
/// optional group shredded_variant_name (VARIANT) {
/// required binary metadata;
/// optional binary value;
/// optional <type> typed_value;
/// }
///
/// The value and typed_value fields are optional in the schema, but at least one
/// must be present.
///
/// To read more about variant shredding, see the variant shredding spec at
/// https://github.com/apache/parquet-format/blob/master/VariantShredding.md
class ARROW_EXPORT VariantExtensionType : public ExtensionType {
public:
explicit VariantExtensionType(const std::shared_ptr<DataType>& storage_type);

std::string extension_name() const override { return "arrow.parquet.variant"; }
std::string extension_name() const override {
return std::string(kVariantExtensionName);
}

bool ExtensionEquals(const ExtensionType& other) const override;

Expand All @@ -69,10 +85,12 @@ class ARROW_EXPORT VariantExtensionType : public ExtensionType {

std::shared_ptr<Field> value() const { return value_; }

std::shared_ptr<Field> typed_value() const { return typed_value_; }

private:
// TODO GH-45948 added shredded_value
std::shared_ptr<Field> metadata_;
std::shared_ptr<Field> value_;
std::shared_ptr<Field> typed_value_;
};

/// \brief Return a VariantExtensionType instance.
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/extension/variant/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

add_arrow_test(test
SOURCES
type_test.cc
encoding_test.cc
builder_test.cc
PREFIX
"arrow-variant-extension")

arrow_install_all_headers("arrow/extension/variant")
Loading
Loading