Conversation
| def toSparkMap(flussMap: FlussInternalMap, mapType: FlussMapType): SparkMapData = { | ||
| // TODO: support map type in fluss-spark | ||
| throw new UnsupportedOperationException() | ||
| new FlussAsSparkMap(mapType).replace(flussMap) |
There was a problem hiding this comment.
do we need to call InternalRowUtils.copyMap ?
There was a problem hiding this comment.
No. What is needed here is an implementation of SparkMapData.
|
cc @beryllw could you help to review this? |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds Spark ↔ Fluss support for MAP types and updates tests to validate map read/write behavior (closes #2673).
Changes:
- Implement map wrappers/converters:
SparkAsFlussMap,FlussAsSparkMap, and enablegetMapin row/array wrappers. - Extend Spark test schemas and end-to-end read/write tests to include map columns.
- Add/unit-update tests covering map handling in rows, arrays, and the converter layer.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala | Implements getMap for Spark→Fluss row wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussMap.scala | New Spark→Fluss map wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala | Enables map elements inside arrays (Spark→Fluss) |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala | Implements getMap for Fluss→Spark row wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkMap.scala | New Fluss→Spark map wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala | Implements toSparkMap |
| fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java | Exposes copyMap for reuse by Spark adapter |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/util/TestUtils.scala | Adds a map field to the shared test schema |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala | Extends row test with map assertions |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussArrayTest.scala | Adds array-of-map test coverage |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala | New unit/integration tests for Spark→Fluss map wrapper |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala | Updates row tests to validate getMap |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala | Updates array tests to validate getMap |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkMapTest.scala | New unit/integration tests for Fluss→Spark map wrapper |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala | Adds toSparkMap test, removes “unsupported” test |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala | Adds map literal to write test and asserts map contents |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala | Adds map column to nested-types read test |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala
Outdated
Show resolved
Hide resolved
|
The failed CI is caused by a flaky UT that is fixed in #3045. |
| def toSparkArray(flussArray: FlussInternalArray, arrayType: FlussArrayType): SparkArrayData = { | ||
| val elementType = arrayType.getElementType | ||
| new FlussAsSparkArray(elementType) | ||
| .replace(InternalRowUtils.copyArray(flussArray, elementType)) |
There was a problem hiding this comment.
fluss/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java
Lines 178 to 190 in b306ed0
May be we don't need to call InternalRowUtils.copyArray?
Purpose
Linked issue: close #2673
Brief change log
Tests
API and Format
Documentation