Skip to content

Spark 4.1: [WIP] Geometry / Geography end-to-end support#16650

Draft
huan233usc wants to merge 2 commits into
apache:mainfrom
huan233usc:spark-geo-e2e
Draft

Spark 4.1: [WIP] Geometry / Geography end-to-end support#16650
huan233usc wants to merge 2 commits into
apache:mainfrom
huan233usc:spark-geo-e2e

Conversation

@huan233usc
Copy link
Copy Markdown

Important

WIP / Draft. Stacked on top of #16607 (API: Single-value binary serialization for geometry and geography). The first 1 commit of this PR is from #16607; please review only the most recent commit (Spark 4.1: WIP geometry / geography end-to-end support) here. This PR will be rebased / re-targeted once #16607 lands.

Summary

Wires up Iceberg's geometry and geography types end-to-end on Spark 4.1, so CREATE TABLE / INSERT / SELECT / DELETE work over Parquet with the in-progress geospatial column types.

The change has three layers:

  1. Parquet schema mapping — round-trip GEOMETRY / GEOGRAPHY through LogicalTypeAnnotation.geometryType / geographyType (TypeToMessageType, MessageTypeToType).
  2. Generic Parquet RWBaseParquetReaders / BaseParquetWriter surface WKB as ByteBuffer for the engine-agnostic data path.
  3. Spark 4.1 integration:
    • TypeToSparkType / SparkTypeToType bridge Iceberg ↔ Spark GeometryType / GeographyType, preserving CRS.
    • PruneColumnsWithoutReordering accepts geo column types during schema pruning.
    • SparkParquetReaders / SparkParquetWriters translate between Spark's internal GeometryVal / GeographyVal (4-byte little-endian SRID header + WKB) and the pure-WKB Parquet representation. SRID is derived per-column from the CRS via Spark's CartesianSpatialReferenceSystemMapper / GeographicSpatialReferenceSystemMapper.
    • ParquetMetrics skips lex bounds() for geo (no Comparators.forType ordering for spatial WKB) and falls back to value / null counts. Real spatial bounding-box stats (X:Y:Z:M) will be plumbed through FieldMetrics in a follow-up.

What's intentionally out of scope (follow-ups)

  • Spatial bounding-box statistics (GeospatialBound X:Y:Z:M) plumbed through FieldMetrics and used for partition / file-skipping pruning.
  • Vectorized reads — geo columns force read.parquet.vectorization.enabled = false per-table for now.
  • Topological predicate pushdown (ST_Intersects, ST_Within, ...) — not part of stock Spark 4.1.
  • Flink / Pig / Hive / Spark 3.5 wiring.
  • ORC / Avro — Parquet-only in this PR.

Test plan

TestSparkGeoTypes (Spark 4.1, 11 cases):

  • testGeometryRoundTrip — flat GEOMETRY round-trip
  • testGeographyRoundTrip — flat GEOGRAPHY round-trip
  • testSridFilterRoundtripST_Srid(geom) = N predicate validates CRS → SRID re-attachment on read
  • testDeleteWithDeletionVector — v3 + MoR DELETE produces a Puffin DV (added-dvs=1, added-position-deletes=1)
  • testNullGeometryValue — NULL geometry rows mixed with non-NULL in the same data file
  • testMultipleGeoColumnsInOneTableGEOMETRY + GEOGRAPHY side by side, per-column CRS metadata honored
  • testStructWithGeometrySTRUCT<eid, loc: GEOMETRY>
  • testArrayOfGeometryARRAY<GEOMETRY>
  • testMapOfGeometryMAP<STRING, GEOMETRY>
  • testStructOfArrayOfGeometrySTRUCT<tid, points: ARRAY<GEOMETRY>>
  • testDeleteWithDeletionVectorOnNestedGeometry — DV path on a table where geometry sits inside a struct
TestSparkGeoTypes: 11 tests, 0 failures (~9.5s locally)

