diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..3c07ff6c2e 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -523,6 +523,18 @@ object CometConf extends ShimCometConf { "Should not be larger than batch size `spark.comet.batchSize`") .createWithDefault(8192) + val COMET_SHUFFLE_PARTITIONER_MODE: ConfigEntry[String] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.partitionerMode") + .category(CATEGORY_SHUFFLE) + .doc( + "The partitioner mode used by the native shuffle writer. " + + "'immediate' writes partitioned IPC blocks immediately as batches arrive, " + + "reducing memory usage. 'buffered' buffers all rows before writing, which may " + + "improve performance for small datasets but uses more memory.") + .stringConf + .checkValues(Set("immediate", "buffered")) + .createWithDefault("buffered") + val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize") .category(CATEGORY_SHUFFLE) diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md index 18e80a90c8..bd76537a34 100644 --- a/docs/source/contributor-guide/native_shuffle.md +++ b/docs/source/contributor-guide/native_shuffle.md @@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ -┌───────────────────────────────────┐ ┌───────────────────────────────────┐ -│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │ -│ (hash/range partitioning) │ │ (single partition case) │ -└───────────────────────────────────┘ └───────────────────────────────────┘ +┌───────────────────────────────────────────────────────────────────────┐ +│ Partitioner Selection │ +│ Controlled by 
spark.comet.exec.shuffle.partitionerMode │ +├───────────────────────────┬───────────────────────────────────────────┤ +│ immediate │ buffered (default) │ +│ ImmediateModePartitioner │ MultiPartitionShuffleRepartitioner │ +│ (hash/range/round-robin) │ (hash/range/round-robin) │ +│ Writes IPC blocks as │ Buffers all rows in memory │ +│ batches arrive │ before writing │ +├───────────────────────────┴───────────────────────────────────────────┤ +│ SinglePartitionShufflePartitioner (single partition case) │ +└───────────────────────────────────────────────────────────────────────┘ │ ▼ ┌───────────────────────────────────┐ @@ -113,11 +121,13 @@ Native shuffle (`CometExchange`) is selected when all of the following condition ### Rust Side -| File | Location | Description | -| ----------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ | -| `shuffle_writer.rs` | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic. | -| `codec.rs` | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. | -| `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). | +| File | Location | Description | +| ----------------------- | ---------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- | +| `shuffle_writer.rs` | `native/shuffle/src/` | `ShuffleWriterExec` plan. Selects partitioner based on `immediate_mode` flag. | +| `immediate_mode.rs` | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and flushes IPC blocks to in-memory buffers eagerly. 
| +| `multi_partition.rs` | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions. | +| `codec.rs` | `native/shuffle/src/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. | +| `comet_partitioning.rs` | `native/shuffle/src/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). | ## Data Flow @@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition 2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust. -3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner: - - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning - - `SinglePartitionShufflePartitioner`: For single partition (simpler path) +3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner + based on the `partitionerMode` configuration: + - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning. + As each batch arrives, rows are scattered into per-partition Arrow array builders. When a + partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC + block to an in-memory buffer. Under memory pressure, these buffers are spilled to + per-partition temporary files. This keeps memory usage much lower than buffered mode since + data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays. -4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure - exceeds the threshold, partitions spill to temporary files. + - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin + partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes + them in a single pass. 
When memory pressure exceeds the threshold, partitions spill to + temporary files. -5. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC: + - `SinglePartitionShufflePartitioner`: For single partition (simpler path, used regardless + of partitioner mode). + +4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC: - Writes compression type header - Writes field count header - Writes compressed IPC stream -6. **Output files**: Two files are produced: +5. **Output files**: Two files are produced: - **Data file**: Concatenated partition data - **Index file**: Array of 8-byte little-endian offsets marking partition boundaries -7. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition +6. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition lengths and commits via Spark's `IndexShuffleBlockResolver`. ### Read Path @@ -201,10 +221,31 @@ sizes. ## Memory Management -Native shuffle uses DataFusion's memory management with spilling support: +Native shuffle uses DataFusion's memory management. The memory characteristics differ +between the two partitioner modes: + +### Immediate Mode + +Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives, +rather than buffering all input rows before writing: + +- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the + target batch size. When a builder fills up, it is flushed as a compressed IPC block to an + in-memory buffer. +- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus + the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC + encoding is more compact than raw Arrow arrays. 
+- **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait,
+  partition builders are flushed and all IPC buffers are drained to per-partition temporary
+  files on disk.
+
+### Buffered Mode
+
+Buffered mode holds all input data in memory before writing:
 
