Skip to content
Closed

WIP #20951

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 22 additions & 80 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,9 @@ async fn collect_left_input(
offset += batch.num_rows();
}

// Flatten linked-list chains into contiguous storage for faster probing
hashmap.flatten();

// Merge all batches into a single batch, so we can directly index into the arrays
let batch = concat_batches(&schema, batches_iter.clone())?;

Expand Down Expand Up @@ -4295,9 +4298,10 @@ mod tests {
Ok(())
}

#[test]
fn join_with_hash_collisions_64() -> Result<()> {
let mut hashmap_left = HashTable::with_capacity(4);
/// Tests that hash collisions are resolved by equal_rows_arr.
/// Both build rows get the same collision hash; the probe should still
/// match each to the correct build row via value comparison.
fn hash_collision_test<T: JoinHashMapType>(mut join_hash_map: T) -> Result<()> {
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
Expand All @@ -4306,36 +4310,27 @@ mod tests {

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

// Maps both values to both indices (1 and 2, representing input 0 and 1)
// 0 -> (0, 1)
// 1 -> (0, 2)
// The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1
hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);

let next = vec![2, 0];
// Force all build rows to have the same hash (collision)
let collision_hash = hashes_buff[0];
let fake_hashes = vec![collision_hash; left.num_rows()];
join_hash_map.update_from_iter(Box::new(fake_hashes.iter().enumerate()), 0);
join_hash_map.flatten();

let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);

// Join key column for both join sides
let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

let join_hash_map = JoinHashMapU64::new(hashmap_left, next);

let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

// Probe hashes also use collision_hash for all entries
let hashes_buffer = vec![collision_hash; right.num_rows()];

let mut probe_indices_buffer = Vec::new();
let mut build_indices_buffer = Vec::new();
Expand All @@ -4352,74 +4347,21 @@ mod tests {
)?;

let left_ids: UInt64Array = vec![0, 1].into();

let right_ids: UInt32Array = vec![0, 1].into();

assert_eq!(left_ids, l);

assert_eq!(right_ids, r);

Ok(())
}

#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
let mut hashmap_left = HashTable::with_capacity(4);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
("y", &vec![200, 300]),
);

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);

let next: Vec<u32> = vec![2, 0];

let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);

let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

let join_hash_map = JoinHashMapU32::new(hashmap_left, next);

let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

let mut probe_indices_buffer = Vec::new();
let mut build_indices_buffer = Vec::new();
let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&[left_keys_values],
&[right_keys_values],
NullEquality::NullEqualsNothing,
&hashes_buffer,
8192,
(0, None),
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;

// We still expect to match rows 0 and 1 on both sides
let left_ids: UInt64Array = vec![0, 1].into();
let right_ids: UInt32Array = vec![0, 1].into();

assert_eq!(left_ids, l);
assert_eq!(right_ids, r);
fn join_with_hash_collisions_64() -> Result<()> {
hash_collision_test(JoinHashMapU64::with_capacity(2))
}

Ok(())
#[test]
fn join_with_hash_collisions_u32() -> Result<()> {
hash_collision_test(JoinHashMapU32::with_capacity(2))
}

#[tokio::test]
Expand Down
Loading
Loading