Spark, Parquet: honor Hadoop vectored IO read config#16614
Spark, Parquet: honor Hadoop vectored IO read config#16614venkateshwaracholan wants to merge 7 commits into
Conversation
f81d217 to
1fed7bd
Compare
1fed7bd to
b4f4091
Compare
| @@ -125,6 +148,10 @@ protected NameMapping nameMapping() { | |||
| return nameMapping; | |||
| } | |||
There was a problem hiding this comment.
Thanks for working on this. I was looking at whether the setting reaches all Parquet read paths and noticed one path that may still be uncovered.
BaseRowReader and BaseBatchReader now call setAll(parquetReadProperties()) for Parquet data files, but SparkDeleteFilter still creates BaseDeleteLoader/CachingDeleteLoader with only loadInputFile. BaseDeleteLoader.openDeletes then builds the reader without any read properties:
FormatModelRegistry.readBuilder(format, Record.class, inputFile)
For S3FileIO-backed equality/position delete files, the InputFile is not a HadoopInputFile, so Parquet.buildReadOptions may not see the Spark Hadoop configuration and may fall back to the default vectored IO setting. That would mean spark.hadoop.parquet.hadoop.vectored.io.enabled=false may still be ignored while applying Parquet delete files.
Could you check whether delete-file reads need the same propagated parquetReadProperties? If so, a small regression test with a Parquet position/equality delete file would make this easier to keep covered.
There was a problem hiding this comment.
Small note on the anchor: GitHub only allowed me to attach this to the added parquetReadProperties() getter. The path I mean is a bit lower in this file: SparkDeleteFilter.newDeleteLoader() creates BaseDeleteLoader/CachingDeleteLoader without passing these properties through.
There was a problem hiding this comment.
Thanks for catching this. The delete-file path wasn't getting parquetReadProperties().
I've propagated the same Parquet read properties through BaseDeleteLoader, including the cached delete loader path, and added regression tests covering Parquet position deletes with vectored I/O disabled.
Please take another look when you have a chance.
f97f994 to
93ed1e6
Compare
alexandrefimov
left a comment
There was a problem hiding this comment.
I had one small source-compatibility question below.
| this(loadInputFile, ThreadPools.getDeleteWorkerPool()); | ||
| } | ||
|
|
||
| public BaseDeleteLoader( |
There was a problem hiding this comment.
Small source-compatibility question: adding this overload makes existing source calls like new BaseDeleteLoader(loadInputFile, null) ambiguous with the existing (Function, ExecutorService) overload. This PR had to update the JMH callers for that reason.
Since BaseDeleteLoader is public in iceberg-data, would it be better to avoid this two-arg overload, for example by keeping only the three-arg variant for callers that need read properties? That would keep the read-properties propagation while avoiding a source-compat break for existing two-arg callers.
There was a problem hiding this comment.
Good point. I removed the (Function<DeleteFile, InputFile>, Map<String, String>) overload and updated the new call sites to use the existing worker-pool constructor plus read properties. This keeps the read-properties propagation while avoiding ambiguity for existing callers using (Function<DeleteFile, InputFile>, ExecutorService) with null.
alexandrefimov
left a comment
There was a problem hiding this comment.
Thanks, this addresses my concern. Removing the two-arg read-properties overload avoids the source ambiguity while still allowing the delete-file read path to receive the propagated Parquet read properties.
I also re-ran the focused checks for the updated paths:
:iceberg-data:test --tests org.apache.iceberg.data.TestBaseDeleteLoader
:iceberg-parquet:test --tests org.apache.iceberg.parquet.TestParquet
:iceberg-spark:iceberg-spark-4.1_2.13:compileTestJava
:iceberg-spark:iceberg-spark-4.1_2.13:test --tests org.apache.iceberg.spark.source.TestSparkReaderDeletes.testParquetDeletesWithVectoredIoDisabled
Fixes #16600
Summary
Iceberg currently hardcodes Hadoop vectored I/O on when building Parquet read options by calling
withUseHadoopVectoredIo(true). This prevents users from disabling vectored I/O with Parquet’s existingparquet.hadoop.vectored.io.enabled=falsesetting.This change:
parquet.hadoop.vectored.io.enabledthrough Spark read planning so executor-side Parquet readers can honor it.Why
For
S3FileIO, Spark executor-side reads use Iceberg input files such asS3InputFile, notHadoopInputFile. That means Hadoop configuration is not automatically available when Parquet read options are built on executors.By carrying
parquet.hadoop.vectored.io.enabledfrom Spark read options, Spark Hadoop configuration, or table properties into the Parquet reader, users can disable vectored I/O for environments where it causes excessive on-heap allocation.Testing
./gradlew :iceberg-parquet:test --tests org.apache.iceberg.parquet.TestParquet./gradlew :iceberg-spark:iceberg-spark-4.1_2.13:test --tests org.apache.iceberg.spark.TestSparkReadConf./gradlew :iceberg-spark:iceberg-spark-4.1_2.13:compileTestJava./gradlew spotlessCheck