Kafka Connect: Support VARIANT when record convert #15283
seokyun-ha-toss wants to merge 32 commits into apache:main from
Conversation
…ersion methods for nested structures
protected Variant convertVariantValue(Object value) {
  if (value instanceof Variant) {
    return (Variant) value;
  } else if (value instanceof ByteBuffer) {
I don't think it is correct to assume this is encoded as a variant. It should be handled as a literal binary.
Same as the other comment; handled together.
@seokyun-ha-toss I still don't think this has been addressed.
I think it's fine to check and pass through a variant if it is an instance of a variant. I think it's unexpected as I noted in another comment (I suppose this could be done through an SMT and I believe Kafka may be introducing a variant type, but I'm not sure if that's a real scenario).
However, if we see a bytebuffer, I don't see why we would assume it's a variant as opposed to a literal binary. That is what the comment originally was intended to address, but it appears we're still treating it as a variant.
Hi @seokyun-ha-toss,

Hi @alexkot1394, thanks for reaching out. I stepped away for a bit to validate my changes in our production environment. I took a look at your PR as well. From what I see, it treats Variant as a string primitive, whereas I believe it should be handled recursively to properly construct the Variant structure. I’d prefer to continue working on this PR and incorporate the review comments as soon as possible. Thanks again!
…Set for uniqueness
… for null, primitive types, lists, maps, and mixed types
Hello, @danielcweeks, @emkornfield, and @alexkot1394, I'm ready to get a review on this. I've tested this for a month in production, and it works well! I can query the output Iceberg tables with Variant columns using Spark and Snowflake. Please take a look at the latest updates. Thanks!

👋 Hi @seokyun-ha-toss, thanks for introducing this change! Is there a reason

Good catch! I missed the case of

Hello, @brandonstanleyappfolio. I've added it. Thanks for pointing this out!

@seokyun-ha-toss Thank you - I’ve tested the changes locally, and everything is working as expected!

@danielcweeks @emkornfield Gentle ping — would appreciate your review when you have a moment. Thank you!
…and Time logical types
Kafka Connect represents three distinct temporal logical types using the same
Java class (java.util.Date):
- org.apache.kafka.connect.data.Timestamp (milliseconds since epoch)
- org.apache.kafka.connect.data.Time (milliseconds since midnight)
- org.apache.kafka.connect.data.Date (days since epoch)
The existing primitiveToVariantValue method only checked `instanceof Date`
with no schema context, converting all java.util.Date values to Iceberg DATE
(days since epoch). This silently discarded the time component from Timestamp
fields (e.g. 2025-12-09T14:30:45.123Z became 2025-12-09).
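The silent truncation described above can be reproduced in plain Java. `DateAmbiguity.toDaysSinceEpoch` is a hypothetical stand-in for what the old schema-less `instanceof Date` branch effectively did:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.util.Date;

// Hypothetical reproduction of the bug above: with no schema context, every
// java.util.Date is reduced to days since epoch, so the time-of-day part of
// a Timestamp value is silently discarded.
public class DateAmbiguity {
  private static final long MILLIS_PER_DAY = 86_400_000L;

  // What the old schema-less branch effectively did for ALL java.util.Date values.
  static int toDaysSinceEpoch(Date value) {
    return (int) Math.floorDiv(value.getTime(), MILLIS_PER_DAY);
  }

  public static void main(String[] args) {
    Date ts = Date.from(Instant.parse("2025-12-09T14:30:45.123Z"));
    // prints 2025-12-09: the 14:30:45.123 component is gone
    System.out.println(LocalDate.ofEpochDay(toDaysSinceEpoch(ts)));
  }
}
```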
Fix: thread the Kafka Connect schema through objectToVariantValue and
primitiveToVariantValue so the Date branch can inspect the schema's logical
type name and convert to the correct Iceberg variant type:
- Timestamp (logical name: org.apache.kafka.connect.data.Timestamp)
-> Variants.ofTimestamptz (microseconds since epoch)
- Time (logical name: org.apache.kafka.connect.data.Time)
-> Variants.ofTime (microseconds since midnight)
- Date (logical name: org.apache.kafka.connect.data.Date)
-> Variants.ofDate (days since epoch)
The schema is propagated from Struct fields (field.schema()), Collection
elements (schema.valueSchema()), and Map values (schema.valueSchema()).
When no schema is available, an IllegalArgumentException is thrown to
prevent silent data loss.
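A minimal sketch of this schema-aware dispatch, in plain Java: the logical type names are the real Kafka Connect constants, while `encode()` is a hypothetical stand-in for the `Variants.ofTimestamptz` / `ofTime` / `ofDate` calls in the PR:

```java
import java.util.Date;

// Minimal sketch of the schema-aware dispatch described above. The logical
// type names are the real Kafka Connect constants; encode() is a hypothetical
// stand-in for the Variants.ofTimestamptz / ofTime / ofDate calls in the PR.
public class TemporalDispatch {
  static final String TIMESTAMP = "org.apache.kafka.connect.data.Timestamp";
  static final String TIME = "org.apache.kafka.connect.data.Time";
  static final String DATE = "org.apache.kafka.connect.data.Date";

  // Encodes the value the way the matching Iceberg variant type expects:
  // micros since epoch, micros since midnight, or days since epoch.
  static long encode(Date value, String logicalName) {
    long millis = value.getTime();
    switch (logicalName) {
      case TIMESTAMP:
        return millis * 1000L; // micros since epoch
      case TIME:
        return millis * 1000L; // micros since midnight
      case DATE:
        return Math.floorDiv(millis, 86_400_000L); // days since epoch
      default:
        // No matching logical type: fail loudly instead of silently losing data.
        throw new IllegalArgumentException("Cannot infer temporal type for java.util.Date");
    }
  }
}
```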
Fix `java.util.Date` variant conversion losing precision for Timestamp and Time logical types
|
Hello, @emkornfield @brandonstanleyappfolio @danielcweeks @alexkot1394, all temporal types are handled in this PR, and I re-arranged the unit tests to check multiple primitive, mixed types and nested values. Please take a look 🙏 Many thanks!
|
Good morning, @emkornfield. I've applied all the feedback and commented:
So please take a look again 🙏 |
protected Variant convertVariantValue(Object value) {
  if (value instanceof ByteBuffer) {
    return Variant.from((ByteBuffer) value);
@seokyun-ha-toss I still don't think this has been addressed.
I think it's fine to check and pass through a variant if it is an instance of a variant. I think it's unexpected as I noted in another comment (I suppose this could be done through an SMT, but I'm not sure if that's a real scenario).
However, if we see a bytebuffer, I don't see why we would assume it's a variant as opposed to a literal binary. That is what the comment originally was intended to address, but it appears we're still treating it as a variant.
Set<String> fieldNames = Sets.newHashSet();
collectFieldNames(value, fieldNames);
Why not have collectFieldNames return the Set<String> as opposed to creating it here and passing it in?
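The suggested refactor could look like the sketch below. The method name mirrors the PR; the body, including the recursion into nested maps, is illustrative rather than the PR's actual implementation:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the suggested refactor: collectFieldNames returns the Set instead
// of mutating one passed in by the caller.
public class FieldNames {
  static Set<String> collectFieldNames(Object value) {
    Set<String> names = new HashSet<>();
    if (value instanceof Map<?, ?> map) {
      for (Map.Entry<?, ?> entry : map.entrySet()) {
        if (entry.getKey() instanceof String key) { // only string keys become field names
          names.add(key);
          names.addAll(collectFieldNames(entry.getValue())); // recurse into nested values
        }
      }
    }
    return names;
  }
}
```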
Map<?, ?> map = (Map<?, ?>) value;
for (Map.Entry<?, ?> entry : map.entrySet()) {
  Object key = entry.getKey();
  if (key != null && key instanceof String) {
`key != null &&` is unnecessary here
schema != null ? schema.valueSchema() : null;
map.forEach(
    (key, val) -> {
      if (key != null && key instanceof String) {
`key != null &&` is redundant here as well
return mapToVariantValue(value, metadata, schema);
}
if (value instanceof Struct) {
  Struct struct = (Struct) value;
We're now on Java 17, so this should be a pattern variable.
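For reference, a pattern variable (Java 16+) fuses the type check and the cast into one step. `Map` stands in here for the Connect `Struct` in the PR:

```java
import java.util.Map;

// Java 16+ pattern matching for instanceof, as the reviewer suggests: the
// type check and the cast collapse into one pattern variable.
public class PatternVariable {
  static String describe(Object value) {
    if (value instanceof Map<?, ?> map) { // 'map' is the pattern variable
      return "map with " + map.size() + " entries";
    }
    return "other";
  }
}
```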
return;
}
if (value instanceof Struct) {
  Struct struct = (Struct) value;
We're now on Java 17, so this should be a pattern variable.
Summary
Add support for converting arbitrary Java objects (e.g. Map<String, Object>, lists, temporal types such as Date and Instant, and primitives) into the Iceberg Variant type in the Kafka Connect RecordConverter. Nested maps and lists are converted recursively so that structures like {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} are correctly represented as a single Variant.

Motivation

Kafka Connect payloads often come as schema-less or JSON-like maps. To write them into Iceberg tables with a Variant column, the connector must convert these Java objects into the Variant format (metadata + value) and support nested maps/arrays without losing structure or key names.
Behaviour
- Maps: {"a": 1, "b": "x"} produces metadata ["a", "b"] and one ShreddedObject with two fields. Nested maps such as {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} are converted recursively.
- ByteBuffer: handled via Variant.from(ByteBuffer) where appropriate.
- Temporal types (java.time and related): Instant/OffsetDateTime/ZonedDateTime → timestamptz; LocalDateTime → timestampntz; LocalDate → date; LocalTime → time (micros/day encoding via DateTimeUtil).
- java.util.Date (Connect runtime): the schema's logical type selects the conversion (Timestamp, Time, and Date all arrive as java.util.Date at runtime). Without a matching logical type, conversion throws. Nested values receive schema from the parent map value schema or struct field schema when present.
- Struct: fields are converted via field.schema() so nested collections, maps, temporal fields, and inner structs keep correct typing.

Relates
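The recursive conversion the PR description outlines can be sketched without the Iceberg classes. `toVariantLike` is a hypothetical stand-in that returns plain maps and lists where the real RecordConverter builds ShreddedObject/Variant values; it shows only the shape of the recursion:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the recursive conversion: maps and lists recurse,
// everything else is treated as a primitive leaf.
public class NestedConvert {
  static Object toVariantLike(Object value) {
    if (value instanceof Map<?, ?> map) {
      Map<String, Object> fields = new LinkedHashMap<>();
      map.forEach((k, v) -> {
        if (k instanceof String key) {       // only string keys become fields
          fields.put(key, toVariantLike(v)); // recurse into nested values
        }
      });
      return fields;
    }
    if (value instanceof List<?> list) {
      return list.stream().map(NestedConvert::toVariantLike).toList();
    }
    return value; // primitive leaf
  }
}
```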
Thanks, Good Day!
++ Thanks for contributing, @brandonstanleyappfolio