./gradlew :iceberg-spark:iceberg-spark-4.1_2.13:spotlessCheck :iceberg-parquet:spotlessCheck is clean.

Xin Huang added 2 commits May 28, 2026 23:37
Iceberg v3 stores geometry and geography lower/upper bounds as binary using
the x:y:z:m encoding defined in the Bound Serialization section of the spec
(Appendix D). The encoding is already implemented in GeospatialBound, but
Conversions.toByteBuffer / fromByteBuffer have no GEOMETRY / GEOGRAPHY cases
and throw UnsupportedOperationException, blocking ManifestEvaluator and any
metric-based use of geo bounds.

Wire GeospatialBound through Conversions for both geometry and geography,
and add round-trip coverage in TestConversions for the 16-, 24-, and 32-byte
shapes (including the x:y:NaN:m case for XYM bounds), CRS variants, and
nulls.
Build on top of apache#16607 to enable end-to-end CREATE / INSERT / SELECT /
DELETE on Iceberg geometry and geography columns from Spark 4.1.

Parquet schema mapping
- TypeToMessageType: emit Iceberg GEOMETRY / GEOGRAPHY as Parquet BINARY
  with LogicalTypeAnnotation.geometryType / geographyType, propagating
  CRS and (for geography) the EdgeAlgorithm.
- MessageTypeToType: read those annotations back into Iceberg
  Types.GeometryType / Types.GeographyType.
- ParquetMetrics: skip lex min/max bounds for geometry / geography
  (Comparators.forType has no ordering for spatial WKB) and fall back to
  value / null counts only. Spatial bounding boxes (X:Y:Z:M) will be
  plumbed through FieldMetrics in a follow-up.

Generic Parquet readers / writers
- BaseParquetReaders / BaseParquetWriter: dispatch the geometry and
  geography logical type annotations to ParquetValueReaders.byteBuffers
  and ParquetValueWriters.byteBuffers, surfacing WKB as ByteBuffer for
  the engine-agnostic data path.

Spark 4.1 type bridge
- TypeToSparkType / SparkTypeToType: bidirectional mapping between
  Iceberg GEOMETRY / GEOGRAPHY and Spark's GeometryType / GeographyType,
  preserving the CRS string. Geography defaults to the SPHERICAL edge
  algorithm on the Iceberg side.
- PruneColumnsWithoutReordering: register GEOMETRY and GEOGRAPHY in the
  type-id table so column pruning does not reject geo columns.

Spark 4.1 Parquet readers / writers
- SparkParquetReaders: new GeometryReader / GeographyReader that read
  pure WKB from Parquet and prepend the 4-byte little-endian SRID header
  expected by Spark's internal GeometryVal / GeographyVal. The SRID is
  derived from the column's CRS via Spark's CartesianSpatialReference-
  SystemMapper / GeographicSpatialReferenceSystemMapper.
- SparkParquetWriters: new GeometryWriter / GeographyWriter that strip
  Spark's SRID header and write only the WKB body to Parquet, matching
  the Iceberg / Parquet on-disk representation.

Tests
- TestSparkGeoTypes (Spark 4.1): 11 end-to-end SQL tests covering
  - flat geometry / geography round-trip,
  - ST_Srid predicate (validates SRID re-attachment from CRS),
  - DELETE on a v3 merge-on-read table producing a Puffin DV,
  - NULL geometry mixed with non-NULL,
  - mixed geometry + geography in the same table,
  - STRUCT<..., loc: GEOMETRY>, ARRAY<GEOMETRY>, MAP<STRING, GEOMETRY>,
    STRUCT<..., points: ARRAY<GEOMETRY>>,
  - DELETE + DV on a table whose geometry sits inside a struct.
  Vectorized reads are disabled per-table because the vectorized geo
  path is not yet implemented.

Topological predicates such as ST_Intersects are not part of stock
Spark 4.1, so predicate coverage is limited to ST_Srid for now.
@huan233usc
Copy link
Copy Markdown
Author

cc @szehon-ho

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant