feat: Add native sort merge join operator [experimental] #3871
Draft
andygrove wants to merge 12 commits into apache:main from
Add OutputBuilder that accumulates matched/null-joined index pairs during the sort merge join's Joining state and materializes them into Arrow RecordBatches. Includes BufferedMatchGroup stub with spill support and join filter evaluation module.
…anti Handle JoinSide::None variant in build_filter_candidate_batch to fix compilation with DataFusion 52.4.0 which includes a None variant in the JoinSide enum.
Implement the streaming sort merge join state machine that drives two sorted input streams, compares join keys, and produces joined output batches. The state machine handles all join types (inner, left/right outer, full outer, semi, anti) with key-reuse optimization via RowConverter, multi-batch match group collection, null key handling, optional join filter evaluation, and memory-aware spilling.
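The core merge loop behind such a state machine can be illustrated with a minimal sketch over plain sorted slices. This is not the PR's code: it assumes single `i32` keys with no nulls, handles only the inner join case, and the function name `merge_join_inner` is hypothetical. It does show the key comparison step, the collection of a buffered match group, and the reuse of that group for consecutive streamed rows with the same key.

```rust
use std::cmp::Ordering;

/// Inner sort merge join over two inputs that are already sorted by key.
/// Returns (streamed_index, buffered_index) pairs, similar in spirit to the
/// index pairs an output builder would later materialize into batches.
/// Hypothetical helper for illustration only.
fn merge_join_inner(streamed: &[i32], buffered: &[i32]) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    let (mut s, mut b) = (0, 0);
    while s < streamed.len() && b < buffered.len() {
        match streamed[s].cmp(&buffered[b]) {
            // Streamed key is smaller: advance the streamed side.
            Ordering::Less => s += 1,
            // Buffered key is smaller: advance the buffered side.
            Ordering::Greater => b += 1,
            Ordering::Equal => {
                let key = streamed[s];
                // Collect the buffered match group for this key.
                let group_start = b;
                while b < buffered.len() && buffered[b] == key {
                    b += 1;
                }
                // Reuse the same group for every streamed row with this key.
                while s < streamed.len() && streamed[s] == key {
                    for j in group_start..b {
                        out.push((s, j));
                    }
                    s += 1;
                }
            }
        }
    }
    out
}
```

Producing index pairs rather than joined rows keeps the comparison loop separate from materialization, which is the split the commits above describe between the join stream and the output builder.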
Simplify Init state logic to avoid identical if-branches and use Arc::clone instead of .clone() on ref-counted pointers.
Adds spark.comet.exec.sortMergeJoin.useNative config (default true) to switch between Comet's native sort merge join and DataFusion's SortMergeJoinExec. Passes spark config through JNI to the native planner for runtime selection.
Fix two bugs discovered by the tests:
- Output builder indices referenced stale streamed batch after it was cleared; defer batch clearing until after flush
- Advancing buffered side discarded entire batch instead of advancing past just the first row; use slice to preserve remaining rows
Add 7 new test cases covering left/right/full outer joins, left semi/anti joins, null key handling in outer joins, and an inner join under memory pressure that forces spilling. Fix two bugs found by the new tests:
- Full/right outer join now correctly emits unmatched buffered rows that remain after the streamed side is exhausted (new DrainBuffered state in the join stream state machine).
- Spill read-back no longer panics inside an async runtime by using block_in_place to allow the nested block_on call.
- Fix N×M spill read issue: cache loaded batches across all columns instead of reloading per column. Extracted group_by_batch helpers.
- Remove unused streamed_schema field from OutputBuilder
- Encapsulate BufferedBatch.matched behind mark_matched/unmatched_indices
- Replace duplicate take_buffered_matched/take_buffered_null_joins with a shared take_from_groups helper
RowConverter.convert_columns() was called on the full streamed batch for every row during key-reuse checks. Now converted once when a new streamed batch arrives and cached as streamed_rows. Key-reuse and cache_streamed_key index directly into the cached Rows.
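The convert-once pattern described in this commit can be sketched without Arrow. The sketch below is an assumption-laden stand-in: `encode_keys` replaces Arrow's RowConverter with a simple byte encoding that is only suitable for equality checks, and `StreamedBatch` and `count_key_runs` are hypothetical names. The point is that the encoding happens once when the batch arrives, and each key-reuse check indexes into the cached rows.

```rust
/// Stand-in for a row-format key encoding (the PR uses Arrow's RowConverter).
/// Big-endian bytes are enough here because we only test keys for equality.
fn encode_keys(keys: &[i64]) -> Vec<Vec<u8>> {
    keys.iter().map(|k| k.to_be_bytes().to_vec()).collect()
}

/// Hypothetical streamed batch that caches its encoded keys.
struct StreamedBatch {
    rows: Vec<Vec<u8>>,
}

impl StreamedBatch {
    /// Encode all keys exactly once, when the batch arrives.
    fn new(keys: &[i64]) -> Self {
        Self { rows: encode_keys(keys) }
    }
}

/// Count runs of equal keys by comparing each cached row against the
/// previously cached key, with no per-row re-encoding of the batch.
fn count_key_runs(batch: &StreamedBatch) -> usize {
    let mut cached_key: Option<&[u8]> = None;
    let mut runs = 0;
    for row in &batch.rows {
        if cached_key != Some(row.as_slice()) {
            runs += 1;
            cached_key = Some(row.as_slice());
        }
    }
    runs
}
```

With per-row conversion the encoding cost grows with the square of the batch size, while the cached form pays it once per batch.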
Which issue does this PR close?
N/A - New experimental feature
Rationale for this change
This PR introduces a Comet-owned sort merge join operator (CometSortMergeJoinExec) that replaces DataFusion's SortMergeJoinExec. The motivations are:
A configuration toggle (spark.comet.exec.sortMergeJoin.useNative, default true) allows switching between the new and DataFusion implementations for A/B benchmarking.
This is experimental. The implementation passes all existing tests but has known limitations listed below.
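As an illustration of how a planner might act on this toggle once the Spark config reaches native code, here is a minimal sketch. Only the config key and its default of true come from this PR; the `SmjImpl` enum, the `select_smj_impl` function, and the assumption that the config arrives as a string map are hypothetical.

```rust
use std::collections::HashMap;

/// Config key introduced by this PR.
const USE_NATIVE_KEY: &str = "spark.comet.exec.sortMergeJoin.useNative";

/// Hypothetical marker for which join implementation to construct.
#[derive(Debug, PartialEq)]
enum SmjImpl {
    CometNative,
    DataFusion,
}

/// Pick the implementation from a string-keyed config map (an assumption
/// about how the Spark config is represented on the native side).
/// A missing key falls back to the documented default of true.
fn select_smj_impl(conf: &HashMap<String, String>) -> SmjImpl {
    let use_native = conf
        .get(USE_NATIVE_KEY)
        .map(|v| v.trim().eq_ignore_ascii_case("true"))
        .unwrap_or(true);
    if use_native {
        SmjImpl::CometNative
    } else {
        SmjImpl::DataFusion
    }
}
```

For A/B benchmarking, the same query would be run twice with the key set to true and then false, leaving everything else unchanged.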
What changes are included in this PR?
New files (native/core/src/execution/joins/)
- sort_merge_join.rs: CometSortMergeJoinExec implementing DataFusion's ExecutionPlan trait
- sort_merge_join_stream.rs: Streaming state machine (Init, PollStreamed, PollBuffered, Comparing, CollectingBuffered, Joining, OutputReady, DrainUnmatched, DrainBuffered, Exhausted)
- buffered_batch.rs: BufferedMatchGroup with batch-level Arrow IPC spilling via SpillManager
- output_builder.rs: Batch materialization using Arrow take/concat kernels
- filter.rs: Join filter evaluation with corrected masks for outer/semi/anti joins
- metrics.rs: Metrics matching CometMetricNode.scala (input/output rows/batches, join_time, peak_mem, spill counts)
- tests.rs: 11 unit tests covering all join types and spilling
Modified files
- CometConf.scala: New spark.comet.exec.sortMergeJoin.useNative config
- planner.rs: Conditional operator construction based on config
- jni_api.rs: Pass spark config through to planner
- spark_config.rs: Config constant
Supported join types
All 6: Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti
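How these join types treat null join keys can be sketched as follows, using the semantics stated in this PR: null keys never match, and rows with null keys are still emitted where the join type preserves their side. This is a simplified model, not the operator's code. The `Side` enum and both function names are hypothetical, and keys are reduced to a single `Option<i32>`.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    LeftOuter,
    RightOuter,
    FullOuter,
    LeftSemi,
    LeftAnti,
}

/// Hypothetical tag for which input a row came from.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// A null key never equals anything, including another null.
fn keys_match(left: Option<i32>, right: Option<i32>) -> bool {
    matches!((left, right), (Some(l), Some(r)) if l == r)
}

/// Whether a row with a null join key on `side` still appears in the output.
/// Outer joins pad it with nulls for the other side; anti join emits it
/// because it has no match; inner and semi joins drop it.
fn emits_null_keyed_row(join_type: JoinType, side: Side) -> bool {
    match join_type {
        JoinType::Inner | JoinType::LeftSemi => false,
        JoinType::LeftOuter | JoinType::LeftAnti => side == Side::Left,
        JoinType::RightOuter => side == Side::Right,
        JoinType::FullOuter => true,
    }
}
```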
Key design decisions
- … NullEqualsNothing: null keys never match for inner/semi, emit with null counterpart for outer/anti)
- … SpillManager and Arrow IPC when MemoryReservation::try_grow() fails
- … OwnedRow via Arrow's RowConverter
Known limitations / future work
How are these changes tested?
CometJoinSuite (10 tests) passes with the new implementation as a drop-in replacement, including SortMergeJoin with and without join filters across all join types