
feat: Add batch spill limit to improve shuffle writer throughput [experimental]#3941

Draft
andygrove wants to merge 4 commits into apache:main from andygrove:optimize-buffered-shuffle

Conversation

@andygrove
Member

Which issue does this PR close?

Closes #.

Rationale for this change

The buffered shuffle writer (MultiPartitionShuffleRepartitioner) accumulates all input batches in memory before writing partitioned output during shuffle_write. When given ample memory (or no memory limit), it buffers everything — which paradoxically degrades throughput because interleave_record_batch has poor cache locality when working over a huge buffer of batches.

Benchmarks show throughput drops as more memory is available:

Memory Limit   Throughput   Peak RSS
512MB          2.33 M/s      2.2 GiB
1GB            2.33 M/s      2.8 GiB
4GB            2.22 M/s      7.3 GiB
unlimited      1.40 M/s     22.6 GiB

What changes are included in this PR?

A new config spark.comet.exec.shuffle.batchSpillLimit (default: 100) that triggers spilling after a fixed number of buffered input batches, regardless of memory availability. This keeps the working set bounded and maintains good cache locality during the interleave phase.
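As with other Comet settings, the limit is passed as an ordinary Spark conf. A hypothetical invocation (the config key is from this PR; the class and jar names are placeholders):

```shell
# Illustrative only: spill after every 100 buffered batches,
# even when the memory pool would allow more buffering.
spark-submit \
  --conf spark.comet.exec.shuffle.batchSpillLimit=100 \
  --class com.example.MyJob \
  my-job.jar
```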

The change is minimal — a single condition added to buffer_partitioned_batch_may_spill:

let over_batch_limit = self.batch_spill_limit > 0
    && self.buffered_batches.len() >= self.batch_spill_limit;
if over_batch_limit || self.reservation.try_grow(mem_growth).is_err() {
    self.spill()?;
}

Setting batchSpillLimit=0 disables the threshold, preserving the existing behavior.

Benchmark results (200 partitions, 100M rows, unlimited memory):

Config                Throughput   Peak RSS   Spills
No limit (current)    1.38 M/s     22.6 GiB        0
batchSpillLimit=100   2.43 M/s      1.7 GiB      123

+76% throughput, -93% memory usage. No regression when memory is already constrained.

How are these changes tested?

  • All 19 existing shuffle tests pass (batch_spill_limit=0 in tests preserves existing behavior)
  • Benchmarked with TPC-H SF100 lineitem data across multiple configurations

… writer

The buffered shuffle writer accumulates all input batches in memory
before writing partitioned output. With unlimited or large memory pools,
this leads to poor cache locality during the final interleave phase,
causing throughput to drop as more memory is available.

Add a configurable batch_spill_limit that triggers spilling after a
fixed number of buffered batches, regardless of memory availability.
This keeps the working set bounded and maintains good cache locality.

Config: spark.comet.exec.shuffle.batchSpillLimit (default: 100, 0=disabled)

Benchmark (200 partitions, 100M rows, unlimited memory):
- Without limit: 1.38 M rows/s, 22.6 GiB peak RSS
- With limit=100: 2.43 M rows/s, 1.7 GiB peak RSS (+76% throughput, -93% memory)
@andygrove andygrove marked this pull request as ready for review April 14, 2026 00:11
@andygrove andygrove marked this pull request as draft April 14, 2026 00:15
The batch spill limit is configured via `spark.comet.exec.shuffle.batchSpillLimit` (default: 100). Setting it
to 0 disables this threshold, meaning spills only occur under memory pressure.

In most cases, the default value of 100 provides good performance. If you observe that shuffle throughput
Member Author


I will update this section once I have run benchmarks with different values

@andygrove andygrove changed the title feat: Add batch spill limit to improve shuffle writer throughput feat: Add batch spill limit to improve shuffle writer throughput [experimental] Apr 14, 2026