-- **Memory pool**: Tracks memory usage across the shuffle operation.
-- **Spill threshold**: When buffered data exceeds the threshold, partitions spill to disk.
+- **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`.
+- **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to
+  temporary files on disk.
 - **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a
   partition are concatenated when writing the final output.
 - **Scratch space**: Reusable buffers for partition ID computation to reduce allocations.
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads.
 
 ## Configuration
 
-| Config                                            | Default | Description                              |
-| ------------------------------------------------- | ------- | ---------------------------------------- |
-| `spark.comet.exec.shuffle.enabled`                | `true`  | Enable Comet shuffle                     |
-| `spark.comet.exec.shuffle.mode`                   | `auto`  | Shuffle mode: `native`, `jvm`, or `auto` |
-| `spark.comet.exec.shuffle.compression.codec`      | `zstd`  | Compression codec                        |
-| `spark.comet.exec.shuffle.compression.zstd.level` | `1`     | Zstd compression level                   |
-| `spark.comet.shuffle.write.buffer.size`           | `1MB`   | Write buffer size                        |
-| `spark.comet.columnar.shuffle.batch.size`         | `8192`  | Target rows per batch                    |
+| Config                                            | Default     | Description                                 |
+| ------------------------------------------------- | ----------- | ------------------------------------------- |
+| `spark.comet.exec.shuffle.enabled`                | `true`      | Enable Comet shuffle                        |
+| `spark.comet.exec.shuffle.mode`                   | `auto`      | Shuffle mode: `native`, `jvm`, or `auto`    |
+| `spark.comet.exec.shuffle.partitionerMode`        | `buffered`  | 
Partitioner mode: `immediate` or `buffered` | +| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec | +| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level | +| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size | +| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch | ## Comparison with JVM Shuffle diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..48ca4aa250 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -144,6 +144,20 @@ Comet provides a fully native shuffle implementation, which generally provides t supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays. +Native shuffle has two partitioner modes, configured via +`spark.comet.exec.shuffle.partitionerMode`: + +- **`buffered`** (default): Buffers all input batches in memory, then uses `interleave` to produce + partitioned output one partition at a time. Only one partition's output batch is in memory at + a time during the write phase, so this mode scales well to large numbers of partitions (1000+). + The trade-off is that it must hold all input data in memory (or spill it) before writing begins. + +- **`immediate`**: Partitions incoming batches immediately using per-partition Arrow array builders, + flushing compressed IPC blocks when they reach the target batch size. This avoids buffering the + entire input in memory. However, because it maintains builders for all partitions simultaneously + (proportional to `num_partitions × batch_size × num_columns`), memory overhead grows with + partition count. For workloads with many partitions (1000+), `buffered` mode is recommended. 
+ #### Columnar (JVM) Shuffle Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ac35925ace..6206dd1b6b 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1379,6 +1379,7 @@ impl PhysicalPlanner { writer.output_index_file.clone(), writer.tracing_enabled, write_buffer_size, + writer.immediate_mode, )?); Ok(( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index fb438b26a4..0402401825 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -294,6 +294,10 @@ message ShuffleWriter { // Size of the write buffer in bytes used when writing shuffle data to disk. // Larger values may improve write performance but use more memory. int32 write_buffer_size = 8; + // Whether to use immediate mode partitioner. When true, partitioned IPC blocks + // are written immediately as batches arrive. When false, rows are buffered + // before writing (the original behavior). + bool immediate_mode = 9; } message ParquetWriter { diff --git a/native/shuffle/README.md b/native/shuffle/README.md index 0f53604fa3..4ba5daebda 100644 --- a/native/shuffle/README.md +++ b/native/shuffle/README.md @@ -35,7 +35,7 @@ performance outside of Spark. It streams input data directly from Parquet files. cargo run --release --features shuffle-bench --bin shuffle_bench -- \ --input /data/tpch-sf100/lineitem/ \ --partitions 200 \ - --codec lz4 \ + --codec zstd --zstd-level 1 \ --hash-columns 0,3 ``` @@ -47,7 +47,7 @@ cargo run --release --features shuffle-bench --bin shuffle_bench -- \ | `--partitions` | `200` | Number of output shuffle partitions | | `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | | `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. 
`0,3`) | -| `--codec` | `lz4` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | +| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | | `--zstd-level` | `1` | Zstd compression level (1–22) | | `--batch-size` | `8192` | Batch size for reading Parquet data | | `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | @@ -62,5 +62,5 @@ cargo run --release --features shuffle-bench --bin shuffle_bench -- \ ```sh cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \ --input /data/tpch-sf100/lineitem/ \ - --partitions 200 --codec lz4 + --partitions 200 --codec zstd --zstd-level 1 ``` diff --git a/native/shuffle/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs index 27abd919fa..873e872adf 100644 --- a/native/shuffle/benches/shuffle_writer.rs +++ b/native/shuffle/benches/shuffle_writer.rs @@ -153,6 +153,7 @@ fn create_shuffle_writer_exec( "/tmp/index.out".to_string(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap() } diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs index bb8c2a0380..17999b482a 100644 --- a/native/shuffle/src/bin/shuffle_bench.rs +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -24,7 +24,7 @@ //! cargo run --release --bin shuffle_bench -- \ //! --input /data/tpch-sf100/lineitem/ \ //! --partitions 200 \ -//! --codec lz4 \ +//! --codec zstd --zstd-level 1 \ //! --hash-columns 0,3 //! ``` //! @@ -32,7 +32,7 @@ //! ```sh //! cargo flamegraph --release --bin shuffle_bench -- \ //! --input /data/tpch-sf100/lineitem/ \ -//! --partitions 200 --codec lz4 +//! --partitions 200 --codec zstd --zstd-level 1 //! 
``` use arrow::datatypes::{DataType, SchemaRef}; @@ -79,7 +79,7 @@ struct Args { hash_columns: String, /// Compression codec: none, lz4, zstd, snappy - #[arg(long, default_value = "lz4")] + #[arg(long, default_value = "zstd")] codec: String, /// Zstd compression level (1-22) @@ -114,6 +114,11 @@ struct Args { /// Each task reads the same input and writes to its own output files. #[arg(long, default_value_t = 1)] concurrent_tasks: usize, + + /// Shuffle mode: 'immediate' writes IPC blocks per batch as they arrive, + /// 'buffered' buffers all rows before writing (original behavior). + #[arg(long, default_value = "immediate")] + mode: String, } fn main() { @@ -141,6 +146,7 @@ fn main() { println!("Partitioning: {}", args.partitioning); println!("Partitions: {}", args.partitions); println!("Codec: {:?}", codec); + println!("Mode: {}", args.mode); println!("Hash columns: {:?}", hash_col_indices); if let Some(mem_limit) = args.memory_limit { println!("Memory limit: {}", format_bytes(mem_limit)); @@ -413,6 +419,7 @@ fn run_shuffle_write( args.limit, data_file.to_string(), index_file.to_string(), + args.mode == "immediate", ) .await .unwrap(); @@ -436,6 +443,7 @@ async fn execute_shuffle_write( limit: usize, data_file: String, index_file: String, + immediate_mode: bool, ) -> datafusion::common::Result<(MetricsSet, MetricsSet)> { let config = SessionConfig::new().with_batch_size(batch_size); let mut runtime_builder = RuntimeEnvBuilder::new(); @@ -477,6 +485,7 @@ async fn execute_shuffle_write( index_file, false, write_buffer_size, + immediate_mode, ) .expect("Failed to create ShuffleWriterExec"); @@ -541,6 +550,7 @@ fn run_concurrent_shuffle_writes( let memory_limit = args.memory_limit; let write_buffer_size = args.write_buffer_size; let limit = args.limit; + let immediate_mode = args.mode == "immediate"; handles.push(tokio::spawn(async move { execute_shuffle_write( @@ -553,6 +563,7 @@ fn run_concurrent_shuffle_writes( limit, data_file, index_file, + immediate_mode, ) 
.await .unwrap() diff --git a/native/shuffle/src/partitioners/immediate_mode.rs b/native/shuffle/src/partitioners/immediate_mode.rs new file mode 100644 index 0000000000..a6070dffc3 --- /dev/null +++ b/native/shuffle/src/partitioners/immediate_mode.rs @@ -0,0 +1,1079 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::partition_id::{assign_hash_partition_ids, assign_range_partition_ids}; +use crate::partitioners::ShufflePartitioner; +use crate::{CometPartitioning, CompressionCodec}; +use arrow::array::builder::{ + make_builder, ArrayBuilder, BinaryBuilder, BinaryViewBuilder, BooleanBuilder, + LargeBinaryBuilder, LargeStringBuilder, NullBuilder, PrimitiveBuilder, StringBuilder, + StringViewBuilder, +}; +use arrow::array::{ + Array, ArrayRef, AsArray, BinaryViewArray, RecordBatch, StringViewArray, UInt32Array, +}; +use arrow::compute::take; +use arrow::datatypes::{ + DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Float32Type, Float64Type, + Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, +}; +use arrow::ipc::writer::StreamWriter; +use datafusion::common::{DataFusionError, Result}; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryLimit, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Seek, Write}; +use std::sync::Arc; +use tokio::time::Instant; + +macro_rules! scatter_byte_array { + ($builder:expr, $source:expr, $indices:expr, $offset_type:ty, $builder_type:ty, $cast:ident) => {{ + let src = $source.$cast::<$offset_type>(); + let dst = $builder + .as_any_mut() + .downcast_mut::<$builder_type>() + .expect("builder type mismatch"); + if src.null_count() == 0 { + for &idx in $indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in $indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + }}; +} + +macro_rules! 
scatter_byte_view { + ($builder:expr, $source:expr, $indices:expr, $array_type:ty, $builder_type:ty) => {{ + let src = $source + .as_any() + .downcast_ref::<$array_type>() + .expect("array type mismatch"); + let dst = $builder + .as_any_mut() + .downcast_mut::<$builder_type>() + .expect("builder type mismatch"); + if src.null_count() == 0 { + for &idx in $indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in $indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + }}; +} + +macro_rules! scatter_primitive { + ($builder:expr, $source:expr, $indices:expr, $arrow_type:ty) => {{ + let src = $source.as_primitive::<$arrow_type>(); + let dst = $builder + .as_any_mut() + .downcast_mut::>() + .expect("builder type mismatch"); + if src.null_count() == 0 { + for &idx in $indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in $indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + }}; +} + +/// Scatter-append selected rows from `source` into `builder`. 
+fn scatter_append( + builder: &mut dyn ArrayBuilder, + source: &dyn Array, + indices: &[usize], +) -> Result<()> { + use DataType::*; + match source.data_type() { + Boolean => { + let src = source.as_boolean(); + let dst = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + if src.null_count() == 0 { + for &idx in indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + } + Int8 => scatter_primitive!(builder, source, indices, Int8Type), + Int16 => scatter_primitive!(builder, source, indices, Int16Type), + Int32 => scatter_primitive!(builder, source, indices, Int32Type), + Int64 => scatter_primitive!(builder, source, indices, Int64Type), + UInt8 => scatter_primitive!(builder, source, indices, UInt8Type), + UInt16 => scatter_primitive!(builder, source, indices, UInt16Type), + UInt32 => scatter_primitive!(builder, source, indices, UInt32Type), + UInt64 => scatter_primitive!(builder, source, indices, UInt64Type), + Float32 => scatter_primitive!(builder, source, indices, Float32Type), + Float64 => scatter_primitive!(builder, source, indices, Float64Type), + Date32 => scatter_primitive!(builder, source, indices, Date32Type), + Date64 => scatter_primitive!(builder, source, indices, Date64Type), + Timestamp(TimeUnit::Second, _) => { + scatter_primitive!(builder, source, indices, TimestampSecondType) + } + Timestamp(TimeUnit::Millisecond, _) => { + scatter_primitive!(builder, source, indices, TimestampMillisecondType) + } + Timestamp(TimeUnit::Microsecond, _) => { + scatter_primitive!(builder, source, indices, TimestampMicrosecondType) + } + Timestamp(TimeUnit::Nanosecond, _) => { + scatter_primitive!(builder, source, indices, TimestampNanosecondType) + } + Decimal128(_, _) => scatter_primitive!(builder, source, indices, Decimal128Type), + Decimal256(_, _) => scatter_primitive!(builder, source, indices, Decimal256Type), + Utf8 => scatter_byte_array!(builder, source, 
indices, i32, StringBuilder, as_string), + LargeUtf8 => { + scatter_byte_array!(builder, source, indices, i64, LargeStringBuilder, as_string) + } + Binary => scatter_byte_array!(builder, source, indices, i32, BinaryBuilder, as_binary), + LargeBinary => { + scatter_byte_array!(builder, source, indices, i64, LargeBinaryBuilder, as_binary) + } + Utf8View => { + scatter_byte_view!(builder, source, indices, StringViewArray, StringViewBuilder) + } + BinaryView => { + scatter_byte_view!(builder, source, indices, BinaryViewArray, BinaryViewBuilder) + } + Null => { + let dst = builder.as_any_mut().downcast_mut::().unwrap(); + dst.append_nulls(indices.len()); + } + dt => { + return Err(DataFusionError::NotImplemented(format!( + "Scatter append not implemented for {dt}" + ))); + } + } + Ok(()) +} + +/// Per-column strategy: scatter-write via builder for primitive/string types, +/// or accumulate taken sub-arrays for complex types (List, Map, Struct, etc.). +enum ColumnBuffer { + /// Fast path: direct scatter into a pre-allocated builder. + Builder(Box), + /// Fallback for complex types: accumulate `take`-produced sub-arrays, + /// concatenate at flush time. + Accumulator(Vec), +} + +/// Returns true if `scatter_append` can handle this data type directly. 
+fn has_scatter_support(dt: &DataType) -> bool { + use DataType::*; + matches!( + dt, + Boolean + | Int8 + | Int16 + | Int32 + | Int64 + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Float32 + | Float64 + | Date32 + | Date64 + | Timestamp(_, _) + | Decimal128(_, _) + | Decimal256(_, _) + | Utf8 + | LargeUtf8 + | Binary + | LargeBinary + | Utf8View + | BinaryView + | Null + ) +} + +struct PartitionBuffer { + columns: Vec, + schema: SchemaRef, + num_rows: usize, + target_batch_size: usize, +} + +impl PartitionBuffer { + fn new(schema: &SchemaRef, target_batch_size: usize) -> Self { + let columns = schema + .fields() + .iter() + .map(|f| { + if has_scatter_support(f.data_type()) { + ColumnBuffer::Builder(make_builder(f.data_type(), target_batch_size)) + } else { + ColumnBuffer::Accumulator(Vec::new()) + } + }) + .collect(); + Self { + columns, + schema: Arc::clone(schema), + num_rows: 0, + target_batch_size, + } + } + + fn is_full(&self) -> bool { + self.num_rows >= self.target_batch_size + } + + /// Finish all columns into a RecordBatch. Builders are reset (retaining + /// capacity); accumulators are concatenated and cleared. 
+ fn flush(&mut self) -> Result { + let arrays: Vec = self + .columns + .iter_mut() + .map(|col| match col { + ColumnBuffer::Builder(b) => b.finish(), + ColumnBuffer::Accumulator(chunks) => { + let refs: Vec<&dyn Array> = chunks.iter().map(|a| a.as_ref()).collect(); + let result = arrow::compute::concat(&refs) + .expect("concat failed for accumulated arrays"); + chunks.clear(); + result + } + }) + .collect(); + let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + self.num_rows = 0; + Ok(batch) + } + + fn has_data(&self) -> bool { + self.num_rows > 0 + } +} + +pub(crate) struct PartitionOutputStream { + schema: SchemaRef, + codec: CompressionCodec, + buffer: Vec, +} + +impl PartitionOutputStream { + pub(crate) fn try_new(schema: SchemaRef, codec: CompressionCodec) -> Result { + Ok(Self { + schema, + codec, + buffer: Vec::new(), + }) + } + + fn write_ipc_block(&mut self, batch: &RecordBatch) -> Result { + let start_pos = self.buffer.len(); + + self.buffer.extend_from_slice(&0u64.to_le_bytes()); + let field_count = self.schema.fields().len(); + self.buffer + .extend_from_slice(&(field_count as u64).to_le_bytes()); + let codec_tag: &[u8; 4] = match &self.codec { + CompressionCodec::Snappy => b"SNAP", + CompressionCodec::Lz4Frame => b"LZ4_", + CompressionCodec::Zstd(_) => b"ZSTD", + CompressionCodec::None => b"NONE", + }; + self.buffer.extend_from_slice(codec_tag); + + match &self.codec { + CompressionCodec::None => { + let mut w = StreamWriter::try_new(&mut self.buffer, &batch.schema())?; + w.write(batch)?; + w.finish()?; + w.into_inner()?; + } + CompressionCodec::Lz4Frame => { + let mut wtr = lz4_flex::frame::FrameEncoder::new(&mut self.buffer); + let mut w = StreamWriter::try_new(&mut wtr, &batch.schema())?; + w.write(batch)?; + w.finish()?; + wtr.finish().map_err(|e| { + DataFusionError::Execution(format!("lz4 compression error: {e}")) + })?; + } + CompressionCodec::Zstd(level) => { + 
let enc = zstd::Encoder::new(&mut self.buffer, *level)?; + let mut w = StreamWriter::try_new(enc, &batch.schema())?; + w.write(batch)?; + w.finish()?; + w.into_inner()?.finish()?; + } + CompressionCodec::Snappy => { + let mut wtr = snap::write::FrameEncoder::new(&mut self.buffer); + let mut w = StreamWriter::try_new(&mut wtr, &batch.schema())?; + w.write(batch)?; + w.finish()?; + wtr.into_inner().map_err(|e| { + DataFusionError::Execution(format!("snappy compression error: {e}")) + })?; + } + } + + let end_pos = self.buffer.len(); + let ipc_length = (end_pos - start_pos - 8) as u64; + if ipc_length > i32::MAX as u64 { + return Err(DataFusionError::Execution(format!( + "Shuffle block size {ipc_length} exceeds maximum size of {}", + i32::MAX + ))); + } + self.buffer[start_pos..start_pos + 8].copy_from_slice(&ipc_length.to_le_bytes()); + + Ok(end_pos - start_pos) + } + + fn drain_buffer(&mut self) -> Vec { + std::mem::take(&mut self.buffer) + } + + #[cfg(test)] + fn finish(self) -> Result> { + Ok(self.buffer) + } +} + +struct SpillFile { + _temp_file: datafusion::execution::disk_manager::RefCountedTempFile, + file: File, +} + +/// A partitioner that scatter-writes incoming rows directly into pre-allocated +/// per-partition column builders. When a partition's builders reach +/// `target_batch_size`, the batch is flushed to a compressed IPC block. +/// No intermediate sub-batches or coalescers are created. +pub(crate) struct ImmediateModePartitioner { + output_data_file: String, + output_index_file: String, + partition_buffers: Vec, + streams: Vec, + spill_files: Vec>, + partitioning: CometPartitioning, + runtime: Arc, + reservation: MemoryReservation, + metrics: ShufflePartitionerMetrics, + hashes_buf: Vec, + partition_ids: Vec, + /// Reusable per-partition row index scratch space. + partition_row_indices: Vec>, + /// Maximum bytes this partitioner will reserve from the memory pool. + /// Computed as memory_pool_size * memory_fraction at construction. 
+ memory_limit: usize, +} + +impl ImmediateModePartitioner { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition: usize, + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + partitioning: CometPartitioning, + metrics: ShufflePartitionerMetrics, + runtime: Arc, + batch_size: usize, + codec: CompressionCodec, + ) -> Result { + let num_output_partitions = partitioning.partition_count(); + + let partition_buffers = (0..num_output_partitions) + .map(|_| PartitionBuffer::new(&schema, batch_size)) + .collect(); + + let streams = (0..num_output_partitions) + .map(|_| PartitionOutputStream::try_new(Arc::clone(&schema), codec.clone())) + .collect::>>()?; + + let spill_files: Vec> = + (0..num_output_partitions).map(|_| None).collect(); + + let hashes_buf = match &partitioning { + CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { + vec![0u32; batch_size] + } + _ => vec![], + }; + + let memory_limit = match runtime.memory_pool.memory_limit() { + MemoryLimit::Finite(pool_size) => pool_size, + _ => usize::MAX, + }; + + let reservation = MemoryConsumer::new(format!("ImmediateModePartitioner[{partition}]")) + .with_can_spill(true) + .register(&runtime.memory_pool); + + let partition_row_indices = (0..num_output_partitions).map(|_| Vec::new()).collect(); + + Ok(Self { + output_data_file, + output_index_file, + partition_buffers, + streams, + spill_files, + partitioning, + runtime, + reservation, + metrics, + hashes_buf, + partition_ids: vec![0u32; batch_size], + partition_row_indices, + memory_limit, + }) + } + + fn compute_partition_ids(&mut self, batch: &RecordBatch) -> Result { + let num_rows = batch.num_rows(); + + // Ensure scratch buffers are large enough for this batch + if self.hashes_buf.len() < num_rows { + self.hashes_buf.resize(num_rows, 0); + } + if self.partition_ids.len() < num_rows { + self.partition_ids.resize(num_rows, 0); + } + + match &self.partitioning { + CometPartitioning::Hash(exprs, 
num_output_partitions) => { + let num_output_partitions = *num_output_partitions; + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(batch)?.into_array(num_rows)) + .collect::>>()?; + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&arrays, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + assign_hash_partition_ids(hashes_buf, partition_ids, num_output_partitions); + Ok(num_output_partitions) + } + CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { + let num_output_partitions = *num_output_partitions; + let max_hash_columns = *max_hash_columns; + let num_columns_to_hash = if max_hash_columns == 0 { + batch.num_columns() + } else { + max_hash_columns.min(batch.num_columns()) + }; + let columns_to_hash: Vec = (0..num_columns_to_hash) + .map(|i| Arc::clone(batch.column(i))) + .collect(); + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&columns_to_hash, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + assign_hash_partition_ids(hashes_buf, partition_ids, num_output_partitions); + Ok(num_output_partitions) + } + CometPartitioning::RangePartitioning( + lex_ordering, + num_output_partitions, + row_converter, + bounds, + ) => { + let num_output_partitions = *num_output_partitions; + let arrays = lex_ordering + .iter() + .map(|expr| expr.expr.evaluate(batch)?.into_array(num_rows)) + .collect::>>()?; + let row_batch = row_converter.convert_columns(arrays.as_slice())?; + let partition_ids = &mut self.partition_ids[..num_rows]; + assign_range_partition_ids(&row_batch, partition_ids, bounds); + Ok(num_output_partitions) + } + other => Err(DataFusionError::NotImplemented(format!( + "Unsupported shuffle partitioning scheme {other:?}" + ))), + } + } + + /// Scatter-write rows from batch into per-partition builders, flushing + /// any partition that reaches target_batch_size. 
Returns + /// `(flushed_builder_bytes, ipc_bytes_written)`. + /// + /// Uses column-first iteration so each column's type dispatch happens once + /// per batch (num_columns times) rather than once per partition per column + /// (num_columns × num_partitions times). + fn repartition_batch(&mut self, batch: &RecordBatch) -> Result<(usize, usize)> { + let num_partitions = self.partition_buffers.len(); + let num_rows = batch.num_rows(); + + // Build per-partition row indices, reusing scratch vecs + for indices in self.partition_row_indices.iter_mut() { + indices.clear(); + } + for row_idx in 0..num_rows { + let pid = self.partition_ids[row_idx] as usize; + self.partition_row_indices[pid].push(row_idx); + } + + // Column-first scatter: resolve each column's type once, then + // scatter across all partitions with the same typed path. + for col_idx in 0..batch.num_columns() { + let source = batch.column(col_idx); + for pid in 0..num_partitions { + let indices = &self.partition_row_indices[pid]; + if indices.is_empty() { + continue; + } + match &mut self.partition_buffers[pid].columns[col_idx] { + ColumnBuffer::Builder(builder) => { + scatter_append(builder.as_mut(), source.as_ref(), indices)?; + } + ColumnBuffer::Accumulator(chunks) => { + let idx_array = + UInt32Array::from_iter_values(indices.iter().map(|&i| i as u32)); + let taken = take(source.as_ref(), &idx_array, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + chunks.push(taken); + } + } + } + } + + // Update row counts and flush full partitions + let mut flushed_builder_bytes = 0usize; + let mut ipc_bytes = 0usize; + for pid in 0..num_partitions { + let added = self.partition_row_indices[pid].len(); + if added == 0 { + continue; + } + self.partition_buffers[pid].num_rows += added; + if self.partition_buffers[pid].is_full() { + let (builder_bytes, written) = self.flush_partition(pid)?; + flushed_builder_bytes += builder_bytes; + ipc_bytes += written; + } + } + + Ok((flushed_builder_bytes, 
ipc_bytes)) + } + + /// Flush a partition's builders to an IPC block in its output stream. + /// Returns `(flushed_batch_memory, ipc_bytes_written)`. + fn flush_partition(&mut self, pid: usize) -> Result<(usize, usize)> { + let output_batch = self.partition_buffers[pid].flush()?; + let batch_mem = output_batch.get_array_memory_size(); + let mut encode_timer = self.metrics.encode_time.timer(); + let ipc_bytes = self.streams[pid].write_ipc_block(&output_batch)?; + encode_timer.stop(); + Ok((batch_mem, ipc_bytes)) + } + + /// Spill all partition IPC buffers to per-partition temp files. + fn spill_all(&mut self) -> Result<()> { + let mut spilled_bytes = 0usize; + + // Flush any partially-filled partition builders + for pid in 0..self.partition_buffers.len() { + if self.partition_buffers[pid].has_data() { + self.flush_partition(pid)?; + } + } + + // Drain IPC buffers to disk + for pid in 0..self.streams.len() { + let buf = self.streams[pid].drain_buffer(); + if buf.is_empty() { + continue; + } + + if self.spill_files[pid].is_none() { + let temp_file = self + .runtime + .disk_manager + .create_tmp_file(&format!("imm_shuffle_p{pid}"))?; + let path = temp_file.path().to_owned(); + let file = OpenOptions::new().append(true).open(&path).map_err(|e| { + DataFusionError::Execution(format!("Failed to open spill file: {e}")) + })?; + self.spill_files[pid] = Some(SpillFile { + _temp_file: temp_file, + file, + }); + } + + if let Some(spill) = &mut self.spill_files[pid] { + spill.file.write_all(&buf).map_err(|e| { + DataFusionError::Execution(format!("Failed to write spill: {e}")) + })?; + spilled_bytes += buf.len(); + } + } + + for spill in self.spill_files.iter_mut().flatten() { + spill.file.flush()?; + } + + self.reservation.free(); + if spilled_bytes > 0 { + self.metrics.spill_count.add(1); + self.metrics.spilled_bytes.add(spilled_bytes); + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for ImmediateModePartitioner { + async fn insert_batch(&mut 
self, batch: RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + let start_time = Instant::now(); + + let batch_mem = batch.get_array_memory_size(); + self.metrics.data_size.add(batch_mem); + self.metrics.baseline.record_output(batch.num_rows()); + + let repart_start = Instant::now(); + self.compute_partition_ids(&batch)?; + self.metrics + .repart_time + .add_duration(repart_start.elapsed()); + + let (flushed_builder_bytes, ipc_growth) = self.repartition_batch(&batch)?; + let builder_growth = batch_mem; + + // Net memory change: data entered builders, some was flushed to IPC + let net_growth = (builder_growth + ipc_growth).saturating_sub(flushed_builder_bytes); + + if net_growth > 0 { + // Use our own memory limit rather than relying solely on the pool, + // since the pool doesn't see builder allocations directly. + if self.reservation.size() + net_growth > self.memory_limit + || self.reservation.try_grow(net_growth).is_err() + { + self.spill_all()?; + } + } + + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + } + + fn shuffle_write(&mut self) -> Result<()> { + let start_time = Instant::now(); + let num_output_partitions = self.streams.len(); + let mut offsets = vec![0i64; num_output_partitions + 1]; + + let mut output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + #[allow(clippy::needless_range_loop)] + for pid in 0..num_output_partitions { + offsets[pid] = output_data.stream_position()? 
as i64; + + if let Some(spill) = &self.spill_files[pid] { + let path = spill._temp_file.path().to_owned(); + let spill_reader = File::open(&path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to open spill file for reading: {e}" + )) + })?; + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut &spill_reader, &mut output_data)?; + write_timer.stop(); + } + + if self.partition_buffers[pid].has_data() { + self.flush_partition(pid)?; + } + + let buf = self.streams[pid].drain_buffer(); + if !buf.is_empty() { + let mut write_timer = self.metrics.write_time.timer(); + output_data.write_all(&buf)?; + write_timer.stop(); + } + } + + for spill in self.spill_files.iter_mut() { + *spill = None; + } + + offsets[num_output_partitions] = output_data.stream_position()? as i64; + + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = BufWriter::new( + File::create(&self.output_index_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + for offset in &offsets { + output_index.write_all(&offset.to_le_bytes())?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::read_ipc_compressed; + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::execution::memory_pool::GreedyMemoryPool; + use datafusion::execution::runtime_env::RuntimeEnvBuilder; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; + + fn make_test_batch(values: &[i32]) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let array = Int32Array::from(values.to_vec()); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() + } + + #[test] + fn test_scatter_append_primitives() { + let array: ArrayRef = 
Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])); + let mut builder = make_builder(&DataType::Int32, 8); + scatter_append(builder.as_mut(), array.as_ref(), &[0, 2, 4]).unwrap(); + let result = builder.finish(); + let result = result.as_primitive::(); + assert_eq!(result.values().as_ref(), &[10, 30, 50]); + } + + #[test] + fn test_scatter_append_strings() { + let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let mut builder = make_builder(&DataType::Utf8, 4); + scatter_append(builder.as_mut(), array.as_ref(), &[1, 3]).unwrap(); + let result = builder.finish(); + let result = result.as_string::(); + assert_eq!(result.value(0), "b"); + assert_eq!(result.value(1), "d"); + } + + #[test] + fn test_scatter_append_nulls() { + let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); + let mut builder = make_builder(&DataType::Int32, 4); + scatter_append(builder.as_mut(), array.as_ref(), &[0, 1, 2]).unwrap(); + let result = builder.finish(); + let result = result.as_primitive::(); + assert!(result.is_valid(0)); + assert!(result.is_null(1)); + assert!(result.is_valid(2)); + } + + #[test] + fn test_partition_buffer_flush_reuse() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = make_test_batch(&[1, 2, 3, 4, 5]); + + let mut buf = PartitionBuffer::new(&schema, 3); + match &mut buf.columns[0] { + ColumnBuffer::Builder(b) => { + scatter_append(b.as_mut(), batch.column(0).as_ref(), &[0, 1, 2]).unwrap() + } + _ => panic!("expected Builder"), + } + buf.num_rows += 3; + assert!(buf.is_full()); + + let flushed = buf.flush().unwrap(); + assert_eq!(flushed.num_rows(), 3); + assert_eq!(buf.num_rows, 0); + + // Builders are reused after flush + match &mut buf.columns[0] { + ColumnBuffer::Builder(b) => { + scatter_append(b.as_mut(), batch.column(0).as_ref(), &[3, 4]).unwrap() + } + _ => panic!("expected Builder"), + } + buf.num_rows += 2; + assert_eq!(buf.num_rows, 2); + } + + #[test] 
+ #[cfg_attr(miri, ignore)] + fn test_partition_output_stream_write_and_read() { + let batch = make_test_batch(&[1, 2, 3, 4, 5]); + let schema = batch.schema(); + + for codec in [ + CompressionCodec::None, + CompressionCodec::Lz4Frame, + CompressionCodec::Zstd(1), + CompressionCodec::Snappy, + ] { + let mut stream = PartitionOutputStream::try_new(Arc::clone(&schema), codec).unwrap(); + stream.write_ipc_block(&batch).unwrap(); + + let buf = stream.finish().unwrap(); + assert!(!buf.is_empty()); + + let ipc_length = u64::from_le_bytes(buf[0..8].try_into().unwrap()) as usize; + assert!(ipc_length > 0); + + let block_end = 8 + ipc_length; + let ipc_data = &buf[16..block_end]; + let batch2 = read_ipc_compressed(ipc_data).unwrap(); + assert_eq!(batch2.num_rows(), 5); + } + } + + fn make_hash_partitioning(col_name: &str, num_partitions: usize) -> CometPartitioning { + use datafusion::physical_expr::expressions::Column; + let expr: Arc = + Arc::new(Column::new(col_name, 0)); + CometPartitioning::Hash(vec![expr], num_partitions) + } + + #[tokio::test] + async fn test_immediate_mode_partitioner_hash() { + let batch = make_test_batch(&[1, 2, 3, 4, 5, 6, 7, 8]); + let schema = batch.schema(); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path, + index_path, + schema, + make_hash_partitioning("a", 4), + metrics, + runtime, + 8192, + CompressionCodec::None, + ) + .unwrap(); + + partitioner.insert_batch(batch).await.unwrap(); + + let total_rows: usize = partitioner + .partition_buffers + .iter() + .map(|b| b.num_rows) + .sum(); + assert_eq!(total_rows, 8); + } + + #[tokio::test] + async fn 
test_immediate_mode_shuffle_write() { + let batch1 = make_test_batch(&[1, 2, 3, 4, 5, 6]); + let batch2 = make_test_batch(&[7, 8, 9, 10, 11, 12]); + let schema = batch1.schema(); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let num_partitions = 3; + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path.clone(), + index_path.clone(), + schema, + make_hash_partitioning("a", num_partitions), + metrics, + runtime, + 8192, + CompressionCodec::None, + ) + .unwrap(); + + partitioner.insert_batch(batch1).await.unwrap(); + partitioner.insert_batch(batch2).await.unwrap(); + partitioner.shuffle_write().unwrap(); + + let index_data = std::fs::read(&index_path).unwrap(); + assert_eq!(index_data.len(), (num_partitions + 1) * 8); + + let first_offset = i64::from_le_bytes(index_data[0..8].try_into().unwrap()); + assert_eq!(first_offset, 0); + + let data_file_size = std::fs::metadata(&data_path).unwrap().len(); + let last_offset = i64::from_le_bytes( + index_data[num_partitions * 8..(num_partitions + 1) * 8] + .try_into() + .unwrap(), + ); + assert_eq!(last_offset as u64, data_file_size); + assert!(data_file_size > 0); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] // spill uses std::io::copy which triggers copy_file_range + async fn test_immediate_mode_spill() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let num_partitions = 2; + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + + let runtime = 
Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(GreedyMemoryPool::new(256))) + .build() + .unwrap(), + ); + + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path.clone(), + index_path.clone(), + Arc::clone(&schema), + make_hash_partitioning("a", num_partitions), + metrics, + runtime, + 8192, + CompressionCodec::None, + ) + .unwrap(); + + for i in 0..10 { + let values: Vec = ((i * 10)..((i + 1) * 10)).collect(); + let batch = make_test_batch(&values); + partitioner.insert_batch(batch).await.unwrap(); + } + + partitioner.shuffle_write().unwrap(); + + let index_data = std::fs::read(&index_path).unwrap(); + assert_eq!(index_data.len(), (num_partitions + 1) * 8); + + let data_file_size = std::fs::metadata(&data_path).unwrap().len(); + let last_offset = i64::from_le_bytes( + index_data[num_partitions * 8..(num_partitions + 1) * 8] + .try_into() + .unwrap(), + ); + assert_eq!(last_offset as u64, data_file_size); + assert!(data_file_size > 0); + } + + #[tokio::test] + async fn test_block_format_compatible_with_read_ipc_compressed() { + let batch = make_test_batch(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let schema = batch.schema(); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let num_partitions = 2; + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + + // Small target to trigger flush during insert + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path.clone(), + index_path.clone(), + Arc::clone(&schema), + make_hash_partitioning("a", num_partitions), + metrics, + runtime, + 4, + CompressionCodec::Lz4Frame, + ) + .unwrap(); + + partitioner.insert_batch(batch).await.unwrap(); + partitioner.shuffle_write().unwrap(); + + let index_data = 
std::fs::read(&index_path).unwrap(); + let mut offsets = Vec::new(); + for i in 0..=num_partitions { + let offset = i64::from_le_bytes(index_data[i * 8..(i + 1) * 8].try_into().unwrap()); + offsets.push(offset as usize); + } + + let data = std::fs::read(&data_path).unwrap(); + let mut total_rows = 0; + for pid in 0..num_partitions { + let (start, end) = (offsets[pid], offsets[pid + 1]); + if start == end { + continue; + } + let mut pos = start; + while pos < end { + let payload_len = + u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize; + assert!(payload_len > 0); + let block_end = pos + 8 + payload_len; + let ipc_data = &data[pos + 16..block_end]; + let decoded = read_ipc_compressed(ipc_data).unwrap(); + assert_eq!(decoded.num_columns(), 1); + assert!(decoded.num_rows() > 0); + let col = decoded + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..col.len() { + assert!((1..=10).contains(&col.value(i))); + } + total_rows += decoded.num_rows(); + pos = block_end; + } + assert_eq!(pos, end); + } + assert_eq!(total_rows, 10); + } +} diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index a0bc652b4b..a6fee92414 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -16,12 +16,15 @@ // under the License. 
mod empty_schema; +mod immediate_mode; mod multi_partition; +mod partition_id; mod partitioned_batch_iterator; mod single_partition; mod traits; pub(crate) use empty_schema::EmptySchemaShufflePartitioner; +pub(crate) use immediate_mode::ImmediateModePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 7de9314f54..a87eb0fa26 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -16,12 +16,13 @@ // under the License. use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::partition_id::{assign_hash_partition_ids, assign_range_partition_ids}; use crate::partitioners::partitioned_batch_iterator::{ PartitionedBatchIterator, PartitionedBatchesProducer, }; use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; @@ -237,15 +238,13 @@ impl MultiPartitionShuffleRepartitioner { // Generate partition ids for every row. { - // Hash arrays and compute partition ids based on number of partitions. let partition_ids = &mut scratch.partition_ids[..num_rows]; - create_murmur3_hashes(&arrays, hashes_buf)? 
- .iter() - .enumerate() - .for_each(|(idx, hash)| { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - }); + create_murmur3_hashes(&arrays, hashes_buf)?; + assign_hash_partition_ids( + hashes_buf, + partition_ids, + *num_output_partitions, + ); } // We now have partition ids for every input row, map that to partition starts @@ -292,13 +291,7 @@ impl MultiPartitionShuffleRepartitioner { { let row_batch = row_converter.convert_columns(arrays.as_slice())?; let partition_ids = &mut scratch.partition_ids[..num_rows]; - - row_batch.iter().enumerate().for_each(|(row_idx, row)| { - partition_ids[row_idx] = bounds - .as_slice() - .partition_point(|bound| bound.row() <= row) - as u32 - }); + assign_range_partition_ids(&row_batch, partition_ids, bounds); } // We now have partition ids for every input row, map that to partition starts @@ -356,10 +349,7 @@ impl MultiPartitionShuffleRepartitioner { // Assign partition IDs based on hash (same as hash partitioning) let partition_ids = &mut scratch.partition_ids[..num_rows]; - hashes_buf.iter().enumerate().for_each(|(idx, hash)| { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - }); + assign_hash_partition_ids(hashes_buf, partition_ids, *num_output_partitions); // We now have partition ids for every input row, map that to partition starts // and partition indices to eventually write these rows to partition buffers. diff --git a/native/shuffle/src/partitioners/partition_id.rs b/native/shuffle/src/partitioners/partition_id.rs new file mode 100644 index 0000000000..a574a75daa --- /dev/null +++ b/native/shuffle/src/partitioners/partition_id.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared partition ID computation used by both immediate and buffered shuffle modes. + +use crate::comet_partitioning; +use arrow::row::Rows; + +/// Assign partition IDs from pre-computed hash values using Spark-compatible pmod. +pub(crate) fn assign_hash_partition_ids( + hashes: &[u32], + partition_ids: &mut [u32], + num_partitions: usize, +) { + for (idx, hash) in hashes.iter().enumerate() { + partition_ids[idx] = comet_partitioning::pmod(*hash, num_partitions) as u32; + } +} + +/// Assign partition IDs using binary search on range boundaries. 
+pub(crate) fn assign_range_partition_ids( + rows: &Rows, + partition_ids: &mut [u32], + bounds: &[arrow::row::OwnedRow], +) { + for (row_idx, row) in rows.iter().enumerate() { + partition_ids[row_idx] = bounds.as_ref().partition_point(|bound| bound.row() <= row) as u32; + } +} diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 4ac4fc287b..5f8c8a6aab 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -19,8 +19,8 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ - EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, - SinglePartitionShufflePartitioner, + EmptySchemaShufflePartitioner, ImmediateModePartitioner, MultiPartitionShuffleRepartitioner, + ShufflePartitioner, SinglePartitionShufflePartitioner, }; use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait; @@ -68,6 +68,8 @@ pub struct ShuffleWriterExec { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, + /// When true, use ImmediateModePartitioner instead of MultiPartitionShuffleRepartitioner + immediate_mode: bool, } impl ShuffleWriterExec { @@ -81,6 +83,7 @@ impl ShuffleWriterExec { output_index_file: String, tracing_enabled: bool, write_buffer_size: usize, + immediate_mode: bool, ) -> Result { let cache = Arc::new(PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), @@ -99,6 +102,7 @@ impl ShuffleWriterExec { codec, tracing_enabled, write_buffer_size, + immediate_mode, }) } } @@ -159,6 +163,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.output_index_file.clone(), self.tracing_enabled, self.write_buffer_size, + self.immediate_mode, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -186,6 +191,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.codec.clone(), self.tracing_enabled, self.write_buffer_size, + self.immediate_mode, ) .map_err(|e| 
ArrowError::ExternalError(Box::new(e))), ) @@ -206,6 +212,7 @@ async fn external_shuffle( codec: CompressionCodec, tracing_enabled: bool, write_buffer_size: usize, + immediate_mode: bool, ) -> Result { with_trace_async("external_shuffle", tracing_enabled, || async { let schema = input.schema(); @@ -233,6 +240,17 @@ async fn external_shuffle( write_buffer_size, )?) } + _ if immediate_mode => Box::new(ImmediateModePartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, output_data_file, @@ -473,6 +491,7 @@ mod test { "/tmp/index.out".to_string(), false, 1024 * 1024, // write_buffer_size: 1MB default + false, // immediate_mode ) .unwrap(); @@ -532,6 +551,7 @@ mod test { index_file.clone(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap(); @@ -736,6 +756,7 @@ mod test { index_file.to_str().unwrap().to_string(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap(); @@ -824,6 +845,7 @@ mod test { index_file.to_str().unwrap().to_string(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap(); diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..96c140300b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -192,6 +192,8 @@ class CometNativeShuffleWriter[K, V]( CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) shuffleWriterBuilder.setWriteBufferSize( CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().max(Int.MaxValue).toInt) + shuffleWriterBuilder.setImmediateMode( + 
CometConf.COMET_SHUFFLE_PARTITIONER_MODE.get() == "immediate") outputPartitioning match { case p if isSinglePartitioning(p) =>