
[spark] support map type #2740

Open

YannByron wants to merge 2 commits into apache:main from YannByron:main-spark-mapType

Conversation

@YannByron
Contributor

Purpose

Linked issue: close #2673

Brief change log

Tests

API and Format

Documentation

@YannByron changed the title from “[spark] to support map type” to “[spark] support map type” on Feb 27, 2026
@YannByron
Contributor Author

@wuchong @Yohahaha please take a look.

def toSparkMap(flussMap: FlussInternalMap, mapType: FlussMapType): SparkMapData = {
-  // TODO: support map type in fluss-spark
-  throw new UnsupportedOperationException()
+  new FlussAsSparkMap(mapType).replace(flussMap)
Contributor


Do we need to call InternalRowUtils.copyMap?

Contributor Author


No. What is needed here is an implementation of SparkMapData.
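The reply above (an implementation of SparkMapData is needed, not a copy) can be sketched generically. This is a minimal, hedged illustration of the adapter pattern a Fluss-to-Spark map wrapper follows; all type names here are hypothetical stand-ins, loosely modeled on Spark's MapData shape (numElements/keyArray/valueArray), not the actual Fluss or Spark classes.

```java
import java.util.Arrays;
import java.util.List;

public class MapAdapterSketch {
    // Hypothetical stand-in for the source engine's internal map.
    public interface SourceMap {
        int size();
        Object key(int i);
        Object value(int i);
    }

    // Hypothetical stand-in for the target engine's map interface
    // (loosely modeled on Spark's MapData: numElements/keyArray/valueArray).
    public interface TargetMapData {
        int numElements();
        Object[] keyArray();
        Object[] valueArray();
    }

    // The adapter: wraps a SourceMap and presents it through the target
    // interface, mirroring the replace(...) style the PR's wrappers use.
    public static class SourceAsTargetMap implements TargetMapData {
        private SourceMap wrapped;

        public SourceAsTargetMap replace(SourceMap m) {
            wrapped = m;
            return this;
        }

        public int numElements() { return wrapped.size(); }

        public Object[] keyArray() {
            Object[] ks = new Object[wrapped.size()];
            for (int i = 0; i < ks.length; i++) ks[i] = wrapped.key(i);
            return ks;
        }

        public Object[] valueArray() {
            Object[] vs = new Object[wrapped.size()];
            for (int i = 0; i < vs.length; i++) vs[i] = wrapped.value(i);
            return vs;
        }
    }

    public static void main(String[] args) {
        List<Object> keys = Arrays.asList("a", "b");
        List<Object> vals = Arrays.asList(1, 2);
        SourceMap src = new SourceMap() {
            public int size() { return keys.size(); }
            public Object key(int i) { return keys.get(i); }
            public Object value(int i) { return vals.get(i); }
        };
        TargetMapData out = new SourceAsTargetMap().replace(src);
        System.out.println(out.numElements());               // 2
        System.out.println(Arrays.toString(out.keyArray()));  // [a, b]
        System.out.println(Arrays.toString(out.valueArray())); // [1, 2]
    }
}
```

The point of the reusable replace(...) mutator is that a single wrapper instance can be re-bound to successive records without per-record allocation.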

@wuchong wuchong requested a review from Copilot April 8, 2026 11:24
@wuchong
Member

wuchong commented Apr 8, 2026

cc @beryllw could you help to review this?

Contributor

Copilot AI left a comment


Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds Spark ↔ Fluss support for MAP types and updates tests to validate map read/write behavior (closes #2673).

Changes:

  • Implement map wrappers/converters: SparkAsFlussMap, FlussAsSparkMap, and enable getMap in row/array wrappers.
  • Extend Spark test schemas and end-to-end read/write tests to include map columns.
  • Add/unit-update tests covering map handling in rows, arrays, and the converter layer.

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.

Summary per file:
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala Implements getMap for Spark→Fluss row wrapper
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussMap.scala New Spark→Fluss map wrapper
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala Enables map elements inside arrays (Spark→Fluss)
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala Implements getMap for Fluss→Spark row wrapper
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkMap.scala New Fluss→Spark map wrapper
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala Implements toSparkMap
fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java Exposes copyMap for reuse by Spark adapter
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/util/TestUtils.scala Adds a map field to the shared test schema
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala Extends row test with map assertions
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussArrayTest.scala Adds array-of-map test coverage
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala New unit/integration tests for Spark→Fluss map wrapper
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala Updates row tests to validate getMap
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala Updates array tests to validate getMap
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkMapTest.scala New unit/integration tests for Fluss→Spark map wrapper
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala Adds toSparkMap test, removes “unsupported” test
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala Adds map literal to write test and asserts map contents
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala Adds map column to nested-types read test


@YannByron
Contributor Author

The failed CI is caused by a flaky UT that is fixed in #3045.

def toSparkArray(flussArray: FlussInternalArray, arrayType: FlussArrayType): SparkArrayData = {
  val elementType = arrayType.getElementType
  new FlussAsSparkArray(elementType)
    .replace(InternalRowUtils.copyArray(flussArray, elementType))
Contributor


// TODO: optimize this to avoid deep copying the record.
// refactor #fetchRecords to return an iterator which lazily deserialize
// from underlying record stream and arrow buffer.
ScanRecord toScanRecord(LogRecord record) {
    GenericRow newRow = new GenericRow(selectedFieldGetters.length);
    InternalRow internalRow = record.getRow();
    for (int i = 0; i < selectedFieldGetters.length; i++) {
        newRow.setField(i, selectedFieldGetters[i].getFieldOrNull(internalRow));
    }
    return new ScanRecord(
            record.logOffset(), record.timestamp(), record.getChangeType(), newRow);
}

case ARRAY:
    DataType nestedType = ((ArrayType) fieldType).getElementType();
    ElementGetter nestedGetter = createDeepElementGetter(nestedType);
    elementGetter =
            (array, pos) -> {
                InternalArray inner = array.getArray(pos);
                Object[] objs = new Object[inner.size()];
                for (int i = 0; i < inner.size(); i++) {
                    objs[i] = nestedGetter.getElementOrNull(inner, i);
                }
                return new GenericArray(objs);
            };
    break;

Maybe we don't need to call InternalRowUtils.copyArray?
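The question above turns on whether the scan path has already deep-copied the data (as the quoted toScanRecord with its deep element getters does), which would make the extra copyArray redundant. The hazard a copy protects against can be shown in a minimal, self-contained sketch, using a plain Java array as a hypothetical stand-in for a reusable internal buffer; this is not the Fluss API, only an illustration of aliasing vs. snapshotting.

```java
import java.util.Arrays;

public class WrapperVsCopy {
    // Hypothetical stand-in for an internal array view over a buffer that
    // a reader may reuse for the next record.
    public static class ArrayView {
        int[] buffer;

        ArrayView(int[] b) { buffer = b; }

        // Deep copy, analogous in spirit to InternalRowUtils.copyArray:
        // produces a snapshot independent of the backing buffer.
        int[] copy() { return Arrays.copyOf(buffer, buffer.length); }
    }

    public static void main(String[] args) {
        int[] shared = {1, 2, 3};
        ArrayView view = new ArrayView(shared);

        int[] wrapped = view.buffer; // zero-copy: aliases the buffer
        int[] copied = view.copy();  // deep copy: independent snapshot

        shared[0] = 99; // the reader reuses the buffer for the next record

        System.out.println(wrapped[0]); // 99 — the wrapper sees the mutation
        System.out.println(copied[0]);  // 1  — the copy is unaffected
    }
}
```

If the upstream scan already materializes each record into fresh objects, the zero-copy wrapper is safe and the second copy only costs allocations; otherwise the copy is what keeps previously returned records stable.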



Development

Successfully merging this pull request may close these issues.

[spark] to support map type

5 participants