Skip to content
30 changes: 20 additions & 10 deletions native/shuffle/src/partitioners/partitioned_batch_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,25 @@ impl Iterator for PartitionedBatchIterator<'_> {

let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
let indices = &self.indices[self.pos..indices_end];
// NOTE(review): this span is a captured PR diff, not compilable source.
// The `match` block immediately below is the code REMOVED by the change,
// and everything after the blank line is the ADDED replacement — both
// versions appear back-to-back in this capture.
//
// (removed) Old path: interleave unconditionally. Per the added comment
// below, Arrow's interleave kernel needs at least one column (or an explicit
// row count) to build a batch, so this path fails for zero-column batches.
match interleave_record_batch(&self.record_batches, indices) {
Ok(batch) => {
self.pos = indices_end;
Some(Ok(batch))
}
Err(e) => Some(Err(DataFusionError::ArrowError(
Box::from(e),
Some(DataFusionError::get_back_trace()),
))),
}

// (added) New path: branch on whether the schema has any columns before
// deciding how to materialize the next output batch.
// record_batches is guaranteed non-empty when indices is non-empty
// (indices reference rows within the buffered batches)
let schema = self.record_batches[0].schema();

let result = if !schema.fields.is_empty() {
// Normal case: at least one column, so the interleave kernel can
// infer the row count from the column data.
interleave_record_batch(&self.record_batches, indices)
} else {
// For zero-column batches (e.g. COUNT queries), we can't use
// interleave_record_batch because Arrow requires either at least one
// column or an explicit row count. Create the batch directly.
let options =
arrow::array::RecordBatchOptions::new().with_row_count(Some(indices.len()));
RecordBatch::try_new_with_options(schema, vec![], &options)
};

// Advance the cursor unconditionally; both branches consumed `indices`.
self.pos = indices_end;
// Wrap any Arrow error in DataFusion's error type, attaching a backtrace.
Some(result.map_err(|e| {
DataFusionError::ArrowError(Box::from(e), Some(DataFusionError::get_back_trace()))
}))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -474,4 +474,24 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
}
}
}

test("native datafusion scan - repartition count") {
  withTempPath { dir =>
    // Write the fixture with Comet disabled so the input parquet file is
    // produced by plain Spark, independent of the code under test.
    withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
      spark
        .range(1000)
        .selectExpr("id", "concat('name_', id) as name")
        .repartition(100)
        .write
        .parquet(dir.toString)
    }
    // native_datafusion scan + round-robin shuffle is the combination that
    // reproduced the crash: repartition().count() shuffles zero-column
    // batches, which the old interleave-only path could not handle.
    withSQLConf(
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
      CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") {
      // count() exercises the zero-column-batch path through the shuffle...
      val count = spark.read.parquet(dir.toString).repartition(10).count()
      // ...while the full read checks the normal (non-empty schema) path
      // still matches Spark and runs on Comet operators.
      checkSparkAnswerAndOperator(spark.read.parquet(dir.toString).repartition(10))
      assert(count == 1000)
    }
  }
}
}
Loading