From dee0d1fd0507168eb3fd6548b37bd047ee9356e5 Mon Sep 17 00:00:00 2001 From: Munawwar Date: Sat, 25 Apr 2026 14:40:20 +0400 Subject: [PATCH 1/4] fix: schema-sharded fallback routing (#819) --- .../frontend/router/parser/query/delete.rs | 34 +++-- pgdog/src/frontend/router/parser/query/mod.rs | 32 +++-- .../frontend/router/parser/query/select.rs | 22 ++- .../frontend/router/parser/query/update.rs | 34 +++-- pgdog/src/frontend/router/parser/statement.rs | 133 ++++++++++++++++++ 5 files changed, 223 insertions(+), 32 deletions(-) diff --git a/pgdog/src/frontend/router/parser/query/delete.rs b/pgdog/src/frontend/router/parser/query/delete.rs index ddba1b9e3..6a9dea93d 100644 --- a/pgdog/src/frontend/router/parser/query/delete.rs +++ b/pgdog/src/frontend/router/parser/query/delete.rs @@ -13,12 +13,6 @@ impl QueryParser { self.recorder_mut(), ); - let is_sharded = parser.is_sharded( - &context.router_context.schema, - context.router_context.cluster.user(), - context.router_context.parameter_hints.search_path, - ); - let shard = parser.shard()?; if let Some(shard) = shard { @@ -32,14 +26,34 @@ impl QueryParser { .shards_calculator .push(ShardWithPriority::new_table(shard)); } else { - if let Some(recorder) = self.recorder_mut() { - recorder.record_entry(None, "DELETE fell back to broadcast"); - } - if is_sharded { + let schema_shard_state = parser.schema_shard_state( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); + let is_sharded = parser.is_sharded( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); + if let SchemaShardState::Resolved { shard, schema } = schema_shard_state { + if let Some(recorder) = self.recorder_mut() { + recorder.record_entry(Some(shard.clone()), "DELETE matched schema"); + } + context + .shards_calculator + .push(ShardWithPriority::new_search_path(shard, &schema)); + } else if matches!(schema_shard_state, SchemaShardState::Ambiguous) || is_sharded { + if let Some(recorder) = self.recorder_mut() { + recorder.record_entry(None, "DELETE fell back to broadcast"); + } context .shards_calculator .push(ShardWithPriority::new_table(Shard::All)); } else { + if let Some(recorder) = self.recorder_mut() { + recorder.record_entry(None, "DELETE fell back to omnisharded"); + } context .shards_calculator .push(ShardWithPriority::new_rr_omni(Shard::All)); diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index 36eb8c11f..d405c8257 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -17,6 +17,7 @@ use crate::{ plugin::plugins, }; +use super::statement::SchemaShardState; use super::{ explain_trace::{ExplainRecorder, ExplainSummary}, *, @@ -530,18 +531,29 @@ impl QueryParser { ) .with_schema_lookup(schema_lookup); - let is_sharded = parser.is_sharded( - &context.router_context.schema, - context.router_context.cluster.user(), - context.router_context.parameter_hints.search_path, - ); - - let shard = parser.shard()?.unwrap_or(Shard::All); + let shard = parser.shard()?; - context.shards_calculator.push(if is_sharded { - ShardWithPriority::new_table(shard.clone()) + context.shards_calculator.push(if let Some(shard) = shard { + ShardWithPriority::new_table(shard) } else { - ShardWithPriority::new_table_omni(shard) + let schema_shard_state = parser.schema_shard_state( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); + let is_sharded = parser.is_sharded( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); + + if let SchemaShardState::Resolved { shard, schema } = schema_shard_state { + ShardWithPriority::new_search_path(shard, &schema) + } else if matches!(schema_shard_state, SchemaShardState::Ambiguous) || is_sharded { + ShardWithPriority::new_table(Shard::All) + } else { + ShardWithPriority::new_table_omni(Shard::All) + } }); let shard = context.shards_calculator.shard(); diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index bcfc38370..5c8e17660 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -57,7 +57,7 @@ impl QueryParser { let mut shards = HashSet::new(); - let (shard, is_sharded, tables, advisory_locks) = { + let (shard, schema_shard_state, is_sharded, tables, advisory_locks) = { let mut statement_parser = StatementParser::from_select( stmt, context.router_context.bind, @@ -72,10 +72,16 @@ impl QueryParser { let advisory_locks = statement_parser.extract_advisory_locks(); if shard.is_some() { - (shard, true, vec![], advisory_locks) + (shard, SchemaShardState::None, true, vec![], advisory_locks) } else { + let schema_shard_state = statement_parser.schema_shard_state( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); ( None, + schema_shard_state, statement_parser.is_sharded( &context.router_context.schema, context.router_context.cluster.user(), @@ -148,6 +154,18 @@ impl QueryParser { context .shards_calculator .push(ShardWithPriority::new_table(shard)); + } else if let SchemaShardState::Resolved { shard, schema } = schema_shard_state { + debug!("resolved schema-sharded query via search_path/default schema"); + + context + .shards_calculator + .push(ShardWithPriority::new_search_path(shard, &schema)); + } else if matches!(schema_shard_state, SchemaShardState::Ambiguous) { + debug!("schema-sharded query is ambiguous, routing as cross-shard"); + + context + .shards_calculator + .push(ShardWithPriority::new_table(Shard::All)); } else if is_sharded { debug!("table is sharded, but no sharding key detected"); diff --git a/pgdog/src/frontend/router/parser/query/update.rs b/pgdog/src/frontend/router/parser/query/update.rs index f4e78f4f0..17034f5a7 100644 --- a/pgdog/src/frontend/router/parser/query/update.rs +++ b/pgdog/src/frontend/router/parser/query/update.rs @@ -13,12 +13,6 @@ impl QueryParser { self.recorder_mut(), ); - let is_sharded = parser.is_sharded( - &context.router_context.schema, - context.router_context.cluster.user(), - context.router_context.parameter_hints.search_path, - ); - let shard = parser.shard()?; if let Some(shard) = shard { if let Some(recorder) = self.recorder_mut() { @@ -31,14 +25,34 @@ impl QueryParser { .shards_calculator .push(ShardWithPriority::new_table(shard)); } else { - if let Some(recorder) = self.recorder_mut() { - recorder.record_entry(None, "UPDATE fell back to broadcast"); - } - if is_sharded { + let schema_shard_state = parser.schema_shard_state( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); + let is_sharded = parser.is_sharded( + &context.router_context.schema, + context.router_context.cluster.user(), + context.router_context.parameter_hints.search_path, + ); + if let SchemaShardState::Resolved { shard, schema } = schema_shard_state { + if let Some(recorder) = self.recorder_mut() { + recorder.record_entry(Some(shard.clone()), "UPDATE matched schema"); + } + context + .shards_calculator + .push(ShardWithPriority::new_search_path(shard, &schema)); + } else if matches!(schema_shard_state, SchemaShardState::Ambiguous) || is_sharded { + if let Some(recorder) = self.recorder_mut() { + recorder.record_entry(None, "UPDATE fell back to broadcast"); + } context .shards_calculator .push(ShardWithPriority::new_table(Shard::All)); } else { + if let Some(recorder) = self.recorder_mut() { + recorder.record_entry(None, "UPDATE fell back to omnisharded"); + } context .shards_calculator .push(ShardWithPriority::new_table_omni(Shard::All)); diff --git a/pgdog/src/frontend/router/parser/statement.rs b/pgdog/src/frontend/router/parser/statement.rs index cec05c806..3b3ecb087 100644 --- a/pgdog/src/frontend/router/parser/statement.rs +++ b/pgdog/src/frontend/router/parser/statement.rs @@ -407,6 +407,13 @@ enum Statement<'a> { Insert(&'a InsertStmt), } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum SchemaShardState { + None, + Resolved { shard: Shard, schema: String }, + Ambiguous, +} + /// Context for looking up table columns from the database schema. /// Used for INSERT statements without explicit column lists. pub struct SchemaLookupContext<'a> { @@ -604,6 +611,52 @@ impl<'a, 'b, 'c> StatementParser<'a, 'b, 'c> { Ok(None) } + pub(crate) fn schema_shard_state( + &mut self, + db_schema: &Schema, + user: &str, + search_path: Option<&ParameterValue>, + ) -> SchemaShardState { + if self.schema.schemas.is_empty() { + return SchemaShardState::None; + } + + let tables = self.tables().to_vec(); + let mut schema_sharder = SchemaSharder::default(); + let default_schema_mapping = self.schema.schemas.get(None).is_some(); + let mut ambiguous = false; + + for table in tables { + if let Some(schema) = table.schema { + schema_sharder.resolve(Some(schema.into()), &self.schema.schemas); + continue; + } + + if let Some(relation) = db_schema.table(table, user, search_path) { + schema_sharder.resolve(Some(relation.schema().into()), &self.schema.schemas); + continue; + } + + ambiguous |= default_schema_mapping + || self + .schema + .schemas + .keys() + .any(|schema| db_schema.get(schema, table.name).is_some()); + } + + if ambiguous { + SchemaShardState::Ambiguous + } else if let Some((shard, schema)) = schema_sharder.get() { + SchemaShardState::Resolved { + shard, + schema: schema.to_owned(), + } + } else { + SchemaShardState::None + } + } + /// Check that the query references a table that contains a sharded /// column. This check is needed in case sharded tables config /// doesn't specify a table name and should short-circuit if it does. @@ -2549,6 +2602,86 @@ mod test { assert_eq!(result2, Some(Shard::Direct(2))); } + fn make_test_schema_with_sharded_relations() -> crate::backend::Schema { + let relations = HashMap::from([ + ( + ("sales".into(), "products".into()), + Relation::test_table("sales", "products", IndexMap::new()), + ), + ( + ("inventory".into(), "products".into()), + Relation::test_table("inventory", "products", IndexMap::new()), + ), + ( + ("public".into(), "unsharded_table".into()), + Relation::test_table("public", "unsharded_table", IndexMap::new()), + ), + ]); + crate::backend::Schema::from_parts(vec!["$user".into(), "public".into()], relations) + } + + fn run_schema_shard_state_test( + stmt: &str, + search_path: Option, + ) -> Result { + let schema = ShardingSchema { + shards: 3, + schemas: ShardedSchemas::new(vec![ + ShardedSchema { + database: "test".to_string(), + name: Some("sales".to_string()), + shard: 1, + all: false, + }, + ShardedSchema { + database: "test".to_string(), + name: Some("inventory".to_string()), + shard: 2, + all: false, + }, + ]), + ..Default::default() + }; + let db_schema = make_test_schema_with_sharded_relations(); + let raw = pg_query::parse(stmt) + .unwrap() + .protobuf + .stmts + .first() + .cloned() + .unwrap(); + let mut parser = StatementParser::from_raw(&raw, None, &schema, None)?; + Ok(parser.schema_shard_state(&db_schema, "pgdog", search_path.as_ref())) + } + + #[test] + fn test_schema_shard_state_ambiguous_without_search_path() { + let result = run_schema_shard_state_test("SELECT * FROM products", None).unwrap(); + assert_eq!(result, SchemaShardState::Ambiguous); + } + + #[test] + fn test_schema_shard_state_resolved_from_search_path() { + let result = run_schema_shard_state_test( + "SELECT * FROM products", + Some(ParameterValue::Tuple(vec!["sales".into(), "public".into()])), + ) + .unwrap(); + assert_eq!( + result, + SchemaShardState::Resolved { + shard: Shard::Direct(1), + schema: "sales".into(), + } + ); + } + + #[test] + fn test_schema_shard_state_none_for_unsharded_table() { + let result = run_schema_shard_state_test("SELECT * FROM unsharded_table", None).unwrap(); + assert_eq!(result, SchemaShardState::None); + } + // Column-only sharded table detection tests (using loaded schema) fn run_test_column_only(stmt: &str, bind: Option<&Bind>) -> Result, Error> { From b9393e042f72fcd947b681d0626fc0b9fba6f823 Mon Sep 17 00:00:00 2001 From: Munawwar Date: Sat, 2 May 2026 11:29:58 +0400 Subject: [PATCH 2/4] Add ambiguous schema sharding regression test --- .../query_engine/test/set_schema_sharding.rs | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pgdog/src/frontend/client/query_engine/test/set_schema_sharding.rs b/pgdog/src/frontend/client/query_engine/test/set_schema_sharding.rs index 671c7f977..c7bcdf7a3 100644 --- a/pgdog/src/frontend/client/query_engine/test/set_schema_sharding.rs +++ b/pgdog/src/frontend/client/query_engine/test/set_schema_sharding.rs @@ -37,3 +37,27 @@ async fn test_set_works_cross_shard_disabled() { let reply = client.read_until('Z').await.unwrap(); assert_eq!(reply.len(), 2); } + +#[tokio::test] +async fn test_ambiguous_schema_sharded_query_errors_when_cross_shard_disabled() { + let table = "schema_shard_ambiguous_test"; + + let mut setup = TestClient::new_sharded(Parameters::default()).await; + for stmt in [ + "CREATE SCHEMA IF NOT EXISTS acustomer".to_string(), + "CREATE SCHEMA IF NOT EXISTS bcustomer".to_string(), + format!("CREATE TABLE IF NOT EXISTS acustomer.{table} (id INT)"), + format!("CREATE TABLE IF NOT EXISTS bcustomer.{table} (id INT)"), + ] { + setup.send_simple(Query::new(&stmt)).await; + setup.read_until('Z').await.unwrap(); + } + + let mut client = TestClient::new_cross_shard_disabled(Parameters::default()).await; + client + .send_simple(Query::new(&format!("SELECT * FROM {table}"))) + .await; + let err = client.read_until('Z').await.unwrap_err(); + assert_eq!(err.code, "58000"); + assert_eq!(err.message, "cross-shard queries are disabled"); +} From 1b915e744efb9ee64d0948be5994ba5c1e7bed6e Mon Sep 17 00:00:00 2001 From: Munawwar Date: Sat, 2 May 2026 18:51:11 +0400 Subject: [PATCH 3/4] more tests --- .../parser/query/test/test_search_path.rs | 223 +++++++++++++----- 1 file changed, 161 insertions(+), 62 deletions(-) diff --git a/pgdog/src/frontend/router/parser/query/test/test_search_path.rs b/pgdog/src/frontend/router/parser/query/test/test_search_path.rs index 63f0b15de..36886e5b5 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_search_path.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_search_path.rs @@ -1,17 +1,44 @@ -use crate::frontend::router::parser::route::RoundRobinReason; +use crate::frontend::router::parser::route::{RoundRobinReason, TableReason}; use crate::frontend::router::parser::{route::ShardSource, Shard}; +use crate::frontend::{router::parser::Route, Command}; use crate::net::parameter::ParameterValue; use super::setup::{QueryParserTest, *}; // --- search_path routing for DML --- +fn tuple(path: &[&str]) -> ParameterValue { + ParameterValue::Tuple(path.iter().map(|schema| (*schema).into()).collect()) +} + +fn assert_search_path_route(route: &Route, shard: usize, schema: &str) { + assert_eq!(route.shard(), &Shard::Direct(shard)); + assert!(route.is_search_path_driven()); + assert_eq!( + route.shard_with_priority().source(), + &ShardSource::SearchPath(schema.into()) + ); +} + +fn assert_sharded_broadcast(command: &Command) { + assert!(command.route().is_write()); + assert_eq!(command.route().shard(), &Shard::All); + assert_eq!( + command.route().shard_with_priority().source(), + &ShardSource::Table(TableReason::Sharded) + ); +} + +fn assert_omni_write(command: &Command, source: ShardSource) { + assert!(command.route().is_write()); + assert!(command.route().is_omni()); + assert_eq!(command.route().shard_with_priority().source(), &source); +} + #[test] fn test_search_path_shard_0_routes_select() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["$user".into(), "shard_0".into(), "public".into()]), - ); + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["$user", "shard_0", "public"])); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -19,12 +46,21 @@ fn test_search_path_shard_0_routes_select() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } +#[test] +fn test_search_path_shard_0_routes_select_without_shard_key() { + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["$user", "shard_0", "public"])); + + let command = test.execute(vec![Query::new("SELECT * FROM users").into()]); + + assert!(command.route().is_read()); + assert_search_path_route(command.route(), 0, "shard_0"); +} + #[test] fn test_search_path_shard_1_routes_select() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["public".into(), "shard_1".into(), "$user".into()]), - ); + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["public", "shard_1", "$user"])); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -34,10 +70,8 @@ fn test_search_path_shard_1_routes_select() { #[test] fn test_search_path_shard_0_routes_insert() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["pg_catalog".into(), "shard_0".into()]), - ); + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["pg_catalog", "shard_0"])); let command = test.execute(vec![Query::new( "INSERT INTO users (id, name) VALUES (1, 'test')", @@ -48,16 +82,23 @@ fn test_search_path_shard_0_routes_insert() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } +#[test] +fn test_search_path_shard_0_routes_insert_without_shard_key() { + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["pg_catalog", "shard_0"])); + + let command = test.execute(vec![ + Query::new("INSERT INTO users (name) VALUES ('test')").into() + ]); + + assert_search_path_route(command.route(), 0, "shard_0"); +} + #[test] fn test_search_path_shard_1_routes_insert() { let mut test = QueryParserTest::new().with_param( "search_path", - ParameterValue::Tuple(vec![ - "$user".into(), - "public".into(), - "pg_temp".into(), - "shard_1".into(), - ]), + tuple(&["$user", "public", "pg_temp", "shard_1"]), ); let command = test.execute(vec![Query::new( @@ -71,10 +112,8 @@ fn test_search_path_shard_1_routes_insert() { #[test] fn test_search_path_shard_0_routes_update() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["shard_0".into(), "pg_catalog".into(), "public".into()]), - ); + let mut test = QueryParserTest::new() + .with_param("search_path", tuple(&["shard_0", "pg_catalog", "public"])); let command = test.execute(vec![Query::new( "UPDATE users SET name = 'updated' WHERE id = 1", @@ -85,12 +124,21 @@ fn test_search_path_shard_0_routes_update() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } +#[test] +fn test_search_path_shard_0_routes_update_without_shard_key() { + let mut test = QueryParserTest::new() + .with_expanded_explain() + .with_param("search_path", tuple(&["shard_0", "pg_catalog", "public"])); + + let command = test.execute(vec![Query::new("UPDATE users SET name = 'updated'").into()]); + + assert_search_path_route(command.route(), 0, "shard_0"); +} + #[test] fn test_search_path_shard_1_routes_update() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["information_schema".into(), "shard_1".into()]), - ); + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["information_schema", "shard_1"])); let command = test.execute(vec![Query::new( "UPDATE users SET name = 'updated' WHERE id = 1", @@ -103,10 +151,8 @@ fn test_search_path_shard_1_routes_update() { #[test] fn test_search_path_shard_0_routes_delete() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["public".into(), "$user".into(), "shard_0".into()]), - ); + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["public", "$user", "shard_0"])); let command = test.execute(vec![Query::new("DELETE FROM users WHERE id = 1").into()]); @@ -114,12 +160,20 @@ fn test_search_path_shard_0_routes_delete() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } +#[test] +fn test_search_path_shard_0_routes_delete_without_shard_key() { + let mut test = QueryParserTest::new() + .with_expanded_explain() + .with_param("search_path", tuple(&["public", "$user", "shard_0"])); + + let command = test.execute(vec![Query::new("DELETE FROM users").into()]); + + assert_search_path_route(command.route(), 0, "shard_0"); +} + #[test] fn test_search_path_shard_1_routes_delete() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["shard_1".into(), "public".into()]), - ); + let mut test = QueryParserTest::new().with_param("search_path", tuple(&["shard_1", "public"])); let command = test.execute(vec![Query::new("DELETE FROM users WHERE id = 1").into()]); @@ -133,12 +187,7 @@ fn test_search_path_shard_1_routes_delete() { fn test_search_path_first_shard_wins_shard_0() { let mut test = QueryParserTest::new().with_param( "search_path", - ParameterValue::Tuple(vec![ - "public".into(), - "shard_0".into(), - "shard_1".into(), - "$user".into(), - ]), + tuple(&["public", "shard_0", "shard_1", "$user"]), ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -151,12 +200,7 @@ fn test_search_path_first_shard_wins_shard_0() { fn test_search_path_first_shard_wins_shard_1() { let mut test = QueryParserTest::new().with_param( "search_path", - ParameterValue::Tuple(vec![ - "$user".into(), - "pg_catalog".into(), - "shard_1".into(), - "shard_0".into(), - ]), + tuple(&["$user", "pg_catalog", "shard_1", "shard_0"]), ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -169,12 +213,12 @@ fn test_search_path_first_shard_wins_shard_1() { fn test_search_path_shard_at_end_still_matches() { let mut test = QueryParserTest::new().with_param( "search_path", - ParameterValue::Tuple(vec![ - "$user".into(), - "public".into(), - "pg_catalog".into(), - "information_schema".into(), - "shard_1".into(), + tuple(&[ + "$user", + "public", + "pg_catalog", + "information_schema", + "shard_1", ]), ); @@ -188,10 +232,8 @@ fn test_search_path_shard_at_end_still_matches() { #[test] fn test_search_path_no_sharded_schema_routes_to_rr() { - let mut test = QueryParserTest::new().with_param( - "search_path", - ParameterValue::Tuple(vec!["$user".into(), "public".into(), "pg_catalog".into()]), - ); + let mut test = + QueryParserTest::new().with_param("search_path", tuple(&["$user", "public", "pg_catalog"])); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -202,15 +244,72 @@ fn test_search_path_no_sharded_schema_routes_to_rr() { ); } +#[test] +fn test_no_search_path_ambiguous_insert_routes_to_all() { + let mut test = QueryParserTest::new(); + + let command = test.execute(vec![Query::new("INSERT INTO orders DEFAULT VALUES").into()]); + + assert_sharded_broadcast(&command); +} + +#[test] +fn test_sharded_update_without_shard_key_routes_to_all() { + let mut test = QueryParserTest::new().with_expanded_explain(); + + let command = test.execute(vec![ + Query::new("UPDATE sharded SET email = 'updated'").into() + ]); + + assert_sharded_broadcast(&command); +} + +#[test] +fn test_sharded_delete_without_shard_key_routes_to_all() { + let mut test = QueryParserTest::new().with_expanded_explain(); + + let command = test.execute(vec![Query::new("DELETE FROM sharded").into()]); + + assert_sharded_broadcast(&command); +} + +#[test] +fn test_public_schema_insert_routes_to_omni() { + let mut test = QueryParserTest::new(); + + let command = test.execute(vec![Query::new( + "INSERT INTO public.users (name) VALUES ('test')", + ) + .into()]); + + assert_omni_write(&command, ShardSource::Table(TableReason::Omni)); +} + +#[test] +fn test_public_schema_update_routes_to_omni() { + let mut test = QueryParserTest::new().with_expanded_explain(); + + let command = test.execute(vec![ + Query::new("UPDATE public.users SET name = 'updated'").into() + ]); + + assert_omni_write(&command, ShardSource::Table(TableReason::Omni)); +} + +#[test] +fn test_public_schema_delete_routes_to_omni() { + let mut test = QueryParserTest::new().with_expanded_explain(); + + let command = test.execute(vec![Query::new("DELETE FROM public.users").into()]); + + assert_omni_write(&command, ShardSource::RoundRobin(RoundRobinReason::Omni)); +} + #[test] fn test_search_path_only_system_schemas_routes_to_rr() { let mut test = QueryParserTest::new().with_param( "search_path", - ParameterValue::Tuple(vec![ - "pg_catalog".into(), - "information_schema".into(), - "pg_temp".into(), - ]), + tuple(&["pg_catalog", "information_schema", "pg_temp"]), ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); From 5f035d642c8e37863d23a4fb2fd4218fa638c11b Mon Sep 17 00:00:00 2001 From: Munawwar Date: Sat, 2 May 2026 19:12:07 +0400 Subject: [PATCH 4/4] Revert "more tests" This reverts commit 1b915e744efb9ee64d0948be5994ba5c1e7bed6e. --- .../parser/query/test/test_search_path.rs | 223 +++++------------- 1 file changed, 62 insertions(+), 161 deletions(-) diff --git a/pgdog/src/frontend/router/parser/query/test/test_search_path.rs b/pgdog/src/frontend/router/parser/query/test/test_search_path.rs index 36886e5b5..63f0b15de 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_search_path.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_search_path.rs @@ -1,44 +1,17 @@ -use crate::frontend::router::parser::route::{RoundRobinReason, TableReason}; +use crate::frontend::router::parser::route::RoundRobinReason; use crate::frontend::router::parser::{route::ShardSource, Shard}; -use crate::frontend::{router::parser::Route, Command}; use crate::net::parameter::ParameterValue; use super::setup::{QueryParserTest, *}; // --- search_path routing for DML --- -fn tuple(path: &[&str]) -> ParameterValue { - ParameterValue::Tuple(path.iter().map(|schema| (*schema).into()).collect()) -} - -fn assert_search_path_route(route: &Route, shard: usize, schema: &str) { - assert_eq!(route.shard(), &Shard::Direct(shard)); - assert!(route.is_search_path_driven()); - assert_eq!( - route.shard_with_priority().source(), - &ShardSource::SearchPath(schema.into()) - ); -} - -fn assert_sharded_broadcast(command: &Command) { - assert!(command.route().is_write()); - assert_eq!(command.route().shard(), &Shard::All); - assert_eq!( - command.route().shard_with_priority().source(), - &ShardSource::Table(TableReason::Sharded) - ); -} - -fn assert_omni_write(command: &Command, source: ShardSource) { - assert!(command.route().is_write()); - assert!(command.route().is_omni()); - assert_eq!(command.route().shard_with_priority().source(), &source); -} - #[test] fn test_search_path_shard_0_routes_select() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["$user", "shard_0", "public"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["$user".into(), "shard_0".into(), "public".into()]), + ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -46,21 +19,12 @@ fn test_search_path_shard_0_routes_select() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } -#[test] -fn test_search_path_shard_0_routes_select_without_shard_key() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["$user", "shard_0", "public"])); - - let command = test.execute(vec![Query::new("SELECT * FROM users").into()]); - - assert!(command.route().is_read()); - assert_search_path_route(command.route(), 0, "shard_0"); -} - #[test] fn test_search_path_shard_1_routes_select() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["public", "shard_1", "$user"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["public".into(), "shard_1".into(), "$user".into()]), + ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -70,8 +34,10 @@ fn test_search_path_shard_1_routes_select() { #[test] fn test_search_path_shard_0_routes_insert() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["pg_catalog", "shard_0"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["pg_catalog".into(), "shard_0".into()]), + ); let command = test.execute(vec![Query::new( "INSERT INTO users (id, name) VALUES (1, 'test')", @@ -82,23 +48,16 @@ fn test_search_path_shard_0_routes_insert() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } -#[test] -fn test_search_path_shard_0_routes_insert_without_shard_key() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["pg_catalog", "shard_0"])); - - let command = test.execute(vec![ - Query::new("INSERT INTO users (name) VALUES ('test')").into() - ]); - - assert_search_path_route(command.route(), 0, "shard_0"); -} - #[test] fn test_search_path_shard_1_routes_insert() { let mut test = QueryParserTest::new().with_param( "search_path", - tuple(&["$user", "public", "pg_temp", "shard_1"]), + ParameterValue::Tuple(vec![ + "$user".into(), + "public".into(), + "pg_temp".into(), + "shard_1".into(), + ]), ); let command = test.execute(vec![Query::new( @@ -112,8 +71,10 @@ fn test_search_path_shard_1_routes_insert() { #[test] fn test_search_path_shard_0_routes_update() { - let mut test = QueryParserTest::new() - .with_param("search_path", tuple(&["shard_0", "pg_catalog", "public"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["shard_0".into(), "pg_catalog".into(), "public".into()]), + ); let command = test.execute(vec![Query::new( "UPDATE users SET name = 'updated' WHERE id = 1", @@ -124,21 +85,12 @@ fn test_search_path_shard_0_routes_update() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } -#[test] -fn test_search_path_shard_0_routes_update_without_shard_key() { - let mut test = QueryParserTest::new() - .with_expanded_explain() - .with_param("search_path", tuple(&["shard_0", "pg_catalog", "public"])); - - let command = test.execute(vec![Query::new("UPDATE users SET name = 'updated'").into()]); - - assert_search_path_route(command.route(), 0, "shard_0"); -} - #[test] fn test_search_path_shard_1_routes_update() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["information_schema", "shard_1"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["information_schema".into(), "shard_1".into()]), + ); let command = test.execute(vec![Query::new( "UPDATE users SET name = 'updated' WHERE id = 1", @@ -151,8 +103,10 @@ fn test_search_path_shard_1_routes_update() { #[test] fn test_search_path_shard_0_routes_delete() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["public", "$user", "shard_0"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["public".into(), "$user".into(), "shard_0".into()]), + ); let command = test.execute(vec![Query::new("DELETE FROM users WHERE id = 1").into()]); @@ -160,20 +114,12 @@ fn test_search_path_shard_0_routes_delete() { assert_eq!(command.route().shard(), &Shard::Direct(0)); } -#[test] -fn test_search_path_shard_0_routes_delete_without_shard_key() { - let mut test = QueryParserTest::new() - .with_expanded_explain() - .with_param("search_path", tuple(&["public", "$user", "shard_0"])); - - let command = test.execute(vec![Query::new("DELETE FROM users").into()]); - - assert_search_path_route(command.route(), 0, "shard_0"); -} - #[test] fn test_search_path_shard_1_routes_delete() { - let mut test = QueryParserTest::new().with_param("search_path", tuple(&["shard_1", "public"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["shard_1".into(), "public".into()]), + ); let command = test.execute(vec![Query::new("DELETE FROM users WHERE id = 1").into()]); @@ -187,7 +133,12 @@ fn test_search_path_shard_1_routes_delete() { fn test_search_path_first_shard_wins_shard_0() { let mut test = QueryParserTest::new().with_param( "search_path", - tuple(&["public", "shard_0", "shard_1", "$user"]), + ParameterValue::Tuple(vec![ + "public".into(), + "shard_0".into(), + "shard_1".into(), + "$user".into(), + ]), ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -200,7 +151,12 @@ fn test_search_path_first_shard_wins_shard_0() { fn test_search_path_first_shard_wins_shard_1() { let mut test = QueryParserTest::new().with_param( "search_path", - tuple(&["$user", "pg_catalog", "shard_1", "shard_0"]), + ParameterValue::Tuple(vec![ + "$user".into(), + "pg_catalog".into(), + "shard_1".into(), + "shard_0".into(), + ]), ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -213,12 +169,12 @@ fn test_search_path_first_shard_wins_shard_1() { fn test_search_path_shard_at_end_still_matches() { let mut test = QueryParserTest::new().with_param( "search_path", - tuple(&[ - "$user", - "public", - "pg_catalog", - "information_schema", - "shard_1", + ParameterValue::Tuple(vec![ + "$user".into(), + "public".into(), + "pg_catalog".into(), + "information_schema".into(), + "shard_1".into(), ]), ); @@ -232,8 +188,10 @@ fn test_search_path_shard_at_end_still_matches() { #[test] fn test_search_path_no_sharded_schema_routes_to_rr() { - let mut test = - QueryParserTest::new().with_param("search_path", tuple(&["$user", "public", "pg_catalog"])); + let mut test = QueryParserTest::new().with_param( + "search_path", + ParameterValue::Tuple(vec!["$user".into(), "public".into(), "pg_catalog".into()]), + ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]); @@ -244,72 +202,15 @@ fn test_search_path_no_sharded_schema_routes_to_rr() { ); } -#[test] -fn test_no_search_path_ambiguous_insert_routes_to_all() { - let mut test = QueryParserTest::new(); - - let command = test.execute(vec![Query::new("INSERT INTO orders DEFAULT VALUES").into()]); - - assert_sharded_broadcast(&command); -} - -#[test] -fn test_sharded_update_without_shard_key_routes_to_all() { - let mut test = QueryParserTest::new().with_expanded_explain(); - - let command = test.execute(vec![ - Query::new("UPDATE sharded SET email = 'updated'").into() - ]); - - assert_sharded_broadcast(&command); -} - -#[test] -fn test_sharded_delete_without_shard_key_routes_to_all() { - let mut test = QueryParserTest::new().with_expanded_explain(); - - let command = test.execute(vec![Query::new("DELETE FROM sharded").into()]); - - assert_sharded_broadcast(&command); -} - -#[test] -fn test_public_schema_insert_routes_to_omni() { - let mut test = QueryParserTest::new(); - - let command = test.execute(vec![Query::new( - "INSERT INTO public.users (name) VALUES ('test')", - ) - .into()]); - - assert_omni_write(&command, ShardSource::Table(TableReason::Omni)); -} - -#[test] -fn test_public_schema_update_routes_to_omni() { - let mut test = QueryParserTest::new().with_expanded_explain(); - - let command = test.execute(vec![ - Query::new("UPDATE public.users SET name = 'updated'").into() - ]); - - assert_omni_write(&command, ShardSource::Table(TableReason::Omni)); -} - -#[test] -fn test_public_schema_delete_routes_to_omni() { - let mut test = QueryParserTest::new().with_expanded_explain(); - - let command = test.execute(vec![Query::new("DELETE FROM public.users").into()]); - - assert_omni_write(&command, ShardSource::RoundRobin(RoundRobinReason::Omni)); -} - #[test] fn test_search_path_only_system_schemas_routes_to_rr() { let mut test = QueryParserTest::new().with_param( "search_path", - tuple(&["pg_catalog", "information_schema", "pg_temp"]), + ParameterValue::Tuple(vec![ + "pg_catalog".into(), + "information_schema".into(), + "pg_temp".into(), + ]), ); let command = test.execute(vec![Query::new("SELECT * FROM users WHERE id = 1").into()]);