diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index b39b23e30f4e8..8f5f5c0454e20 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -222,14 +222,31 @@ pub fn check_subquery_expr( check_correlations_in_subquery(inner_plan) } else { if let Expr::InSubquery(subquery) = expr { - // InSubquery should only return one column - if subquery.subquery.subquery.schema().fields().len() > 1 { + // InSubquery should only return one column UNLESS the left expression is a struct + // (multi-column IN like: (a, b) NOT IN (SELECT x, y FROM ...)) + let is_struct = matches!(*subquery.expr, Expr::ScalarFunction(ref func) if func.func.name() == "struct"); + + let num_subquery_cols = subquery.subquery.subquery.schema().fields().len(); + + if !is_struct && num_subquery_cols > 1 { return plan_err!( "InSubquery should only return one column, but found {}: {}", - subquery.subquery.subquery.schema().fields().len(), + num_subquery_cols, subquery.subquery.subquery.schema().field_names().join(", ") ); } + + // For struct expressions, validate that the number of fields matches + if is_struct && let Expr::ScalarFunction(ref func) = *subquery.expr { + let num_tuple_cols = func.args.len(); + if num_tuple_cols != num_subquery_cols { + return plan_err!( + "The number of columns in the tuple ({}) must match the number of columns in the subquery ({})", + num_tuple_cols, + num_subquery_cols + ); + } + } } if let Expr::SetComparison(set_comparison) = expr && set_comparison.subquery.subquery.schema().fields().len() > 1 diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index a98678f7cf9c4..7a54404cab272 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -482,23 +482,43 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { 
Arc::unwrap_or_clone(subquery.subquery), )? .data; - let expr_type = expr.get_type(self.schema)?; - let subquery_type = new_plan.schema().field(0).data_type(); - let common_type = comparison_coercion(&expr_type, subquery_type).ok_or( - plan_datafusion_err!( - "expr type {expr_type} can't cast to {subquery_type} in InSubquery" - ), - )?; - let new_subquery = Subquery { - subquery: Arc::new(new_plan), - outer_ref_columns: subquery.outer_ref_columns, - spans: subquery.spans, - }; - Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( - Box::new(expr.cast_to(&common_type, self.schema)?), - cast_subquery(new_subquery, &common_type)?, - negated, - )))) + + // Check if this is a multi-column IN (struct expression) + let is_struct = matches!(*expr, Expr::ScalarFunction(ref func) if func.func.name() == "struct"); + + if is_struct { + // For multi-column IN, we don't need type coercion at this level + // The decorrelation phase will handle this by creating join conditions + let new_subquery = Subquery { + subquery: Arc::new(new_plan), + outer_ref_columns: subquery.outer_ref_columns, + spans: subquery.spans, + }; + Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( + expr, + new_subquery, + negated, + )))) + } else { + // Single-column IN: apply type coercion as before + let expr_type = expr.get_type(self.schema)?; + let subquery_type = new_plan.schema().field(0).data_type(); + let common_type = comparison_coercion(&expr_type, subquery_type).ok_or( + plan_datafusion_err!( + "expr type {expr_type} can't cast to {subquery_type} in InSubquery" + ), + )?; + let new_subquery = Subquery { + subquery: Arc::new(new_plan), + outer_ref_columns: subquery.outer_ref_columns, + spans: subquery.spans, + }; + Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( + Box::new(expr.cast_to(&common_type, self.schema)?), + cast_subquery(new_subquery, &common_type)?, + negated, + )))) + } } Expr::SetComparison(SetComparison { expr, diff --git 
a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index b9d160d55589f..552f8b6aeceb1 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -386,9 +386,57 @@ fn build_join( right, })), ) => { - let right_col = create_col_from_scalar_expr(right.deref(), alias)?; - let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col)); - in_predicate.and(join_filter) + // Check if this is a multi-column IN (struct expression) + if let Expr::ScalarFunction(func) = left.deref() { + if func.func.name() == "struct" { + // Decompose struct into individual field comparisons + let struct_args = &func.args; + + // The right side should be the subquery result + // Note: After pull-up, the subquery may have additional correlated columns + // We only care about the first N columns that match our struct fields + let subquery_fields = sub_query_alias.schema().fields(); + + if struct_args.len() > subquery_fields.len() { + return plan_err!( + "Struct field count ({}) exceeds subquery column count ({})", + struct_args.len(), + subquery_fields.len() + ); + } + + // Create equality conditions for each field + let mut conditions = Vec::new(); + for (i, arg) in struct_args.iter().enumerate() { + let field = &subquery_fields[i]; + let right_col = Expr::Column(Column::new( + Some(alias.clone()), + field.name().to_string(), + )); + conditions.push(Expr::eq(arg.clone(), right_col)); + } + + // Combine all conditions with AND + let in_predicate = conditions + .into_iter() + .reduce(|acc, cond| acc.and(cond)) + .unwrap_or_else(|| lit(true)); + + in_predicate.and(join_filter) + } else { + // Regular scalar function, handle as before + let right_col = create_col_from_scalar_expr(right.deref(), alias)?; + let in_predicate = + Expr::eq(left.deref().clone(), Expr::Column(right_col)); + in_predicate.and(join_filter) + } + } else { + // Not 
a struct, handle as before + let right_col = create_col_from_scalar_expr(right.deref(), alias)?; + let in_predicate = + Expr::eq(left.deref().clone(), Expr::Column(right_col)); + in_predicate.and(join_filter) + } } (Some(join_filter), _) => join_filter, ( @@ -399,9 +447,51 @@ fn build_join( right, })), ) => { - let right_col = create_col_from_scalar_expr(right.deref(), alias)?; + // Check if this is a multi-column IN (struct expression) + if let Expr::ScalarFunction(func) = left.deref() { + if func.func.name() == "struct" { + // Decompose struct into individual field comparisons + let struct_args = &func.args; + + // The right side should be the subquery result + // Note: After pull-up, the subquery may have additional correlated columns + // We only care about the first N columns that match our struct fields + let subquery_fields = sub_query_alias.schema().fields(); + + if struct_args.len() > subquery_fields.len() { + return plan_err!( + "Struct field count ({}) exceeds subquery column count ({})", + struct_args.len(), + subquery_fields.len() + ); + } + + // Create equality conditions for each field + let mut conditions = Vec::new(); + for (i, arg) in struct_args.iter().enumerate() { + let field = &subquery_fields[i]; + let right_col = Expr::Column(Column::new( + Some(alias.clone()), + field.name().to_string(), + )); + conditions.push(Expr::eq(arg.clone(), right_col)); + } - Expr::eq(left.deref().clone(), Expr::Column(right_col)) + // Combine all conditions with AND + conditions + .into_iter() + .reduce(|acc, cond| acc.and(cond)) + .unwrap_or_else(|| lit(true)) + } else { + // Regular scalar function, handle as before + let right_col = create_col_from_scalar_expr(right.deref(), alias)?; + Expr::eq(left.deref().clone(), Expr::Column(right_col)) + } + } else { + // Not a struct, handle as before + let right_col = create_col_from_scalar_expr(right.deref(), alias)?; + Expr::eq(left.deref().clone(), Expr::Column(right_col)) + } } (None, None) => lit(true), _ => return 
Ok(None), diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a330ad54cb338..9666896984982 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -339,18 +339,10 @@ impl HashJoinExecBuilder { check_join_is_valid(&left_schema, &right_schema, &on)?; // Validate null_aware flag - if null_aware { - if !matches!(join_type, JoinType::LeftAnti) { - return plan_err!( - "null_aware can only be true for LeftAnti joins, got {join_type}" - ); - } - if on.len() != 1 { - return plan_err!( - "null_aware anti join only supports single column join key, got {} columns", - on.len() - ); - } + if null_aware && !matches!(join_type, JoinType::LeftAnti) { + return plan_err!( + "null_aware can only be true for LeftAnti joins, got {join_type}" + ); } let (join_schema, column_indices) = @@ -5659,26 +5651,41 @@ mod tests { ); } - /// Test that null_aware validation rejects multi-column joins + /// Test null-aware anti join with multi-column (a, b) NOT IN (SELECT x, y FROM ...) 
+ /// When probe side (right) has NULL in ANY column, result should be empty + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn test_null_aware_validation_multi_column() { - let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3])); - let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3])); + async fn test_null_aware_anti_join_multi_column_probe_null( + batch_size: usize, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, false); + + // Build left table (rows to potentially output) + let left = build_table_three_cols( + ("a", &vec![Some(1), Some(3), Some(5)]), + ("b", &vec![Some(2), Some(4), Some(6)]), + ("dummy", &vec![Some(10), Some(30), Some(50)]), + ); + + // Build right table (has NULL in second column) + let right = build_table_two_cols( + ("x", &vec![Some(1), Some(7)]), + ("y", &vec![Some(2), None]), // NULL in y column + ); - // Try multi-column join let on = vec![ ( - Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("a", &left.schema())?) as _, + Arc::new(Column::new_with_schema("x", &right.schema())?) as _, ), ( - Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("b", &left.schema())?) as _, + Arc::new(Column::new_with_schema("y", &right.schema())?) 
as _, ), ]; - // Try to create null-aware anti join with 2 columns (should fail) - let result = HashJoinExec::try_new( + // Create null-aware anti join + let join = HashJoinExec::try_new( left, right, on, @@ -5687,15 +5694,165 @@ mod tests { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, - true, // null_aware = true (invalid for multi-column) + true, // null_aware = true + )?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + // Expected: empty result (probe side has NULL in y column, so no rows should be output) + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + ++ + ++ + "); + } + Ok(()) + } + + /// Test null-aware anti join with multi-column when probe side has no NULLs + /// Expected: rows that don't match should be output, but rows with NULL keys should be filtered + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_anti_join_multi_column_no_null( + batch_size: usize, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, false); + + // Build left table with some NULL keys + let left = build_table_three_cols( + ("a", &vec![Some(1), Some(3), Some(5), None]), + ("b", &vec![Some(2), Some(4), Some(6), Some(8)]), + ("dummy", &vec![Some(10), Some(30), Some(50), Some(0)]), ); - assert!(result.is_err()); - assert!( - result - .unwrap_err() - .to_string() - .contains("null_aware anti join only supports single column join key") + // Build right table (no NULLs) + let right = build_table_two_cols(("x", &vec![Some(1)]), ("y", &vec![Some(2)])); + + let on = vec![ + ( + Arc::new(Column::new_with_schema("a", &left.schema())?) as _, + Arc::new(Column::new_with_schema("x", &right.schema())?) as _, + ), + ( + Arc::new(Column::new_with_schema("b", &left.schema())?) as _, + Arc::new(Column::new_with_schema("y", &right.schema())?) 
as _, + ), + ]; + + // Create null-aware anti join + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftAnti, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true + )?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + // Expected: (3, 4, 30) and (5, 6, 50) + // Row (1, 2, 10) matches right side, so filtered out + // Row (NULL, 8, 0) has NULL in a column, so filtered out + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +---+---+-------+ + | a | b | dummy | + +---+---+-------+ + | 3 | 4 | 30 | + | 5 | 6 | 50 | + +---+---+-------+ + "); + } + Ok(()) + } + + /// Test null-aware anti join with three columns (a, b, c) NOT IN (SELECT x, y, z FROM ...) + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_anti_join_three_columns(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, false); + + // Build left table + let left = build_table_three_cols( + ("a", &vec![Some(1), Some(4), Some(7)]), + ("b", &vec![Some(2), Some(5), Some(8)]), + ("c", &vec![Some(3), Some(6), Some(9)]), + ); + + // Build right table with NULL in third column + let right = build_table_three_cols( + ("x", &vec![Some(1)]), + ("y", &vec![Some(2)]), + ("z", &vec![None]), // NULL in z column ); + + let on = vec![ + ( + Arc::new(Column::new_with_schema("a", &left.schema())?) as _, + Arc::new(Column::new_with_schema("x", &right.schema())?) as _, + ), + ( + Arc::new(Column::new_with_schema("b", &left.schema())?) as _, + Arc::new(Column::new_with_schema("y", &right.schema())?) as _, + ), + ( + Arc::new(Column::new_with_schema("c", &left.schema())?) as _, + Arc::new(Column::new_with_schema("z", &right.schema())?) 
as _, + ), + ]; + + // Create null-aware anti join + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftAnti, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true + )?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + // Expected: empty result (probe side has NULL in z column) + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + ++ + ++ + "); + } + Ok(()) + } + + /// Helper to build a table with three columns supporting nullable values + fn build_table_three_cols( + a: (&str, &Vec>), + b: (&str, &Vec>), + c: (&str, &Vec>), + ) -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new(a.0, DataType::Int32, true), + Field::new(b.0, DataType::Int32, true), + Field::new(c.0, DataType::Int32, true), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(a.1.clone())), + Arc::new(Int32Array::from(b.1.clone())), + Arc::new(Int32Array::from(c.1.clone())), + ], + ) + .unwrap(); + TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap() } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 54e620f99de7a..cb66ad65333c6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -619,10 +619,10 @@ impl HashJoinStream { .store(true, Ordering::Relaxed); } - // Check if probe side (RIGHT) contains NULL - // Since null_aware validation ensures single column join, we only check the first column - let probe_key_column = &state.values[0]; - if probe_key_column.null_count() > 0 { + // Check if probe side (RIGHT) contains NULL in ANY column + // For multi-column joins, if any column has NULL, the entire tuple is considered NULL + let has_null = state.values.iter().any(|col| col.null_count() > 0); + if has_null { // 
Found NULL in probe side - set shared flag to prevent any output build_side .left_data @@ -849,15 +849,19 @@ impl HashJoinStream { .probe_side_non_empty .load(Ordering::Relaxed) { - // Since null_aware validation ensures single column join, we only check the first column - let build_key_column = &build_side.left_data.values()[0]; + // Filter out indices where ANY key column is NULL + // For multi-column joins, if any column has NULL, the entire tuple should be filtered + let build_key_columns = build_side.left_data.values(); // Filter out indices where the key is NULL let filtered_indices: Vec = left_side .iter() .filter_map(|idx| { let idx_usize = idx.unwrap() as usize; - if build_key_column.is_null(idx_usize) { + // Check if any column has NULL at this index + let has_null = + build_key_columns.iter().any(|col| col.is_null(idx_usize)); + if has_null { None // Skip rows with NULL keys } else { Some(idx.unwrap()) diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 6837b2671cb1c..fab4ad8721bf1 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -72,15 +72,42 @@ impl SqlToRel<'_, S> { let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.set_outer_query_schema(old_outer_query_schema); - self.validate_single_column( - &sub_plan, - &spans, - "Too many columns! The subquery should only return one column", - "Select only one column in the subquery", - )?; + // Check if the left expression is a tuple (multi-column IN) + // For tuples like (a, b) NOT IN (SELECT x, y FROM ...), allow multiple columns + let is_tuple = matches!(expr, SQLExpr::Tuple(_)); + + if !is_tuple { + // Single-column IN: validate subquery returns exactly one column + self.validate_single_column( + &sub_plan, + &spans, + "Too many columns! 
The subquery should only return one column", + "Select only one column in the subquery", + )?; + } let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?; + // For multi-column IN, validate that the number of columns match + if is_tuple { + // Tuples are converted to struct expressions + // Extract the number of fields by checking if it's a ScalarFunction call + let tuple_len = if let Expr::ScalarFunction(func) = &expr_obj { + func.args.len() + } else { + 1 // Fallback to single column if we can't determine + }; + + let subquery_len = sub_plan.schema().fields().len(); + if tuple_len != subquery_len { + return plan_err!( + "The number of columns in the tuple ({}) must match the number of columns in the subquery ({})", + tuple_len, + subquery_len + ); + } + } + Ok(Expr::InSubquery(InSubquery::new( Box::new(expr_obj), Subquery { diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt index 5907a85a9b923..80bf59083fa1e 100644 --- a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -394,16 +394,202 @@ query II rowsort SELECT * FROM test_table WHERE (c1 NOT IN (SELECT c2 FROM test_table)) = true; ---- -# NOTE: The correlated subquery version from issue #10583: -# SELECT * FROM test_table t1 WHERE c1 NOT IN (SELECT c2 FROM test_table t2 WHERE t1.c1 = t2.c1) -# is not yet supported because it creates a multi-column join (correlation + NOT IN condition). -# This is a known limitation - currently only supports single column null-aware anti joins. -# This will be addressed in next Phase (multi-column support). 
+############# +## Test 19: Multi-column NOT IN with NULL in subquery +############# + +statement ok +CREATE TABLE multi_col_outer(a INT, b INT, value TEXT) AS VALUES +(1, 2, 'x'), +(3, 4, 'y'), +(5, 6, 'z'), +(NULL, 8, 'w'); + +statement ok +CREATE TABLE multi_col_inner_with_null(x INT, y INT) AS VALUES +(1, 2), +(NULL, 4); + +# Should return empty because subquery has NULL +query IIT rowsort +SELECT * FROM multi_col_outer +WHERE (a, b) NOT IN (SELECT x, y FROM multi_col_inner_with_null); +---- + +############# +## Test 20: Multi-column NOT IN without NULL +############# + +statement ok +CREATE TABLE multi_col_inner_no_null(x INT, y INT) AS VALUES +(1, 2); + +# Should return (3, 4, 'y'), (5, 6, 'z') +# The row (NULL, 8, 'w') is not returned because an outer row with NULL in any key column is filtered out +query IIT rowsort +SELECT * FROM multi_col_outer +WHERE (a, b) NOT IN (SELECT x, y FROM multi_col_inner_no_null); +---- +3 4 y +5 6 z + +############# +## Test 21: Multi-column NOT IN with NULLs in either column of the left (outer) table +############# + +statement ok +CREATE TABLE multi_left_null(a INT, b INT) AS VALUES +(1, 2), +(3, NULL), +(NULL, 4), +(5, 6); + +statement ok +CREATE TABLE multi_right_no_null(x INT, y INT) AS VALUES +(1, 2); + +# Should return (5, 6) only +# (3, NULL) filtered because b is NULL +# (NULL, 4) filtered because a is NULL +query II rowsort +SELECT * FROM multi_left_null +WHERE (a, b) NOT IN (SELECT x, y FROM multi_right_no_null); +---- +5 6 + +############# +## Test 22: Three-column NOT IN +############# + +statement ok +CREATE TABLE three_col_outer(a INT, b INT, c INT) AS VALUES +(1, 2, 3), +(4, 5, 6), +(7, 8, 9); + +statement ok +CREATE TABLE three_col_inner(x INT, y INT, z INT) AS VALUES +(1, 2, 3); + +# Should return (4, 5, 6) and (7, 8, 9) +query III rowsort +SELECT * FROM three_col_outer +WHERE (a, b, c) NOT IN (SELECT x, y, z FROM three_col_inner); +---- +4 5 6 +7 8 9 + +############# +## Test 23: Three-column NOT IN with NULL +############# + +statement ok +CREATE TABLE 
three_col_inner_null(x INT, y INT, z INT) AS VALUES +(1, 2, 3), +(4, NULL, 6); + +# Should return empty because subquery has NULL +query III rowsort +SELECT * FROM three_col_outer +WHERE (a, b, c) NOT IN (SELECT x, y, z FROM three_col_inner_null); +---- + +############# +## Test 24: Correlated multi-column NOT IN (from issue #10583) +############# + +statement ok +CREATE TABLE test_multi(c1 INT, c2 INT, c3 INT) AS VALUES +(1, 1, 1), +(2, 2, 2), +(3, 3, 3), +(4, 4, NULL); + +# Correlated NOT IN with multi-column +# Should return rows where (c2, c3) doesn't match in same c1 group +# Row (4, 4, NULL) should not appear because it has NULL in c3 +query III rowsort +SELECT * FROM test_multi t1 +WHERE (c2, c3) NOT IN ( + SELECT c2, c3 FROM test_multi t2 WHERE t1.c1 = t2.c1 +); +---- + +# Verify the EXPLAIN plan shows LeftAnti join with multi-column join conditions +query TT +EXPLAIN SELECT * FROM test_multi t1 +WHERE (c2, c3) NOT IN ( + SELECT c2, c3 FROM test_multi t2 WHERE t1.c1 = t2.c1 +); +---- +logical_plan +01)LeftAnti Join: t1.c2 = __correlated_sq_1.c2, t1.c3 = __correlated_sq_1.c3, t1.c1 = __correlated_sq_1.c1 +02)--SubqueryAlias: t1 +03)----TableScan: test_multi projection=[c1, c2, c3] +04)--SubqueryAlias: __correlated_sq_1 +05)----Projection: t2.c2, t2.c3, t2.c1 +06)------SubqueryAlias: t2 +07)--------TableScan: test_multi projection=[c1, c2, c3] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(c2@1, c2@0), (c3@2, c3@1), (c1@0, c1@2)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +############# +## Test 25: Multi-column NOT IN with empty subquery +############# + +statement ok +CREATE TABLE multi_empty(x INT, y INT); + +# Empty subquery should return all rows +query IIT rowsort +SELECT * FROM multi_col_outer +WHERE (a, b) NOT IN (SELECT x, y FROM multi_empty); +---- +1 2 x +3 4 y +5 6 z +NULL 8 w ############# ## Cleanup ############# +statement ok +DROP TABLE 
multi_col_outer; + +statement ok +DROP TABLE multi_col_inner_with_null; + +statement ok +DROP TABLE multi_col_inner_no_null; + +statement ok +DROP TABLE multi_left_null; + +statement ok +DROP TABLE multi_right_no_null; + +statement ok +DROP TABLE three_col_outer; + +statement ok +DROP TABLE three_col_inner; + +statement ok +DROP TABLE three_col_inner_null; + +statement ok +DROP TABLE test_multi; + +statement ok +DROP TABLE multi_empty; + +############# +## Cleanup (original tables) +############# + statement ok DROP TABLE test_table; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e73f4ec3e32da..be2f5ba2be2ec 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1528,3 +1528,196 @@ logical_plan 20)--------SubqueryAlias: set_cmp_s 21)----------Projection: column1 AS v 22)------------Values: (Int64(5)), (Int64(NULL)) + +############# +## Multi-column IN (LeftSemi) Tests +############# +## These tests verify that multi-column IN subqueries work correctly +## Multi-column IN uses LeftSemi join (not null-aware) + +############# +## Test 1: Basic two-column IN +############# + +statement ok +CREATE TABLE multi_in_left(a INT, b INT, value TEXT) AS VALUES +(1, 2, 'match'), +(3, 4, 'no_match'), +(5, 6, 'match'); + +statement ok +CREATE TABLE multi_in_right(x INT, y INT) AS VALUES +(1, 2), +(5, 6); + +# Should return rows where (a, b) matches (x, y) +query IIT rowsort +SELECT * FROM multi_in_left +WHERE (a, b) IN (SELECT x, y FROM multi_in_right); +---- +1 2 match +5 6 match + +############# +## Test 2: Multi-column IN with no matches +############# + +statement ok +CREATE TABLE multi_in_right_no_match(x INT, y INT) AS VALUES +(10, 20), +(30, 40); + +# Should return empty result +query IIT rowsort +SELECT * FROM multi_in_left +WHERE (a, b) IN (SELECT x, y FROM multi_in_right_no_match); +---- + +############# +## Test 3: Multi-column IN with 
NULL values +############# +## Note: Unlike NOT IN, regular IN does NOT use null-aware semantics +## NULL = NULL is never TRUE (it evaluates to unknown), so a regular semi join never matches NULL keys + +statement ok +CREATE TABLE multi_in_left_null(a INT, b INT, value TEXT) AS VALUES +(1, 2, 'x'), +(3, NULL, 'y'), +(NULL, 6, 'z'); + +statement ok +CREATE TABLE multi_in_right_null(x INT, y INT) AS VALUES +(1, 2), +(NULL, 4); + +# Should return only (1, 2, 'x') +# (3, NULL, 'y') doesn't match because NULL doesn't equal anything +# (NULL, 6, 'z') doesn't match because NULL doesn't equal anything +query IIT rowsort +SELECT * FROM multi_in_left_null +WHERE (a, b) IN (SELECT x, y FROM multi_in_right_null); +---- +1 2 x + +############# +## Test 4: Three-column IN +############# + +statement ok +CREATE TABLE three_col_left(a INT, b INT, c INT, value TEXT) AS VALUES +(1, 2, 3, 'match1'), +(4, 5, 6, 'no_match'), +(7, 8, 9, 'match2'); + +statement ok +CREATE TABLE three_col_right(x INT, y INT, z INT) AS VALUES +(1, 2, 3), +(7, 8, 9); + +# Should return rows with matching three-column tuples +query IIIT rowsort +SELECT * FROM three_col_left +WHERE (a, b, c) IN (SELECT x, y, z FROM three_col_right); +---- +1 2 3 match1 +7 8 9 match2 + +############# +## Test 5: Correlated multi-column IN +############# + +statement ok +CREATE TABLE correlated_outer(id INT, a INT, b INT) AS VALUES +(1, 10, 20), +(2, 30, 40), +(3, 10, 20); + +statement ok +CREATE TABLE correlated_inner(id INT, x INT, y INT) AS VALUES +(1, 10, 20), +(1, 30, 40), +(2, 50, 60), +(3, 10, 20); + +# Should return outer rows where (a, b) matches (x, y) for the same id +query III rowsort +SELECT * FROM correlated_outer o +WHERE (a, b) IN ( + SELECT x, y FROM correlated_inner i WHERE i.id = o.id +); +---- +1 10 20 +3 10 20 + +############# +## Test 6: Verify logical plan shows LeftSemi join with multiple conditions +############# + +query TT +EXPLAIN SELECT * FROM multi_in_left +WHERE (a, b) IN (SELECT x, y FROM multi_in_right); +---- +logical_plan 
+01)LeftSemi Join: multi_in_left.a = __correlated_sq_1.x, multi_in_left.b = __correlated_sq_1.y +02)--TableScan: multi_in_left projection=[a, b, value] +03)--SubqueryAlias: __correlated_sq_1 +04)----TableScan: multi_in_right projection=[x, y] + +############# +## Test 7: Multi-column IN with empty subquery +############# + +statement ok +CREATE TABLE multi_in_right_empty(x INT, y INT); + +# Should return empty result (empty subquery) +query IIT rowsort +SELECT * FROM multi_in_left +WHERE (a, b) IN (SELECT x, y FROM multi_in_right_empty); +---- + +############# +## Test 8: Multi-column IN with WHERE clause in subquery +############# + +# Should return only rows matching filtered subquery +query IIT rowsort +SELECT * FROM multi_in_left +WHERE (a, b) IN (SELECT x, y FROM multi_in_right WHERE x > 0 AND y < 10); +---- +1 2 match +5 6 match + +############# +## Cleanup +############# + +statement ok +DROP TABLE multi_in_left; + +statement ok +DROP TABLE multi_in_right; + +statement ok +DROP TABLE multi_in_right_no_match; + +statement ok +DROP TABLE multi_in_left_null; + +statement ok +DROP TABLE multi_in_right_null; + +statement ok +DROP TABLE three_col_left; + +statement ok +DROP TABLE three_col_right; + +statement ok +DROP TABLE correlated_outer; + +statement ok +DROP TABLE correlated_inner; + +statement ok +DROP TABLE multi_in_right_empty;