Parquet: Keep FileSystem reachable during writes when Hadoop FS cache is disabled#16641
Parquet: Keep FileSystem reachable during writes when Hadoop FS cache is disabled#16641wombatu-kun wants to merge 1 commit into
Conversation
… is disabled Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
This isn't needed
Disabling abfs caching is a performance killer as you lose the thread pool, the http connection pool and the http prefetch buffer pool. Unless you have some fundamental reason, fix that. ...but it's not needed in iceberg main branch once spark 3.5 is cut; it's a very transient workaround for the situation "caching disabled". Once the spark-3.5 branch goes this PR just becomes superfluous and should really be rolled back to keep the codebase leaner. Can't you just enable caching and speed all your work up at the same time? or, if there is some abfs issue which requires caching to be turned off, why not discuss it here or creating a HADOOP- jira on fs/azure. Though as usual: test against 3.4.3/3.5.0 before reporting. |
|
@steveloughran I opened this only to help the reporter of #16640, who hit it on the current Spark 3.5 runtime (Spark bundles Hadoop 3.3.x) with the abfs cache disabled - not as a feature I'm attached to. If you don't think it's worth carrying as a transitional workaround until the spark-3.5 tree is dropped, feel free to close it - no objection from me. |
|
I'll ask why they can't turn caching on; this is such a transient issue. Always frustrating to see something fixed years ago still not being picked up ... we don't want to taint other projects just because of a bug which was fixed within a week of being discovered in 2023. |
Problem
When the Hadoop FileSystem cache is disabled (for example
fs.abfs.impl.disable.cache=true), Parquet writes through Iceberg can fail mid-write. The reporter hit this on Spark writing to Azure ADLS Gen2 overabfs://with a Hadoop catalog: the job fails withCould not submit task to executor ... ThreadPoolExecutor [Terminated], and the debug log showsAzureBlobFileSystem.finalize()running while the file is still being written.Root cause
ParquetWriter#ensureWriterInitializedbuildsnew ParquetFileWriter(ParquetIO.file(output, conf), ...)and does not keep the ParquetOutputFileit passes in. For aHadoopOutputFile,ParquetIO.file(...)returns a Parquet-nativeorg.apache.parquet.hadoop.util.HadoopOutputFilethat resolves its ownFileSystemthroughpath.getFileSystem(conf); with the cache disabled this is a fresh instance. BecauseParquetFileWriterretains only the output stream and not theOutputFile, nothing keeps thatFileSystemreachable once the writer has been constructed, so it can be garbage-collected while the write is still in progress.On Azure this is fatal.
AbfsOutputStreamreferences the store's bounded thread pool (through aSemaphoredDelegatingExecutor) and theAbfsClient, but never theAzureBlobFileSystemwrapper. When the wrapper becomes unreachable,AzureBlobFileSystem.finalize()callsclose(), which shuts downAzureBlobFileSystemStore'sboundedThreadPool; the still-open stream's next asynchronous flush then submits to that terminated pool and fails, which is exactly the reported log sequence.The read path is unaffected because Parquet's
ParquetFileReaderretains itsInputFile, and therefore theFileSystem, for the reader's lifetime. This change restores the same symmetry on the write side.Change
Keep the Parquet
OutputFileonParquetWriterso itsFileSystemstays reachable for the writer's lifetime, including throughclose()and the footer flush. This keeps alive the exactFileSysteminstance that backs the write until the write finishes.Tests
Added
TestParquetWriterFileSystemReachability. It writes through aParquetWriterbacked by aHadoopOutputFilewith the Hadoop FileSystem cache disabled, using a local FileSystem whose output stream does not hold a back-reference to the FileSystem (mirroringAbfsOutputStream). It asserts that every FileSystem resolved for the write stays reachable while the writer is open and becomes collectible after the writer is dropped. The test fails without the production change and passes with it.Closes #16640