12 changes: 12 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -523,6 +523,18 @@ object CometConf extends ShimCometConf {
"Should not be larger than batch size `spark.comet.batchSize`")
.createWithDefault(8192)

val COMET_SHUFFLE_PARTITIONER_MODE: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.partitionerMode")
.category(CATEGORY_SHUFFLE)
.doc(
"The partitioner mode used by the native shuffle writer. " +
"'immediate' writes partitioned IPC blocks immediately as batches arrive, " +
"reducing memory usage. 'buffered' buffers all rows before writing, which may " +
"improve performance for small datasets but uses more memory.")
.stringConf
.checkValues(Set("immediate", "buffered"))
.createWithDefault("buffered")

val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize")
.category(CATEGORY_SHUFFLE)
98 changes: 70 additions & 28 deletions docs/source/contributor-guide/native_shuffle.md
@@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following conditions
└─────────────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌───────────────────────────────────┐ ┌───────────────────────────────────┐
│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │
│ (hash/range partitioning) │ │ (single partition case) │
└───────────────────────────────────┘ └───────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────┐
│ Partitioner Selection │
│ Controlled by spark.comet.exec.shuffle.partitionerMode │
├───────────────────────────┬───────────────────────────────────────────┤
│ immediate │ buffered (default) │
│ ImmediateModePartitioner │ MultiPartitionShuffleRepartitioner │
│ (hash/range/round-robin) │ (hash/range/round-robin) │
│ Writes IPC blocks as │ Buffers all rows in memory │
│ batches arrive │ before writing │
├───────────────────────────┴───────────────────────────────────────────┤
│ SinglePartitionShufflePartitioner (single partition case) │
└───────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────┐
@@ -113,11 +121,13 @@ Native shuffle (`CometExchange`) is selected when all of the following conditions

### Rust Side

| File | Location | Description |
| ----------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ |
| `shuffle_writer.rs` | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic. |
| `codec.rs` | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
| `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). |
| File | Location | Description |
| ----------------------- | ---------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| `shuffle_writer.rs` | `native/shuffle/src/` | `ShuffleWriterExec` plan. Selects partitioner based on `immediate_mode` flag. |
| `immediate_mode.rs` | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and flushes IPC blocks to in-memory buffers eagerly. |
| `multi_partition.rs` | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions. |
| `codec.rs` | `native/shuffle/src/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
| `comet_partitioning.rs` | `native/shuffle/src/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). |
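
The selection logic described above can be sketched minimally. The names and signatures here are hypothetical; the real types and constructors in `shuffle_writer.rs` differ:

```rust
// Hypothetical sketch: how ShuffleWriterExec might choose a partitioner from
// the partition count and the `immediate_mode` flag carried in the plan proto.
#[derive(Debug, PartialEq)]
enum PartitionerKind {
    Single,    // SinglePartitionShufflePartitioner
    Immediate, // ImmediateModePartitioner
    Buffered,  // MultiPartitionShuffleRepartitioner
}

fn select_partitioner(num_partitions: usize, immediate_mode: bool) -> PartitionerKind {
    if num_partitions == 1 {
        // The single-partition path is taken regardless of partitioner mode.
        PartitionerKind::Single
    } else if immediate_mode {
        PartitionerKind::Immediate
    } else {
        PartitionerKind::Buffered
    }
}

fn main() {
    assert_eq!(select_partitioner(1, true), PartitionerKind::Single);
    assert_eq!(select_partitioner(200, true), PartitionerKind::Immediate);
    assert_eq!(select_partitioner(200, false), PartitionerKind::Buffered);
}
```

Note that the single-partition case bypasses the mode flag entirely, matching the architecture diagram.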

## Data Flow

@@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following conditions

2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
- `SinglePartitionShufflePartitioner`: For single partition (simpler path)
3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
based on the `partitionerMode` configuration:
- **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC
block to an in-memory buffer. Under memory pressure, these buffers are spilled to
per-partition temporary files. This keeps memory usage much lower than buffered mode, since
data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays.

> **Review comment (Member):** The current IPC writer uses block compression (compressing each
> batch), which may lead to poor compression ratios. In Gluten, the shuffle writer first
> serializes and buffers the batches, then performs streaming compression during eviction,
> achieving better compression ratios. I'm not entirely sure which is better.
>
> **Reply (Member, Author):** Thanks. I've been experimenting with switching to an IPC stream
> approach. I have a draft PR open for this, but it is not fully working yet.

4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
exceeds the threshold, partitions spill to temporary files.
- **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin
partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes
them in a single pass. When memory pressure exceeds the threshold, partitions spill to
temporary files.

5. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
- `SinglePartitionShufflePartitioner`: For single partition (simpler path, used regardless
of partitioner mode).
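
The immediate-mode scatter step can be illustrated with a toy sketch. This stands in for the real implementation: rows here are plain `i64` keys with a modulo "hash", whereas the actual partitioner hashes Arrow columns and flushes compressed IPC blocks rather than plain vectors:

```rust
// Toy scatter loop: route rows to per-partition builders and flush a builder
// whenever it reaches the target batch size.
fn scatter(rows: &[i64], num_partitions: usize, batch_size: usize) -> Vec<Vec<Vec<i64>>> {
    let mut builders: Vec<Vec<i64>> = vec![Vec::new(); num_partitions];
    // flushed[p] collects the "blocks" emitted for partition p.
    let mut flushed: Vec<Vec<Vec<i64>>> = vec![Vec::new(); num_partitions];
    for &row in rows {
        let p = row.rem_euclid(num_partitions as i64) as usize; // stand-in for the hash
        builders[p].push(row);
        if builders[p].len() == batch_size {
            // In immediate mode this is where a compressed IPC block would be
            // encoded and appended to the partition's in-memory buffer.
            flushed[p].push(std::mem::take(&mut builders[p]));
        }
    }
    // Flush any partially filled builders at end of input.
    for p in 0..num_partitions {
        if !builders[p].is_empty() {
            flushed[p].push(std::mem::take(&mut builders[p]));
        }
    }
    flushed
}

fn main() {
    let blocks = scatter(&(0..10).collect::<Vec<i64>>(), 2, 3);
    // Partition 0 receives 0,2,4,6,8: one full block of 3 plus a final block of 2.
    assert_eq!(blocks[0], vec![vec![0, 2, 4], vec![6, 8]]);
    assert_eq!(blocks[1], vec![vec![1, 3, 5], vec![7, 9]]);
}
```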

4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
- Writes compression type header
- Writes field count header
- Writes compressed IPC stream
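
The framing above can be sketched as a byte layout. The header widths here are assumptions for illustration only; `ShuffleBlockWriter` in `codec.rs` defines the real format:

```rust
// Assumed block framing: 1-byte compression-type id, 4-byte little-endian
// field count, then the compressed Arrow IPC payload.
fn write_block(codec_id: u8, num_fields: u32, ipc_payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + 4 + ipc_payload.len());
    out.push(codec_id);                               // compression type header
    out.extend_from_slice(&num_fields.to_le_bytes()); // field count header
    out.extend_from_slice(ipc_payload);               // compressed IPC stream
    out
}

fn main() {
    let block = write_block(2, 3, b"payload");
    assert_eq!(block[0], 2);
    assert_eq!(u32::from_le_bytes(block[1..5].try_into().unwrap()), 3);
    assert_eq!(&block[5..], b"payload");
}
```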

6. **Output files**: Two files are produced:
5. **Output files**: Two files are produced:
- **Data file**: Concatenated partition data
- **Index file**: Array of 8-byte little-endian offsets marking partition boundaries
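
The index file can be reconstructed from per-partition lengths. Whether the file stores `n` or `n + 1` offsets is an assumption in this sketch; the text above only specifies 8-byte little-endian offsets marking partition boundaries:

```rust
// Build an index file as cumulative offsets (with a leading 0), so that
// partition i's byte range in the data file is offsets[i]..offsets[i + 1].
fn build_index(partition_lengths: &[u64]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity((partition_lengths.len() + 1) * 8);
    let mut offset: u64 = 0;
    bytes.extend_from_slice(&offset.to_le_bytes()); // start of partition 0
    for &len in partition_lengths {
        offset += len;
        bytes.extend_from_slice(&offset.to_le_bytes());
    }
    bytes
}

fn main() {
    let index = build_index(&[100, 0, 250]); // empty partitions still get an entry
    assert_eq!(index.len(), 4 * 8);
    let last = u64::from_le_bytes(index[24..32].try_into().unwrap());
    assert_eq!(last, 350); // total data-file length
}
```

This mirrors how `CometNativeShuffleWriter` later recovers partition lengths by differencing adjacent offsets.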

7. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition
6. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition
lengths and commits via Spark's `IndexShuffleBlockResolver`.

### Read Path
@@ -201,10 +221,31 @@ sizes.

## Memory Management

Native shuffle uses DataFusion's memory management with spilling support:
Native shuffle uses DataFusion's memory management. The memory characteristics differ
between the two partitioner modes:

### Immediate Mode

Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
rather than buffering all input rows before writing:

- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the
target batch size. When a builder fills up, it is flushed as a compressed IPC block to an
in-memory buffer.
- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus
the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC
encoding is more compact than raw Arrow arrays.
- **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait,
partition builders are flushed and all IPC buffers are drained to per-partition temporary
files on disk.

### Buffered Mode

Buffered mode holds all input data in memory before writing:

- **Memory pool**: Tracks memory usage across the shuffle operation.
- **Spill threshold**: When buffered data exceeds the threshold, partitions spill to disk.
- **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`.
- **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to
temporary files on disk.
- **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a
partition are concatenated when writing the final output.
- **Scratch space**: Reusable buffers for partition ID computation to reduce allocations.
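
The buffered-mode spill decision can be sketched minimally. The struct and method names are illustrative, not the actual `MultiPartitionShuffleRepartitioner` API, which tracks memory through DataFusion's pool:

```rust
// Illustrative spill accounting: accumulate batch sizes and spill (here just
// counted and reset) once the tracked total exceeds the threshold.
struct BufferedState {
    buffered_bytes: usize,
    spill_threshold: usize,
    spill_count: usize,
}

impl BufferedState {
    fn insert_batch(&mut self, batch_bytes: usize) {
        self.buffered_bytes += batch_bytes;
        if self.buffered_bytes > self.spill_threshold {
            // Real code writes each partition's buffered rows to its own
            // spill file; later spills for a partition are concatenated.
            self.spill_count += 1;
            self.buffered_bytes = 0;
        }
    }
}

fn main() {
    let mut state = BufferedState { buffered_bytes: 0, spill_threshold: 1000, spill_count: 0 };
    for _ in 0..5 {
        state.insert_batch(300);
    }
    assert_eq!(state.spill_count, 1); // crossed 1000 bytes after the fourth batch
    assert_eq!(state.buffered_bytes, 300); // fifth batch buffered after the reset
}
```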
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads.

## Configuration

| Config | Default | Description |
| ------------------------------------------------- | ------- | ---------------------------------------- |
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |
| Config | Default | Description |
| ------------------------------------------------- | ----------- | ------------------------------------------- |
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.partitionerMode`        | `buffered`  | Partitioner mode: `immediate` or `buffered` |
| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |

## Comparison with JVM Shuffle

14 changes: 14 additions & 0 deletions docs/source/user-guide/latest/tuning.md
@@ -144,6 +144,20 @@ Comet provides a fully native shuffle implementation, which generally provides t
supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type
partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays.

Native shuffle has two partitioner modes, configured via
`spark.comet.exec.shuffle.partitionerMode`:

- **`buffered`** (default): Buffers all input batches in memory, then uses `interleave` to produce
partitioned output one partition at a time. Only one partition's output batch is in memory at
a time during the write phase, so this mode scales well to large numbers of partitions (1000+).
The trade-off is that it must hold all input data in memory (or spill it) before writing begins.

- **`immediate`**: Partitions incoming batches immediately using per-partition Arrow array builders,
flushing compressed IPC blocks when they reach the target batch size. This avoids buffering the
entire input in memory. However, because it maintains builders for all partitions simultaneously
(proportional to `num_partitions × batch_size × num_columns`), memory overhead grows with
partition count. For workloads with many partitions (1000+), `buffered` mode is recommended.
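
The `num_partitions × batch_size × num_columns` proportionality above translates into a quick back-of-envelope estimate. The sketch assumes fixed-width 8-byte columns; real Arrow builders add validity buffers and per-builder overhead:

```rust
// Rough immediate-mode builder footprint: one builder slot per partition,
// each sized to the target batch, times 8 bytes per fixed-width column value.
fn builder_footprint_bytes(num_partitions: u64, batch_size: u64, num_columns: u64) -> u64 {
    num_partitions * batch_size * num_columns * 8
}

fn main() {
    // 200 partitions x 8192-row batches x 10 columns -> ~125 MiB of builders.
    assert_eq!(builder_footprint_bytes(200, 8192, 10), 131_072_000);
    // At 2000 partitions the same estimate is ~1.2 GiB, which is why `buffered`
    // mode is suggested for high partition counts.
    assert_eq!(builder_footprint_bytes(2000, 8192, 10), 1_310_720_000);
}
```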

#### Columnar (JVM) Shuffle

Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
@@ -1379,6 +1379,7 @@ impl PhysicalPlanner {
writer.output_index_file.clone(),
writer.tracing_enabled,
write_buffer_size,
writer.immediate_mode,
)?);

Ok((
4 changes: 4 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -294,6 +294,10 @@ message ShuffleWriter {
// Size of the write buffer in bytes used when writing shuffle data to disk.
// Larger values may improve write performance but use more memory.
int32 write_buffer_size = 8;
// Whether to use immediate mode partitioner. When true, partitioned IPC blocks
// are written immediately as batches arrive. When false, rows are buffered
// before writing (the original behavior).
bool immediate_mode = 9;
}

message ParquetWriter {
6 changes: 3 additions & 3 deletions native/shuffle/README.md
@@ -35,7 +35,7 @@ performance outside of Spark. It streams input data directly from Parquet files.
cargo run --release --features shuffle-bench --bin shuffle_bench -- \
--input /data/tpch-sf100/lineitem/ \
--partitions 200 \
--codec lz4 \
--codec zstd --zstd-level 1 \
--hash-columns 0,3
```

@@ -47,7 +47,7 @@ cargo run --release --features shuffle-bench --bin shuffle_bench -- \
| `--partitions` | `200` | Number of output shuffle partitions |
| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` |
| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) |
| `--codec` | `lz4` | Compression codec: `none`, `lz4`, `zstd`, `snappy` |
| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` |
| `--zstd-level` | `1` | Zstd compression level (1–22) |
| `--batch-size` | `8192` | Batch size for reading Parquet data |
| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded |
@@ -62,5 +62,5 @@ cargo run --release --features shuffle-bench --bin shuffle_bench -- \
```sh
cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \
--input /data/tpch-sf100/lineitem/ \
--partitions 200 --codec lz4
--partitions 200 --codec zstd --zstd-level 1
```
1 change: 1 addition & 0 deletions native/shuffle/benches/shuffle_writer.rs
@@ -153,6 +153,7 @@ fn create_shuffle_writer_exec(
"/tmp/index.out".to_string(),
false,
1024 * 1024,
false, // immediate_mode
)
.unwrap()
}
17 changes: 14 additions & 3 deletions native/shuffle/src/bin/shuffle_bench.rs
@@ -24,15 +24,15 @@
//! cargo run --release --bin shuffle_bench -- \
//! --input /data/tpch-sf100/lineitem/ \
//! --partitions 200 \
//! --codec lz4 \
//! --codec zstd --zstd-level 1 \
//! --hash-columns 0,3
//! ```
//!
//! Profile with flamegraph:
//! ```sh
//! cargo flamegraph --release --bin shuffle_bench -- \
//! --input /data/tpch-sf100/lineitem/ \
//! --partitions 200 --codec lz4
//! --partitions 200 --codec zstd --zstd-level 1
//! ```

use arrow::datatypes::{DataType, SchemaRef};
@@ -79,7 +79,7 @@ struct Args {
hash_columns: String,

/// Compression codec: none, lz4, zstd, snappy
#[arg(long, default_value = "lz4")]
#[arg(long, default_value = "zstd")]
codec: String,

/// Zstd compression level (1-22)
@@ -114,6 +114,11 @@ struct Args {
/// Each task reads the same input and writes to its own output files.
#[arg(long, default_value_t = 1)]
concurrent_tasks: usize,

/// Shuffle mode: 'immediate' writes IPC blocks per batch as they arrive,
/// 'buffered' buffers all rows before writing (original behavior).
#[arg(long, default_value = "immediate")]
mode: String,
}

fn main() {
@@ -141,6 +146,7 @@ fn main() {
println!("Partitioning: {}", args.partitioning);
println!("Partitions: {}", args.partitions);
println!("Codec: {:?}", codec);
println!("Mode: {}", args.mode);
println!("Hash columns: {:?}", hash_col_indices);
if let Some(mem_limit) = args.memory_limit {
println!("Memory limit: {}", format_bytes(mem_limit));
@@ -413,6 +419,7 @@ fn run_shuffle_write(
args.limit,
data_file.to_string(),
index_file.to_string(),
args.mode == "immediate",
)
.await
.unwrap();
@@ -436,6 +443,7 @@ async fn execute_shuffle_write(
limit: usize,
data_file: String,
index_file: String,
immediate_mode: bool,
) -> datafusion::common::Result<(MetricsSet, MetricsSet)> {
let config = SessionConfig::new().with_batch_size(batch_size);
let mut runtime_builder = RuntimeEnvBuilder::new();
@@ -477,6 +485,7 @@
index_file,
false,
write_buffer_size,
immediate_mode,
)
.expect("Failed to create ShuffleWriterExec");

@@ -541,6 +550,7 @@ fn run_concurrent_shuffle_writes(
let memory_limit = args.memory_limit;
let write_buffer_size = args.write_buffer_size;
let limit = args.limit;
let immediate_mode = args.mode == "immediate";

handles.push(tokio::spawn(async move {
execute_shuffle_write(
Expand All @@ -553,6 +563,7 @@ fn run_concurrent_shuffle_writes(
limit,
data_file,
index_file,
immediate_mode,
)
.await
.unwrap()