Fix duplicate filtering path in Arrow task batches #3448
GayathriSrividya wants to merge 4 commits into
Replace the two weak tests that passed on the unfixed code with proper regression coverage:
1. test_task_to_record_batches_does_not_use_table_filter_without_positional_deletes
   Replaces pyarrow.Table with a sentinel class whose from_batches raises AssertionError. A custom metaclass preserves isinstance checks so the rest of the code path is unaffected. With the fix the sentinel is never reached; without the fix it fires immediately, detecting the old pa.Table.from_batches / filter / combine_chunks / to_batches[0] path that caused the SIGSEGV on Apple Silicon.
2. test_task_to_record_batches_filter_applied_after_positional_deletes
   Uses data [1,2,3,4,5] with positional deletes on positions 1 and 3 (removing values 2 and 4), then applies filter id > 2. The expected result [3, 5] is produced only if both the deletes and the subsequent filter are applied; skipping either one gives a different answer.
Fokko
left a comment
This is a great catch, @GayathriSrividya. This logic must have been messed up at some point during refactoring.
I left one comment to avoid having two blocks for positional_deletes; let me know what you think. This will reduce the number of lines quite a bit.
- Combine the two positional-delete handling blocks into one: apply
take(indices) and the row filter together inside the single
'if positional_deletes' block, removing the redundant mid-loop
empty-batch check (filtering an empty batch is free).
- Replace the sentinel-based regression test (which passed even when
the fix was reverted) with a behavioral test that picks a scenario
where the old bug produces a distinct wrong answer:
  data=[1,2,3,4], pos_delete=2 (value 3), filter=id>2
  correct: [4]
  old bug (scanner pre-filters): [3,4]
The assertion on [4] will fail against any regression.
Addresses review feedback from @ebyhr and @Fokko.
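The scenario in the commit message above can be modeled with plain Python lists. This is only a sketch of the ordering argument, not PyIceberg or PyArrow code: both helper names are hypothetical, and the second function is an assumption about how filtering before position-based deletes goes wrong (positions stop matching file positions once rows have been dropped).

```python
from typing import Callable, List, Set


def apply_deletes_then_filter(
    values: List[int], deleted_positions: Set[int], predicate: Callable[[int], bool]
) -> List[int]:
    """Drop rows by file position first, then apply the row filter."""
    survivors = [v for pos, v in enumerate(values) if pos not in deleted_positions]
    return [v for v in survivors if predicate(v)]


def filter_then_apply_deletes(
    values: List[int], deleted_positions: Set[int], predicate: Callable[[int], bool]
) -> List[int]:
    """Hypothetical buggy order: filter first, then delete by position."""
    prefiltered = [v for v in values if predicate(v)]
    # Positions now count rows in the filtered batch, not rows in the file,
    # so a delete aimed at file position 2 no longer hits value 3.
    return [v for pos, v in enumerate(prefiltered) if pos not in deleted_positions]


data = [1, 2, 3, 4]
deleted = {2}  # file position 2 holds the value 3

correct = apply_deletes_then_filter(data, deleted, lambda v: v > 2)
buggy = filter_then_apply_deletes(data, deleted, lambda v: v > 2)
print(correct, buggy)
```

Under these assumptions the two orders disagree on this input, which is what lets a behavioral assertion on the result detect a regression.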
Fokko
left a comment
Amazing, thanks @GayathriSrividya 🙌
    indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
    current_batch = current_batch.take(indices)
    if pyarrow_filter is not None:
        current_batch = current_batch.filter(pyarrow_filter)
This change replaces the Table-based filtering workaround.
PyArrow 17-20 raise IndexError when RecordBatch.filter(Expression) produces zero rows. PyArrow 21 fixed this behavior, but we still declare support for pyarrow>=17.0.0.
On PyArrow 17-20 this fails with IndexError: list index out of range.
We should either keep that workaround for supported PyArrow versions or raise the minimum supported version to pyarrow>=21.0.0.
A regression test should cover the positional-delete path where the post-delete row filter produces an empty result, for example:
    def test_task_to_record_batches_filter_after_positional_deletes_empty_result(tmpdir: str) -> None:
        from pyiceberg.expressions.visitors import bind

        arrow_schema = pa.schema(
            (
                pa.field(
                    "id",
                    pa.int32(),
                    nullable=True,
                    metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"},
                ),
            )
        )
        arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema)
        data_file = _write_table_to_data_file(
            f"{tmpdir}/test_filter_after_positional_deletes_empty_result.parquet",
            arrow_schema,
            arrow_table,
        )
        table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
        positional_deletes = [pa.chunked_array([pa.array([], type=pa.int64())])]
        result_batches = list(
            _task_to_record_batches(
                PyArrowFileIO(),
                FileScanTask(data_file),
                bound_row_filter=bind(table_schema, GreaterThan("id", 10), case_sensitive=True),
                projected_schema=table_schema,
                table_schema=table_schema,
                projected_field_ids={1},
                positional_deletes=positional_deletes,
                case_sensitive=True,
            )
        )
        assert result_batches == []

PyArrow < 21 raises IndexError when RecordBatch.filter(Expression) produces zero rows. Wrap the call in try/except and fall back to an empty slice, which is already handled by the num_rows == 0 guard below.
Add regression test covering the positional-delete path where the post-delete filter eliminates all remaining rows.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
    if pyarrow_filter is not None:
        try:
            current_batch = current_batch.filter(pyarrow_filter)
        except IndexError:
            # PyArrow < 21 raises IndexError when filter produces zero rows
            # (fixed in https://github.com/apache/arrow/pull/46057)
            current_batch = current_batch.slice(0, 0)
How about?
    - if pyarrow_filter is not None:
    -     try:
    -         current_batch = current_batch.filter(pyarrow_filter)
    -     except IndexError:
    -         # PyArrow < 21 raises IndexError when filter produces zero rows
    -         # (fixed in https://github.com/apache/arrow/pull/46057)
    -         current_batch = current_batch.slice(0, 0)
    + if pyarrow_filter is not None and len(current_batch) > 0:
    +     current_batch = current_batch.filter(pyarrow_filter)
Closes #3272
What this changes
This PR updates the Arrow scan path in _task_to_record_batches to avoid redundant filtering when there are no positional deletes.
- Scanner.from_fragment is the only filter path when positional_deletes is absent.
- current_batch.filter(pyarrow_filter) is applied only in the positional-delete path, after deletes are applied.
Why
The previous flow could perform an extra table-level refilter even when the scanner already applied the predicate. This change removes that stale workaround path while keeping correct behavior for positional delete scenarios.
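The two paths described above can be sketched as a small generator over plain Python lists. This is a hedged model, not the PyIceberg implementation: task_to_batches is a hypothetical name, the list comprehension in the first branch stands in for the predicate that the scanner is assumed to apply, and an empty set of deleted positions is treated as "no positional deletes".

```python
from typing import Callable, Iterable, Iterator, List, Optional, Set

Batch = List[int]


def task_to_batches(
    batches: Iterable[Batch],
    predicate: Optional[Callable[[int], bool]],
    deleted_positions: Optional[Set[int]] = None,
) -> Iterator[Batch]:
    """Yield non-empty batches, applying the predicate exactly once per row."""
    if not deleted_positions:
        # No positional deletes: the scan-time filter is the only filter.
        for batch in batches:
            kept = [v for v in batch if predicate is None or predicate(v)]
            if kept:
                yield kept
        return

    # Positional deletes present: scan unfiltered so file positions stay valid.
    current_index = 0
    for batch in batches:
        start = current_index
        current_index += len(batch)
        # Keep rows whose file position (batch offset + start) is not deleted.
        survivors = [v for offset, v in enumerate(batch) if start + offset not in deleted_positions]
        if predicate is not None:
            survivors = [v for v in survivors if predicate(v)]
        if survivors:
            yield survivors


with_deletes = list(task_to_batches([[1, 2], [3, 4]], lambda v: v > 2, {2}))
without_deletes = list(task_to_batches([[1, 2], [3, 4]], lambda v: v > 2))
print(with_deletes, without_deletes)
```

Tracking current_index across batches is what keeps the delete positions aligned with file positions when the data arrives in more than one batch.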
Tests
Added regression coverage in tests/io/test_pyarrow.py:
- test_task_to_record_batches_filter_without_positional_deletes_avoids_table_refilter
- test_task_to_record_batches_filter_with_positional_deletes_handles_empty_batch
Validated locally:
python -m pytest tests/io/test_pyarrow.py -q -k "task_to_record_batches_nanos or filter_without_positional_deletes_avoids_table_refilter or filter_with_positional_deletes_handles_empty_batch"
make lint