Skip to content

Commit 1df8f81

Browse files
andygrove and claude
committed
fix: update dictionary unpack tests for DF52 stream polling changes
DataFusion 52 changed how FilterExec's batch coalescer works — streams now return Poll::Pending when accumulating input instead of blocking on a channel. Update test_unpack_dictionary_primitive and test_unpack_dictionary_string to poll the stream directly and send EOF on Pending, rather than using a separate mpsc channel/spawned task to feed batches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8c76d2f commit 1df8f81

1 file changed

Lines changed: 24 additions & 43 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 24 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3539,42 +3539,33 @@ mod tests {
35393539
let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
35403540

35413541
let runtime = tokio::runtime::Runtime::new().unwrap();
3542-
let (tx, mut rx) = mpsc::channel(1);
3543-
3544-
// Separate thread to send the EOF signal once we've processed the only input batch
3545-
runtime.spawn(async move {
3546-
// Create a dictionary array with 100 values, and use it as input to the execution.
3547-
let keys = Int32Array::new((0..(row_count as i32)).map(|n| n % 4).collect(), None);
3548-
let values = Int32Array::from(vec![0, 1, 2, 3]);
3549-
let input_array = DictionaryArray::new(keys, Arc::new(values));
3550-
let input_batch1 = InputBatch::Batch(vec![Arc::new(input_array)], row_count);
3551-
let input_batch2 = InputBatch::EOF;
3552-
3553-
let batches = vec![input_batch1, input_batch2];
3554-
3555-
for batch in batches.into_iter() {
3556-
tx.send(batch).await.unwrap();
3557-
}
3558-
});
3559-
35603542
runtime.block_on(async move {
3543+
let mut eof_sent = false;
3544+
let mut got_result = false;
35613545
loop {
3562-
let batch = rx.recv().await.unwrap();
3563-
scans[0].set_input_batch(batch);
35643546
match poll!(stream.next()) {
35653547
Poll::Ready(Some(batch)) => {
35663548
assert!(batch.is_ok(), "got error {}", batch.unwrap_err());
35673549
let batch = batch.unwrap();
35683550
assert_eq!(batch.num_rows(), row_count / 4);
35693551
// dictionary should be unpacked
35703552
assert!(matches!(batch.column(0).data_type(), DataType::Int32));
3553+
got_result = true;
35713554
}
35723555
Poll::Ready(None) => {
35733556
break;
35743557
}
3575-
_ => {}
3558+
Poll::Pending => {
3559+
// Stream needs more input (e.g. FilterExec's batch coalescer
3560+
// is accumulating). Send EOF to flush.
3561+
if !eof_sent {
3562+
scans[0].set_input_batch(InputBatch::EOF);
3563+
eof_sent = true;
3564+
}
3565+
}
35763566
}
35773567
}
3568+
assert!(got_result, "Expected at least one result batch");
35783569
});
35793570
}
35803571

@@ -3624,43 +3615,33 @@ mod tests {
36243615
let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
36253616

36263617
let runtime = tokio::runtime::Runtime::new().unwrap();
3627-
let (tx, mut rx) = mpsc::channel(1);
3628-
3629-
// Separate thread to send the EOF signal once we've processed the only input batch
3630-
runtime.spawn(async move {
3631-
// Create a dictionary array with 100 values, and use it as input to the execution.
3632-
let keys = Int32Array::new((0..(row_count as i32)).map(|n| n % 4).collect(), None);
3633-
let values = StringArray::from(vec!["foo", "bar", "hello", "comet"]);
3634-
let input_array = DictionaryArray::new(keys, Arc::new(values));
3635-
let input_batch1 = InputBatch::Batch(vec![Arc::new(input_array)], row_count);
3636-
3637-
let input_batch2 = InputBatch::EOF;
3638-
3639-
let batches = vec![input_batch1, input_batch2];
3640-
3641-
for batch in batches.into_iter() {
3642-
tx.send(batch).await.unwrap();
3643-
}
3644-
});
3645-
36463618
runtime.block_on(async move {
3619+
let mut eof_sent = false;
3620+
let mut got_result = false;
36473621
loop {
3648-
let batch = rx.recv().await.unwrap();
3649-
scans[0].set_input_batch(batch);
36503622
match poll!(stream.next()) {
36513623
Poll::Ready(Some(batch)) => {
36523624
assert!(batch.is_ok(), "got error {}", batch.unwrap_err());
36533625
let batch = batch.unwrap();
36543626
assert_eq!(batch.num_rows(), row_count / 4);
36553627
// string/binary should no longer be packed with dictionary
36563628
assert!(matches!(batch.column(0).data_type(), DataType::Utf8));
3629+
got_result = true;
36573630
}
36583631
Poll::Ready(None) => {
36593632
break;
36603633
}
3661-
_ => {}
3634+
Poll::Pending => {
3635+
// Stream needs more input (e.g. FilterExec's batch coalescer
3636+
// is accumulating). Send EOF to flush.
3637+
if !eof_sent {
3638+
scans[0].set_input_batch(InputBatch::EOF);
3639+
eof_sent = true;
3640+
}
3641+
}
36623642
}
36633643
}
3644+
assert!(got_result, "Expected at least one result batch");
36643645
});
36653646
}
36663647

0 commit comments

Comments (0)