diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 7e291afa04b6e..4e51e189b480f 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -261,7 +261,7 @@ impl SqlToRel<'_, S> { select_exprs: mut select_exprs_post_aggr, having_expr: having_expr_post_aggr, qualify_expr: qualify_expr_post_aggr, - order_by_exprs: order_by_rex, + order_by_exprs: mut order_by_rex, } = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() { self.aggregate( &base_plan, @@ -297,14 +297,23 @@ impl SqlToRel<'_, S> { plan }; - // The outer expressions we will search through for window functions. - // Window functions may be sourced from the SELECT list or from the QUALIFY expression. - let windows_expr_haystack = select_exprs_post_aggr - .iter() - .chain(qualify_expr_post_aggr.iter()); + // The window expressions from SELECT and QUALIFY only, used to validate that + // QUALIFY is used with window functions (ORDER BY window functions don't count). + let qualify_window_func_exprs = find_window_exprs( + select_exprs_post_aggr + .iter() + .chain(qualify_expr_post_aggr.iter()), + ); + // All of the window expressions (deduplicated and rewritten to reference aggregates as - // columns from input). - let window_func_exprs = find_window_exprs(windows_expr_haystack); + // columns from input). Window functions may be sourced from the SELECT list, QUALIFY + // expression, or ORDER BY. + let window_func_exprs = find_window_exprs( + select_exprs_post_aggr + .iter() + .chain(qualify_expr_post_aggr.iter()) + .chain(order_by_rex.iter().map(|s| &s.expr)), + ); // Process window functions after aggregation as they can reference // aggregate functions in their body @@ -319,14 +328,25 @@ impl SqlToRel<'_, S> { .map(|expr| rebase_expr(expr, &window_func_exprs, &plan)) .collect::>>()?; + order_by_rex = order_by_rex + .into_iter() + .map(|sort_expr| { + Ok(sort_expr.with_expr(rebase_expr( + &sort_expr.expr, + &window_func_exprs, + &plan, + )?)) + }) + .collect::>>()?; + plan }; // Process QUALIFY clause after window functions // QUALIFY filters the results of window functions, similar to how HAVING filters aggregates let plan = if let Some(qualify_expr) = qualify_expr_post_aggr { - // Validate that QUALIFY is used with window functions - if window_func_exprs.is_empty() { + // Validate that QUALIFY is used with window functions in SELECT or QUALIFY + if qualify_window_func_exprs.is_empty() { return plan_err!( "QUALIFY clause requires window functions in the SELECT list or QUALIFY clause" ); diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 29c17be69ce5f..346f6929da439 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2805,6 +2805,138 @@ fn over_order_by_with_window_frame_double_end() { ); } +#[test] +fn window_function_only_in_order_by() { + let sql = "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id)"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Projection: orders.order_id + Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST + Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: orders + " + ); +} + +#[test] +fn window_function_in_select_and_order_by() { + let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id) FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id)"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST + Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: orders + " + ); +} + +#[test] +fn window_function_in_order_by_nested_expr() { + let sql = + "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id) + 1"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Projection: orders.order_id + Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + Int64(1) ASC NULLS LAST + Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: orders + " + ); +} + +#[test] +fn window_function_in_order_by_desc() { + let sql = + "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id) DESC"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Projection: orders.order_id + Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW DESC NULLS FIRST + Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: orders + " + ); +} + +#[test] +fn multiple_window_functions_in_order_by() { + let sql = "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id), MIN(qty) OVER (ORDER BY order_id DESC)"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Projection: orders.order_id + Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST, min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST + Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + WindowAggr: windowExpr=[[min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: orders + " + ); +} + +#[test] +fn window_function_in_order_by_with_group_by() { + let sql = "SELECT order_id, SUM(qty) FROM orders GROUP BY order_id ORDER BY MAX(SUM(qty)) OVER (ORDER BY order_id)"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Projection: orders.order_id, sum(orders.qty) + Sort: max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST + Projection: orders.order_id, sum(orders.qty), max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + WindowAggr: windowExpr=[[max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + Aggregate: groupBy=[[orders.order_id]], aggr=[[sum(orders.qty)]] + TableScan: orders + " + ); +} + +#[test] +fn window_function_in_order_by_with_qualify() { + let sql = "SELECT person.id, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn FROM person QUALIFY rn = 1 ORDER BY ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id)"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Sort: rn ASC NULLS LAST + Projection: person.id, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn + Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = Int64(1) + WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: person + " + ); +} + +#[test] +fn window_function_in_order_by_not_in_select() { + let sql = + "SELECT order_id FROM orders ORDER BY MIN(qty) OVER (PARTITION BY order_id)"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r" + Projection: orders.order_id + Sort: min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ASC NULLS LAST + Projection: orders.order_id, min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + WindowAggr: windowExpr=[[min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] + TableScan: orders + " + ); +} + #[test] fn over_order_by_with_window_frame_single_end() { let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id ROWS 3 PRECEDING), MIN(qty) OVER (ORDER BY order_id DESC) from orders"; @@ -4256,6 +4388,16 @@ fn test_select_qualify_without_window_function() { ); } +#[test] +fn test_select_qualify_without_window_function_but_window_in_order_by() { + let sql = "SELECT person.id FROM person QUALIFY person.id > 1 ORDER BY ROW_NUMBER() OVER (ORDER BY person.id)"; + let err = logical_plan(sql).unwrap_err(); + assert_eq!( + err.strip_backtrace(), + "Error during planning: QUALIFY clause requires window functions in the SELECT list or QUALIFY clause" + ); +} + #[test] fn test_select_qualify_complex_condition() { let sql = "SELECT person.id, person.age, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn, RANK() OVER (ORDER BY person.salary) as rank FROM person QUALIFY rn <= 2 AND rank <= 5"; diff --git a/datafusion/sqllogictest/test_files/window_order_by.slt b/datafusion/sqllogictest/test_files/window_order_by.slt new file mode 100644 index 0000000000000..e241d9353e926 --- /dev/null +++ b/datafusion/sqllogictest/test_files/window_order_by.slt @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for window functions used in ORDER BY clause + +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +# Window function only in ORDER BY +query I +SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) LIMIT 5; +---- +4 +2 +5 +2 +2 + +# Window function in both SELECT and ORDER BY (deduplication) +query II +SELECT c2, row_number() OVER (ORDER BY c9) as rn FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) LIMIT 5; +---- +4 1 +2 2 +5 3 +2 4 +2 5 + +# Nested expression: ORDER BY window_func(...) + 1 +query I +SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) + 1 LIMIT 5; +---- +4 +2 +5 +2 +2 + +# Multiple window functions in ORDER BY +query I +SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9), max(c3) OVER (ORDER BY c9) LIMIT 5; +---- +4 +2 +5 +2 +2 + +# DESC ordering with window function +query I +SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) DESC LIMIT 5; +---- +5 +1 +1 +2 +1