
Kafka Connect: Support VARIANT when record convert #15283

Open

seokyun-ha-toss wants to merge 32 commits into apache:main from seokyun-ha-toss:support-variant-for-sink-connector

Conversation

seokyun-ha-toss commented Feb 10, 2026

Summary

Add support for converting arbitrary Java objects (e.g. Map<String, Object>, lists, temporal types such as Date and Instant, primitives) into the Iceberg Variant type in the Kafka Connect RecordConverter. Nested maps and lists are converted recursively, so structures like {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} are correctly represented as a single Variant.
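The key idea behind the recursive conversion is that a Variant value carries one metadata dictionary, so the converter must gather every key in the nested tree before encoding. The following is a standalone sketch of that key-collection step (not the PR's actual code; class and method names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class VariantKeyCollector {
  // Recursively gather all string keys from a nested map/list tree.
  // A Variant encodes a single metadata dictionary per value, so the
  // writer needs the full key set up front to assign consistent field IDs
  // across the root object and every nested object.
  static SortedSet<String> collectKeys(Object value) {
    SortedSet<String> keys = new TreeSet<>();
    collect(value, keys);
    return keys;
  }

  private static void collect(Object value, Set<String> keys) {
    if (value instanceof Map<?, ?> map) {
      for (Map.Entry<?, ?> entry : map.entrySet()) {
        if (entry.getKey() instanceof String key) {
          keys.add(key);
        }
        collect(entry.getValue(), keys);
      }
    } else if (value instanceof List<?> list) {
      for (Object element : list) {
        collect(element, keys);
      }
    }
    // Primitives contribute no keys.
  }

  public static void main(String[] args) {
    Map<String, Object> address = Map.of("city", "Seoul", "zip", "12345");
    Map<String, Object> user = Map.of("name", "alice", "address", address);
    System.out.println(collectKeys(Map.of("user", user)));
    // prints [address, city, name, user, zip]
  }
}
```

Sharing one sorted key set is what lets the root and nested objects reference consistent field IDs in the shared metadata.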

Motivation

Kafka Connect payloads often come as schema-less or JSON-like maps. To write them into Iceberg tables with a Variant column, the connector must convert these Java objects into the Variant format (metadata + value) and support nested maps/arrays without losing structure or key names.

Behaviour

| Input | Result |
| --- | --- |
| Primitives (`String`, `int`, `long`, `boolean`, etc.) | Single (empty) metadata + corresponding Variant primitive. |
| Flat map, e.g. `{"a": 1, "b": "x"}` | One metadata with keys `["a", "b"]`; one `ShreddedObject` with two fields. |
| Nested map, e.g. `{"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}}` | One shared metadata for all keys; root and nested objects become `ShreddedObject`s with consistent field IDs. |
| Lists | Converted to `VariantArray` with elements converted recursively. |
| Already `Variant` / `ByteBuffer` | Pass-through, or `Variant.from(ByteBuffer)` where appropriate. |
| Temporal types (`java.time` and related) | Empty metadata at the root; values map to Variant primitives: `Instant` / `OffsetDateTime` / `ZonedDateTime` → `timestamptz`; `LocalDateTime` → `timestampntz`; `LocalDate` → `date`; `LocalTime` → `time` (micros/day encoding via `DateTimeUtil`). |
| `java.util.Date` (Connect runtime) | Same empty metadata when valid; requires the Kafka Connect field schema so the logical type name can distinguish `Timestamp`, `Time`, and `Date` (all represented as `java.util.Date` at runtime). Without a matching logical type, conversion throws. Nested values receive the schema from the parent map value schema or struct field schema when present. |
| Kafka Connect `Struct` | Treated like a keyed object: one shared metadata with all field names collected from the struct tree (including nested maps/structs/lists); the struct becomes a `ShreddedObject` whose keys are the connector field names. Each field is converted recursively with `field.schema()` so nested collections, maps, temporal fields, and inner structs keep correct typing. |
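The temporal encodings in the table above can be sketched with plain `java.time`; the arithmetic here is assumed to match what the PR does via Iceberg's `DateTimeUtil`:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

public class TemporalToVariantUnits {
  // timestamptz: microseconds since the epoch, UTC-adjusted.
  static long timestamptzMicros(Instant instant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
  }

  // timestampntz: microseconds since 1970-01-01T00:00, no zone attached.
  static long timestampntzMicros(LocalDateTime ldt) {
    return ChronoUnit.MICROS.between(LocalDateTime.of(1970, 1, 1, 0, 0), ldt);
  }

  // date: days since the epoch.
  static int dateDays(LocalDate date) {
    return (int) date.toEpochDay();
  }

  // time: microseconds since midnight.
  static long timeMicros(LocalTime time) {
    return time.toNanoOfDay() / 1000;
  }

  public static void main(String[] args) {
    System.out.println(dateDays(LocalDate.of(1970, 1, 2)));  // prints 1
    System.out.println(timeMicros(LocalTime.of(0, 0, 1)));   // prints 1000000
  }
}
```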

Relates

Thanks, Good Day!

Also, thanks for contributing, @brandonstanleyappfolio!

```java
protected Variant convertVariantValue(Object value) {
  if (value instanceof Variant) {
    return (Variant) value;
  } else if (value instanceof ByteBuffer) {
```
Contributor

I don't think it is correct to assume this is encoded as a variant. It should be handled as a literal binary.

Author

seokyun-ha-toss commented Feb 12, 2026

#15283 (comment)

Same as this; handled together.

Contributor

@seokyun-ha-toss I still don't think this has been addressed.

I think it's fine to check and pass through a variant if it is an instance of a variant. I think it's unexpected as I noted in another comment (I suppose this could be done through an SMT and I believe Kafka may be introducing a variant type, but I'm not sure if that's a real scenario).

However, if we see a bytebuffer, I don't see why we would assume it's a variant as opposed to a literal binary. That is what the comment originally was intended to address, but it appears we're still treating it as a variant.

@alexkot1394

Hi @seokyun-ha-toss,
I've opened a PR that's trying to solve the same issue: #15498
I'm happy to help with your PR to get this change into main. Let me know if you want to collaborate on your PR or where you're at with the review comments above.

@seokyun-ha-toss
Author

> Hi @seokyun-ha-toss, I've opened a PR that's trying to solve the same issue: #15498 I'm happy to help with your PR to get this change into main. Let me know if you want to collaborate on your PR or where you're at with the review comments above.

Hi @alexkot1394, thanks for reaching out.

I stepped away for a bit to validate my changes in our production environment.

I took a look at your PR as well. From what I see, it treats Variant as a string primitive, whereas I believe it should be handled recursively to properly construct the Variant structure.

I’d prefer to continue working on this PR and incorporate the review comments as soon as possible.

Thanks again!

@seokyun-ha-toss
Author

Hello, @danielcweeks, @emkornfield, and @alexkot1394, I'm ready to get a review on this. I've tested this for a month in production, and it works well! I can query the output Iceberg tables with Variant columns using Spark and Snowflake.

Please take a look at the latest updates. Thanks!

@brandonstanleyappfolio

brandonstanleyappfolio commented Mar 17, 2026

👋 Hi @seokyun-ha-toss, thanks for introducing this change! Is there a reason Struct types aren't included in this PR?

@seokyun-ha-toss
Author

> 👋 Hi @seokyun-ha-toss, thanks for introducing this change! Is there a reason Struct types aren't included in this PR?

Good catch! I missed the case of org.apache.kafka.connect.data.Struct. I will handle it as well. Thanks!

@seokyun-ha-toss
Author

Hello, @brandonstanleyappfolio. I've added Struct type handling and unit tests in the following two commits:

Thanks for pointing this out!

@brandonstanleyappfolio

@seokyun-ha-toss Thank you - I’ve tested the changes locally, and everything is working as expected!

@seokyun-ha-toss
Author

@danielcweeks @emkornfield Gentle ping — would appreciate your review when you have a moment. Thank you!

brandonstanleyappfolio and others added 11 commits March 31, 2026 18:16
…and Time logical types

Kafka Connect represents three distinct temporal logical types using the same
Java class (java.util.Date):
- org.apache.kafka.connect.data.Timestamp (milliseconds since epoch)
- org.apache.kafka.connect.data.Time (milliseconds since midnight)
- org.apache.kafka.connect.data.Date (days since epoch)

The existing primitiveToVariantValue method only checked `instanceof Date`
with no schema context, converting all java.util.Date values to Iceberg DATE
(days since epoch). This silently discarded the time component from Timestamp
fields (e.g. 2025-12-09T14:30:45.123Z became 2025-12-09).

Fix: thread the Kafka Connect schema through objectToVariantValue and
primitiveToVariantValue so the Date branch can inspect the schema's logical
type name and convert to the correct Iceberg variant type:
- Timestamp (logical name: org.apache.kafka.connect.data.Timestamp)
    -> Variants.ofTimestamptz (microseconds since epoch)
- Time (logical name: org.apache.kafka.connect.data.Time)
    -> Variants.ofTime (microseconds since midnight)
- Date (logical name: org.apache.kafka.connect.data.Date)
    -> Variants.ofDate (days since epoch)

The schema is propagated from Struct fields (field.schema()), Collection
elements (schema.valueSchema()), and Map values (schema.valueSchema()).
When no schema is available, an IllegalArgumentException is thrown to
prevent silent data loss.
Fix `java.util.Date` variant conversion losing precision for Timestamp and Time logical types
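A standalone sketch of the logical-name dispatch this commit describes (the class and method here are illustrative; the real code threads the Connect `Schema` object through the converters rather than a name string):

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class ConnectDateConversion {
  // Logical type names Kafka Connect attaches to schemas whose runtime
  // representation is java.util.Date (from the Connect data API).
  static final String TIMESTAMP = "org.apache.kafka.connect.data.Timestamp";
  static final String TIME = "org.apache.kafka.connect.data.Time";
  static final String DATE = "org.apache.kafka.connect.data.Date";

  // Dispatch on the logical name: microseconds since epoch, microseconds
  // since midnight (Connect stores Time as a Date on 1970-01-01), or days
  // since epoch. Throws when no logical name is present, mirroring the
  // PR's refusal to guess and risk silent data loss.
  static long convert(Date value, String logicalName) {
    long millis = value.getTime();
    switch (logicalName == null ? "" : logicalName) {
      case TIMESTAMP:
        return TimeUnit.MILLISECONDS.toMicros(millis);
      case TIME:
        return TimeUnit.MILLISECONDS.toMicros(millis);
      case DATE:
        return TimeUnit.MILLISECONDS.toDays(millis);
      default:
        throw new IllegalArgumentException(
            "Cannot convert java.util.Date without a logical type name");
    }
  }

  public static void main(String[] args) {
    Date ts = new Date(1765290645123L); // 2025-12-09T14:30:45.123Z
    System.out.println(convert(ts, TIMESTAMP)); // micros since epoch
    System.out.println(convert(ts, DATE));      // days since epoch
  }
}
```

Note how the same `Date` value yields entirely different encodings depending on the logical name, which is why the schema must be propagated rather than inferred from the runtime class.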
@seokyun-ha-toss
Author

Hello @emkornfield, @brandonstanleyappfolio, @danielcweeks, @alexkot1394: all temporal types are now handled in this PR, and I rearranged the unit tests to cover multiple primitives, mixed types, and nested values. Please take a look 🙏 Many thanks!

@danielcweeks danielcweeks self-requested a review April 6, 2026 00:41
@seokyun-ha-toss
Author

Good morning, @emkornfield . I've applied all the feedback and commented:

  • "throw an exception for non-string/empty keys on Map type."
  • LocalDateTime is treated as microseconds across the repo.

So please take a look again 🙏


```java
protected Variant convertVariantValue(Object value) {
  if (value instanceof ByteBuffer) {
    return Variant.from((ByteBuffer) value);
```
Contributor

@seokyun-ha-toss I still don't think this has been addressed.

I think it's fine to check and pass through a variant if it is an instance of a variant. I think it's unexpected as I noted in another comment (I suppose this could be done through an SMT, but I'm not sure if that's a real scenario).

However, if we see a bytebuffer, I don't see why we would assume it's a variant as opposed to a literal binary. That is what the comment originally was intended to address, but it appears we're still treating it as a variant.

Comment on lines +487 to +488

```java
Set<String> fieldNames = Sets.newHashSet();
collectFieldNames(value, fieldNames);
```
Contributor

Why not have collectFieldNames return the Set<String>, as opposed to creating it here and passing it in?
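The suggested refactor might look like the following hypothetical sketch: the method allocates and returns the set itself, keeping the mutation local to the recursion instead of exposed at the call site.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CollectReturnDemo {
  // Hypothetical shape of the reviewer's suggestion: return the set rather
  // than accepting a caller-supplied one. Nested results merge via addAll.
  static Set<String> collectFieldNames(Map<?, ?> value) {
    Set<String> fieldNames = new HashSet<>();
    for (Map.Entry<?, ?> entry : value.entrySet()) {
      if (entry.getKey() instanceof String key) {
        fieldNames.add(key);
      }
      if (entry.getValue() instanceof Map<?, ?> nested) {
        fieldNames.addAll(collectFieldNames(nested));
      }
    }
    return fieldNames;
  }

  public static void main(String[] args) {
    // The caller no longer pre-allocates the set.
    System.out.println(collectFieldNames(Map.of("a", 1, "b", Map.of("c", 2))));
  }
}
```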

```java
Map<?, ?> map = (Map<?, ?>) value;
for (Map.Entry<?, ?> entry : map.entrySet()) {
  Object key = entry.getKey();
  if (key != null && key instanceof String) {
```
Contributor

`key != null &&` is unnecessary here
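The reason the guard is redundant: `instanceof` is already null-safe, since `null` is never an instance of any type. A minimal demonstration:

```java
public class InstanceofNullDemo {
  // instanceof evaluates to false for null, so `key != null &&` before
  // `key instanceof String` adds nothing.
  static boolean isStringKey(Object key) {
    return key instanceof String;
  }

  public static void main(String[] args) {
    System.out.println(isStringKey(null)); // prints false
    System.out.println(isStringKey("id")); // prints true
  }
}
```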

```java
schema != null ? schema.valueSchema() : null;
map.forEach(
    (key, val) -> {
      if (key != null && key instanceof String) {
```
Contributor

`key != null &&` redundant here as well

```java
  return mapToVariantValue(value, metadata, schema);
}
if (value instanceof Struct) {
  Struct struct = (Struct) value;
```
Contributor

We're now on Java 17, so this should be a pattern variable.
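The pattern-variable form (available since Java 16 via JEP 394) merges the type test and the cast; a minimal before/after using a stand-in type:

```java
public class PatternVariableDemo {
  // Before: explicit cast after the instanceof check.
  static int lengthOld(Object value) {
    if (value instanceof String) {
      String s = (String) value;
      return s.length();
    }
    return -1;
  }

  // After: the pattern variable `s` binds the cast result in the condition.
  static int lengthNew(Object value) {
    if (value instanceof String s) {
      return s.length();
    }
    return -1;
  }

  public static void main(String[] args) {
    System.out.println(lengthNew("variant")); // prints 7
    System.out.println(lengthNew(42));        // prints -1
  }
}
```

Applied to the snippet above, `if (value instanceof Struct struct)` would replace the check-then-cast pair.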

```java
  return;
}
if (value instanceof Struct) {
  Struct struct = (Struct) value;
```
Contributor

We're now on Java 17, so this should be a pattern variable.
