
[AURON #1850] Add ArrowFieldWriters for temporal and composite types#2086

Open
x-tong wants to merge 2 commits into apache:master from x-tong:feature/issue-1850-flink-arrow-part2b

Conversation

@x-tong (Contributor) commented Mar 11, 2026

Which issue does this PR close?

Partially addresses #1850 (Part 2b of the Flink RowData to Arrow conversion).

Rationale for this change

Part 2a (#2079) implemented ArrowFieldWriter base class, 12 basic type writers, and FlinkArrowWriter orchestrator. This PR completes the remaining 5 writer types (Time, Timestamp, Array, Map, Row), enabling full coverage of all Flink logical types supported by the Arrow type mapping introduced in Part 1 (#1959).

The implementation follows Flink's official flink-python Arrow module as established in Part 2a, with the same forRow()/forArray() dual-mode factory pattern and template method design.
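As a minimal stand-alone sketch of the dual-mode factory pattern described above (all types and names below are illustrative stand-ins, not the actual auron or Flink classes): one writer implementation is parameterized over its input container, and the forRow()/forArray() factories bind the container-specific accessors while write() is the shared template method.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for Flink's RowData / ArrayData, trimmed to the two accessors the sketch needs.
interface RowLike   { boolean isNullAt(int pos); int getInt(int pos); }
interface ArrayLike { boolean isNullAt(int pos); int getInt(int pos); }

abstract class IntWriterSketch<T> {
    final List<Integer> out = new ArrayList<>(); // stands in for an Arrow IntVector

    abstract boolean isNullAt(T in, int ordinal);
    abstract int readInt(T in, int ordinal);

    // Template method shared by both modes: null check, then type-specific read.
    final void write(T in, int ordinal) {
        out.add(isNullAt(in, ordinal) ? null : readInt(in, ordinal));
    }

    // forRow(): same writer logic bound to row accessors.
    static IntWriterSketch<RowLike> forRow() {
        return new IntWriterSketch<RowLike>() {
            boolean isNullAt(RowLike in, int ordinal) { return in.isNullAt(ordinal); }
            int readInt(RowLike in, int ordinal)      { return in.getInt(ordinal); }
        };
    }

    // forArray(): same writer logic bound to array-element accessors.
    static IntWriterSketch<ArrayLike> forArray() {
        return new IntWriterSketch<ArrayLike>() {
            boolean isNullAt(ArrayLike in, int ordinal) { return in.isNullAt(ordinal); }
            int readInt(ArrayLike in, int ordinal)      { return in.getInt(ordinal); }
        };
    }

    public static void main(String[] args) {
        RowLike row = new RowLike() {
            public boolean isNullAt(int pos) { return pos == 1; }
            public int getInt(int pos)       { return 40 + pos; }
        };
        IntWriterSketch<RowLike> writer = forRow();
        writer.write(row, 0);
        writer.write(row, 1);
        System.out.println(writer.out); // [40, null]
    }
}
```

The point of the pattern is that composite writers (Array, Map, Row) can reuse the same child writers regardless of whether the data arrives inside a row or inside an array.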

What changes are included in this PR?

Commit 1: 5 ArrowFieldWriters + unit tests (10 files, +1509 lines)

  • TimeWriter — Handles all 4 Arrow time precisions (TimeSecVector, TimeMilliVector, TimeMicroVector, TimeNanoVector) via instanceof dispatch. Flink stores TIME as int (milliseconds), converted to each precision with L-suffixed literals to avoid int overflow.
  • TimestampWriter — Handles all 4 Arrow timestamp precisions. Combines TimestampData.getMillisecond() (long) and getNanoOfMillisecond() (int) for sub-millisecond precision. Constructor validates timezone == null via Preconditions.checkState, matching Flink's official implementation; timezone is not handled at the writer layer.
  • ArrayWriter — Delegates to an elementWriter (ArrowFieldWriter<ArrayData>) for each array element. Overrides finish()/reset() to propagate to the element writer.
  • MapWriter — Arrow maps are List<Struct{key, value}>. Holds separate key and value writers operating on ArrayData. Sets structVector.setIndexDefined() for each entry. Overrides finish()/reset() to propagate to key/value writers.
  • RowWriter — Nested struct handling with ArrowFieldWriter<RowData>[] for child fields. Caches a nullRow (GenericRowData) in the constructor for null struct handling (avoids per-call allocation). Uses a single child-write loop for both null and non-null paths, matching Flink's official implementation.
  • Unit tests: TimeWriterTest (8), TimestampWriterTest (9), ArrayWriterTest (5), MapWriterTest (3), RowWriterTest (3) — 28 tests covering all precisions, null handling, reset/multi-batch, edge cases (pre-epoch timestamps, empty arrays/maps).
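To illustrate the TimeWriter conversions listed above, here is a minimal sketch (not the actual TimeWriter code) of the unit arithmetic: Flink stores TIME as an int count of milliseconds since midnight, and each Arrow time vector expects a different unit, so the micro/nano multiplications must be done with L-suffixed literals to force long arithmetic.

```java
public class TimeConversionSketch {
    // millis -> seconds still fits in int.
    static int toSeconds(int millis) { return millis / 1000; }
    // millis -> micros/nanos: the L suffix promotes the product to long,
    // otherwise the multiplication overflows int before widening.
    static long toMicros(int millis) { return millis * 1_000L; }
    static long toNanos(int millis)  { return millis * 1_000_000L; }

    public static void main(String[] args) {
        int maxTime = 86_399_999; // 23:59:59.999, the largest TIME-of-day value
        System.out.println(toNanos(maxTime)); // 86399999000000
        // Without the L suffix the int product overflows and prints garbage:
        System.out.println((long) (maxTime * 1_000_000));
    }
}
```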

Commit 2: Factory method extension + integration test (2 files, +158 lines)

  • FlinkArrowUtils — Extended createArrowFieldWriterForRow() and createArrowFieldWriterForArray() with branches for TimeWriter, TimestampWriter, ArrayWriter, MapWriter, RowWriter. MapVector check is placed before ListVector (since MapVector extends ListVector). Timestamp branch extracts precision from both TimestampType and LocalZonedTimestampType.
  • FlinkArrowWriterTest — Added testWriteTemporalAndComplexTypes integration test covering TIME(6), TIMESTAMP(6), TIMESTAMP_LTZ(3), ARRAY<INT>, MAP<VARCHAR, INT>, ROW<nested_id INT>. Updated testUnsupportedTypeThrows to use MultisetType (since ArrayType is now supported).
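A small sketch of why the factory must check MapVector before ListVector: in Arrow Java, MapVector extends ListVector, so an instanceof ListVector test also matches every MapVector. The classes below are stand-ins for the Arrow types, not the real API.

```java
// Hypothetical stand-ins mirroring the Arrow hierarchy (MapVector extends ListVector).
class ListVecStandIn {}
class MapVecStandIn extends ListVecStandIn {}

public class FactoryDispatchSketch {
    static String classify(Object vector) {
        // Most specific subtype first; if the order were reversed,
        // the map branch would be unreachable for MapVecStandIn inputs.
        if (vector instanceof MapVecStandIn)  return "MapWriter";
        if (vector instanceof ListVecStandIn) return "ArrayWriter";
        return "unsupported";
    }

    public static void main(String[] args) {
        System.out.println(classify(new MapVecStandIn()));  // MapWriter
        System.out.println(classify(new ListVecStandIn())); // ArrayWriter
    }
}
```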

Scope

This PR completes all Flink-to-Arrow writer types. The remaining work for #1850 is the reverse direction (Arrow-to-Flink reader), which is tracked separately.

Are there any user-facing changes?

No. Internal API for Flink integration.

How was this patch tested?

36 tests across 6 test classes (28 new + 8 existing):

./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
  -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative

Result: 36 pass, 0 failures.

x-tong added 2 commits March 12, 2026 00:28
…s with tests

- TimeWriter: 4 Arrow time precisions (sec/milli/micro/nano)
- TimestampWriter: 4 Arrow timestamp precisions with timezone==null validation
- ArrayWriter: delegates to element writer for List vectors
- MapWriter: key/value writers for Map vectors (List<Struct{key,value}>)
- RowWriter: nested struct with cached nullRow for null handling
- All writers follow forRow()/forArray() dual-mode factory pattern
…al and composite types

- Add createArrowFieldWriter branches for Time, Timestamp, Array, Map, Row
- MapVector check placed before ListVector (MapVector extends ListVector)
- Add integration test covering TIME, TIMESTAMP, TIMESTAMP_LTZ, ARRAY, MAP, ROW

Copilot AI left a comment


Pull request overview

This PR expands the Flink→Arrow writing support in auron-flink-runtime by adding Arrow field writers for additional logical types (temporal + complex/nested) and adding unit tests to validate the new writers and factory wiring.

Changes:

  • Add new ArrowFieldWriter implementations for TIME, TIMESTAMP, ARRAY/LIST, MAP, and nested ROW/STRUCT Arrow vectors.
  • Extend FlinkArrowUtils writer factory methods to instantiate the new writers for both RowData and ArrayData.
  • Add focused unit tests for each new writer and enhance FlinkArrowWriterTest to cover temporal + complex type end-to-end writing.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

Summary per file:

  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java — Adds TIME writer supporting sec/milli/micro/nano Arrow time vectors.
  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java — Adds TIMESTAMP writer supporting sec/milli/micro/nano Arrow timestamp vectors.
  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java — Adds ListVector writer for Flink ARRAY types.
  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java — Adds MapVector writer for Flink MAP types.
  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java — Adds StructVector writer for nested Flink ROW types.
  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java — Wires the new writers into the Row/Array writer factories.
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java — New unit tests for TIME writer across precisions, nulls, arrays, reset.
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java — New unit tests for TIMESTAMP writer across precisions, nulls, pre-epoch, arrays, reset.
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java — New unit tests for ARRAY/ListVector writer (non-null, null, empty, reset).
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java — New unit tests for MAP/MapVector writer (non-null, null, empty).
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java — New unit tests for nested ROW/StructVector writer (non-null, null, multiple).
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java — Updates unsupported-type test and adds end-to-end write coverage for temporal + complex types.


Comment on lines +58 to +62

    public void doWrite(T in, int ordinal) {
        if (!isNullAt(in, ordinal)) {
            ((ListVector) getValueVector()).startNewValue(getCount());
            ArrayData array = readArray(in, ordinal);
            for (int i = 0; i < array.size(); i++) {

Copilot AI Mar 12, 2026


ArrayWriter.doWrite() does nothing when the input value is null. For ListVector this is unsafe: you still need to explicitly mark the current index as null (and ensure offsets for index+1 are updated), otherwise old validity/offset buffer contents from a previous batch can leak after reset(), producing incorrect non-null reads or invalid offsets. Add an explicit null branch (e.g., listVector.setNull(getCount()) / setSafeNull-style handling) so every written position is defined.

Comment on lines +67 to +81

    if (!isNullAt(in, ordinal)) {
        ((MapVector) getValueVector()).startNewValue(getCount());

        StructVector structVector = (StructVector) ((MapVector) getValueVector()).getDataVector();
        MapData map = readMap(in, ordinal);
        ArrayData keys = map.keyArray();
        ArrayData values = map.valueArray();
        for (int i = 0; i < map.size(); i++) {
            structVector.setIndexDefined(keyWriter.getCount());
            keyWriter.write(keys, i);
            valueWriter.write(values, i);
        }

        ((MapVector) getValueVector()).endValue(getCount(), map.size());
    }

Copilot AI Mar 12, 2026


MapWriter.doWrite() skips writing entirely when the input map is null. For MapVector/ListVector semantics this can leave the current position undefined with stale validity/offsets from earlier writes (especially across reset()), causing incorrect non-null values or invalid offsets. Add an explicit null branch that marks the current index as null (and keeps offsets consistent) before returning.

Suggested change

Before:

    if (!isNullAt(in, ordinal)) {
        ((MapVector) getValueVector()).startNewValue(getCount());
        StructVector structVector = (StructVector) ((MapVector) getValueVector()).getDataVector();
        MapData map = readMap(in, ordinal);
        ArrayData keys = map.keyArray();
        ArrayData values = map.valueArray();
        for (int i = 0; i < map.size(); i++) {
            structVector.setIndexDefined(keyWriter.getCount());
            keyWriter.write(keys, i);
            valueWriter.write(values, i);
        }
        ((MapVector) getValueVector()).endValue(getCount(), map.size());
    }

After:

    MapVector mapVector = (MapVector) getValueVector();
    // Explicitly mark null entries to avoid leaving stale validity/offset state.
    if (isNullAt(in, ordinal)) {
        mapVector.setNull(getCount());
        return;
    }
    mapVector.startNewValue(getCount());
    StructVector structVector = (StructVector) mapVector.getDataVector();
    MapData map = readMap(in, ordinal);
    ArrayData keys = map.keyArray();
    ArrayData values = map.valueArray();
    for (int i = 0; i < map.size(); i++) {
        structVector.setIndexDefined(keyWriter.getCount());
        keyWriter.write(keys, i);
        valueWriter.write(values, i);
    }
    mapVector.endValue(getCount(), map.size());
Comment on lines +61 to +66

    public void doWrite(T in, int ordinal) {
        ValueVector vector = getValueVector();
        if (isNullAt(in, ordinal)) {
            ((BaseFixedWidthVector) vector).setNull(getCount());
        } else {
            int millis = readTime(in, ordinal);

Copilot AI Mar 12, 2026


TimeWriter currently assumes the provided ValueVector is a time vector: it unconditionally casts to BaseFixedWidthVector for nulls, and if the vector isn't one of the 4 supported Time*Vector types the non-null branch will silently write nothing. It would be safer to validate the vector type up-front (similar to TimestampWriter's Preconditions check) and/or throw an exception in the final else branch to avoid silent data loss or ClassCastException when misused.

@Tartarus0zm Tartarus0zm self-requested a review March 12, 2026 14:02