diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 95d9a41b5..358fcf72e 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -97,6 +97,7 @@ "resharding_parallel_copies": 1, "rollback_timeout": 5000, "server_lifetime": 86400000, + "server_lifetime_jitter": 0, "shutdown_termination_timeout": null, "shutdown_timeout": 60000, "stats_period": 15000, @@ -476,6 +477,15 @@ "format": "uint64", "minimum": 0 }, + "server_lifetime_jitter": { + "description": "Overrides the `server_lifetime_jitter` setting for this database.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/databases/#server_lifetime_jitter", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0 + }, "shard": { "description": "The shard number for this database. Only required if your database contains more than one shard. Shard numbers start at 0.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/databases/#shard", "type": "integer", @@ -1016,6 +1026,13 @@ "default": 86400000, "minimum": 0 }, + "server_lifetime_jitter": { + "description": "Maximum random adjustment applied to `server_lifetime` per backend\nconnection, in milliseconds. Each connection's effective lifetime\nis sampled uniformly from `[server_lifetime - jitter,\nserver_lifetime + jitter]` once at creation time, breaking up\nsynchronized cohorts that would otherwise expire together.\n\n_Default:_ `0` (no jitter; existing behavior).\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#server_lifetime_jitter", + "type": "integer", + "format": "uint64", + "default": 0, + "minimum": 0 + }, "shutdown_termination_timeout": { "description": "How long to wait for active connections to be forcibly terminated after `shutdown_timeout` expires.\n\n**Note:** If set, PgDog will send `CANCEL` requests to PostgreSQL for any remaining active queries before tearing down connection pools.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#shutdown_termination_timeout", "type": [ diff --git a/.schema/users.schema.json b/.schema/users.schema.json index a7e897f6d..b5dbb7104 100644 --- a/.schema/users.schema.json +++ b/.schema/users.schema.json @@ -226,6 +226,15 @@ "format": "uint64", "minimum": 0 }, + "server_lifetime_jitter": { + "description": "Maximum random adjustment applied to `server_lifetime` per backend connection (milliseconds).\nOverrides the database-level and general-level `server_lifetime_jitter` setting for this user.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0 + }, "server_password": { "description": "Which password to connect with when creating backend connections from PgDog to PostgreSQL. By default, the password configured in `password` is used. This setting allows you to override this configuration and use a different password, decoupling server passwords from user passwords given to clients.\n\n**Note:** Values specified in `pgdog.toml` take priority over this configuration.\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#server_password", "type": [ diff --git a/pgdog-config/src/database.rs b/pgdog-config/src/database.rs index a02a75d42..fe8435e28 100644 --- a/pgdog-config/src/database.rs +++ b/pgdog-config/src/database.rs @@ -185,6 +185,10 @@ pub struct Database { /// /// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#server_lifetime pub server_lifetime: Option, + /// Overrides the `server_lifetime_jitter` setting for this database. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#server_lifetime_jitter + pub server_lifetime_jitter: Option, /// Used for resharding only; this database will not serve regular traffic. #[serde(default)] pub resharding_only: bool, diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index 8b2236e48..c3a929a0d 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -406,6 +406,18 @@ pub struct General { #[serde(default = "General::server_lifetime")] pub server_lifetime: u64, + /// Maximum random adjustment applied to `server_lifetime` per backend + /// connection, in milliseconds. Each connection's effective lifetime + /// is sampled uniformly from `[server_lifetime - jitter, + /// server_lifetime + jitter]` once at creation time, breaking up + /// synchronized cohorts that would otherwise expire together. + /// + /// _Default:_ `0` (no jitter; existing behavior). + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#server_lifetime_jitter + #[serde(default = "General::server_lifetime_jitter")] + pub server_lifetime_jitter: u64, + /// How many transactions can wait while the mirror database processes previous requests. /// /// _Default:_ `128` @@ -787,6 +799,7 @@ impl Default for General { Self::two_phase_commit_wal_checkpoint_interval(), expanded_explain: Self::expanded_explain(), server_lifetime: Self::server_lifetime(), + server_lifetime_jitter: Self::server_lifetime_jitter(), stats_period: Self::stats_period(), connection_recovery: Self::connection_recovery(), client_connection_recovery: Self::client_connection_recovery(), @@ -1209,6 +1222,10 @@ impl General { ) } + pub fn server_lifetime_jitter() -> u64 { + Self::env_or_default("PGDOG_SERVER_LIFETIME_JITTER", 0) + } + pub fn connection_recovery() -> ConnectionRecovery { Self::env_enum_or_default("PGDOG_CONNECTION_RECOVERY") } diff --git a/pgdog-config/src/url.rs b/pgdog-config/src/url.rs index 6d746fccb..1a2ab8b4d 100644 --- a/pgdog-config/src/url.rs +++ b/pgdog-config/src/url.rs @@ -78,6 +78,11 @@ impl From<&Url> for Database { database.server_lifetime = Some(lifetime); } } + "server_lifetime_jitter" => { + if let Ok(jitter) = val.parse::() { + database.server_lifetime_jitter = Some(jitter); + } + } _ => {} } } @@ -183,7 +188,7 @@ mod test { #[test] fn test_numeric_fields_from_query_params() { - let url = Url::parse("postgres://user:password@host:5432/name?pool_size=10&min_pool_size=2&statement_timeout=5000&idle_timeout=300&server_lifetime=3600").unwrap(); + let url = Url::parse("postgres://user:password@host:5432/name?pool_size=10&min_pool_size=2&statement_timeout=5000&idle_timeout=300&server_lifetime=3600&server_lifetime_jitter=600").unwrap(); let database = Database::from(&url); assert_eq!(database.pool_size, Some(10)); @@ -191,6 +196,7 @@ mod test { assert_eq!(database.statement_timeout, Some(5000)); assert_eq!(database.idle_timeout, Some(300)); assert_eq!(database.server_lifetime, Some(3600)); + assert_eq!(database.server_lifetime_jitter, Some(600)); } #[test] diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index f855d5535..750e149ee 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -284,6 +284,9 @@ pub struct User { pub two_phase_commit_auto: Option, /// Server connections older than this (in milliseconds) will be closed when returned to the pool. pub server_lifetime: Option, + /// Maximum random adjustment applied to `server_lifetime` per backend connection (milliseconds). + /// Overrides the database-level and general-level `server_lifetime_jitter` setting for this user. + pub server_lifetime_jitter: Option, } impl User { diff --git a/pgdog-stats/src/pool.rs b/pgdog-stats/src/pool.rs index 0271b9e90..28b4c8be5 100644 --- a/pgdog-stats/src/pool.rs +++ b/pgdog-stats/src/pool.rs @@ -288,6 +288,11 @@ pub struct Config { pub connect_attempt_delay: Duration, /// How long a connection can be open. pub max_age: Duration, + /// Maximum random adjustment applied to `max_age` per connection. + /// Each connection samples a per-connection offset uniformly from + /// `[-max_age_jitter, +max_age_jitter]` once at creation, breaking + /// up synchronized retirement of cohorts that connect together. + pub max_age_jitter: Duration, /// Can this pool be banned from serving traffic? pub bannable: bool, /// Healtheck timeout. @@ -349,6 +354,7 @@ impl Default for Config { connect_attempts: 1, connect_attempt_delay: Duration::from_millis(10), max_age: Duration::from_millis(24 * 3600 * 1000), + max_age_jitter: Duration::ZERO, bannable: true, healthcheck_timeout: Duration::from_millis(5_000), healthcheck_interval: Duration::from_millis(30_000), diff --git a/pgdog/src/backend/pool/config.rs b/pgdog/src/backend/pool/config.rs index ac198f2bf..fdcfa3110 100644 --- a/pgdog/src/backend/pool/config.rs +++ b/pgdog/src/backend/pool/config.rs @@ -109,6 +109,13 @@ impl Config { user.server_lifetime .unwrap_or(database.server_lifetime.unwrap_or(general.server_lifetime)), ), + max_age_jitter: Duration::from_millis( + user.server_lifetime_jitter.unwrap_or( + database + .server_lifetime_jitter + .unwrap_or(general.server_lifetime_jitter), + ), + ), healthcheck_interval: Duration::from_millis(general.healthcheck_interval), idle_healthcheck_interval: Duration::from_millis(general.idle_healthcheck_interval), idle_healthcheck_delay: Duration::from_millis(general.idle_healthcheck_delay), @@ -185,6 +192,7 @@ mod test { pool_size: Some(5), min_pool_size: Some(5), server_lifetime: Some(5), + server_lifetime_jitter: Some(1), statement_timeout: Some(5), pooler_mode: Some(PoolerMode::Session), idle_timeout: Some(5), @@ -196,6 +204,7 @@ mod test { pool_size: Some(10), min_pool_size: Some(10), server_lifetime: Some(10), + server_lifetime_jitter: Some(2), statement_timeout: Some(10), pooler_mode: Some(PoolerMode::Transaction), idle_timeout: Some(10), @@ -208,12 +217,48 @@ mod test { assert_eq!(5, config.max); assert_eq!(5, config.min); assert_eq!(Duration::from_millis(5), config.max_age); + assert_eq!(Duration::from_millis(1), config.max_age_jitter); assert_eq!(Some(Duration::from_millis(5)), config.statement_timeout); assert_eq!(PoolerMode::Session, config.pooler_mode); assert_eq!(Duration::from_millis(5), config.idle_timeout); assert!(config.read_only); } + #[test] + fn test_jitter_falls_through_general_to_database_to_user() { + let general = General { + server_lifetime_jitter: 100, + ..General::default() + }; + + // Only general set: pool inherits the general value. + let cfg = Config::new(&general, &Database::default(), &User::default(), false); + assert_eq!(Duration::from_millis(100), cfg.max_age_jitter); + + // Database overrides general. + let database = Database { + server_lifetime_jitter: Some(200), + ..Default::default() + }; + let cfg = Config::new(&general, &database, &User::default(), false); + assert_eq!(Duration::from_millis(200), cfg.max_age_jitter); + + // User overrides both. + let user = User { + server_lifetime_jitter: Some(300), + ..Default::default() + }; + let cfg = Config::new(&general, &database, &user, false); + assert_eq!(Duration::from_millis(300), cfg.max_age_jitter); + } + + #[test] + fn test_jitter_default_is_zero() { + let general = General::default(); + let cfg = Config::new(&general, &Database::default(), &User::default(), false); + assert_eq!(Duration::ZERO, cfg.max_age_jitter); + } + #[test] fn test_role_primary_disables_role_detection() { let general = General::default(); diff --git a/pgdog/src/backend/pool/inner.rs b/pgdog/src/backend/pool/inner.rs index d3ec588c0..7e822f530 100644 --- a/pgdog/src/backend/pool/inner.rs +++ b/pgdog/src/backend/pool/inner.rs @@ -182,12 +182,12 @@ impl Inner { /// Close connections that have exceeded the max age. #[inline] pub(crate) fn close_old(&mut self, now: Instant) -> usize { - let max_age = self.config.max_age; + let base_max_age = self.config.max_age; let mut removed = 0; self.idle_connections.retain_mut(|c| { let age = c.age(now); - let keep = age < max_age; + let keep = age < c.effective_max_age(base_max_age); if !keep { removed += 1; } @@ -354,7 +354,7 @@ impl Inner { } // Close connections exceeding max age. - if server.age(now) >= self.config.max_age { + if server.age(now) >= server.effective_max_age(self.config.max_age) { server.disconnect_reason(DisconnectReason::Old); return Ok(result); } diff --git a/pgdog/src/backend/pool/monitor.rs b/pgdog/src/backend/pool/monitor.rs index 13fb807f4..284c22dfb 100644 --- a/pgdog/src/backend/pool/monitor.rs +++ b/pgdog/src/backend/pool/monitor.rs @@ -337,6 +337,9 @@ impl Monitor { let mut error = Error::ServerError; let now = Instant::now(); + let max_age = pool.config().max_age; + let max_age_jitter = pool.config().max_age_jitter; + for attempt in 0..connect_attempts { match timeout( connect_timeout, @@ -344,7 +347,7 @@ impl Monitor { ) .await { - Ok(Ok(conn)) => { + Ok(Ok(mut conn)) => { let elapsed = now.elapsed(); { let mut guard = pool.lock(); @@ -352,6 +355,7 @@ impl Monitor { guard.stats.counts.connect_time += elapsed; guard.stats.counts.auth_attempts += conn.password_attempts(); } + conn.apply_lifetime_jitter(max_age, max_age_jitter); return Ok(conn); } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 396c3da3c..02372cd18 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -65,6 +65,11 @@ pub struct Server { stream_buffer: MessageBuffer, disconnect_reason: Option, password_attempts: usize, + /// Per-connection lifetime cap. When `Some`, the connection + /// retires once its age reaches the stored value. `None` means + /// "use the pool's configured `max_age`" (no jitter applied). + /// Sampled once at creation by [`Server::apply_lifetime_jitter`]. + max_age: Option, } impl MemoryUsage for Server { @@ -342,6 +347,7 @@ impl Server { stream_buffer: MessageBuffer::new(config.config.memory.message_buffer), disconnect_reason: None, password_attempts: 1, // This is going to be changed by parent caller. + max_age: None, }; server.stats.memory_used(server.memory_stats()); // Stream capacity. @@ -1006,6 +1012,43 @@ impl Server { instant.duration_since(self.stats().created_at()) } + /// Set this connection's lifetime cap directly. Bypasses jitter + /// sampling; mainly useful in tests. + #[inline] + pub fn set_max_age(&mut self, max_age: Duration) { + self.max_age = Some(max_age); + } + + /// Sample and apply a per-connection `max_age` uniformly from + /// `[base - jitter, base + jitter]`, breaking up synchronized + /// retirement of connection cohorts. Saturates at zero on + /// negative overflow. No-op when `jitter` is zero. Called once + /// by the pool after a successful connect. + #[inline] + pub fn apply_lifetime_jitter(&mut self, base: Duration, jitter: Duration) { + if jitter.is_zero() { + return; + } + use rand::Rng; + // Sampling is signed, so drop into ms locally; result goes back + // out as a Duration immediately, no leak across the API. + let jitter_ms = jitter.as_millis() as i64; + let offset_ms = rand::rng().random_range(-jitter_ms..=jitter_ms); + let offset = Duration::from_millis(offset_ms.unsigned_abs()); + self.max_age = Some(if offset_ms >= 0 { + base.saturating_add(offset) + } else { + base.saturating_sub(offset) + }); + } + + /// Effective max_age for this connection: the per-connection + /// jittered value if one was sampled, otherwise `base`. + #[inline] + pub fn effective_max_age(&self, base: Duration) -> Duration { + self.max_age.unwrap_or(base) + } + /// How long this connection has been idle. #[inline] pub fn idle_for(&self, instant: Instant) -> Duration { @@ -1188,6 +1231,7 @@ pub mod test { statement_executed: false, sending_request: false, password_attempts: 1, + max_age: None, } } } @@ -3950,4 +3994,78 @@ pub mod test { "Postgres should have no prepared statements despite many named parses" ); } + + #[test] + fn test_effective_max_age_default_is_base() { + let server = Server::default(); + let base = Duration::from_secs(60); + assert_eq!(base, server.effective_max_age(base)); + } + + #[test] + fn test_effective_max_age_uses_stored_value() { + let mut server = Server::default(); + server.set_max_age(Duration::from_millis(1500)); + let base = Duration::from_secs(60); + // Stored value wins regardless of the supplied base. + assert_eq!(Duration::from_millis(1500), server.effective_max_age(base)); + } + + #[test] + fn test_apply_lifetime_jitter_zero_is_noop() { + let mut server = Server::default(); + server.apply_lifetime_jitter(Duration::from_secs(60), Duration::ZERO); + assert_eq!(None, server.max_age); + } + + #[test] + fn test_apply_lifetime_jitter_stays_within_bounds() { + let base = Duration::from_secs(60); + let jitter = Duration::from_secs(10); + let lower = base - jitter; + let upper = base + jitter; + let mut saw_below_base = false; + let mut saw_above_base = false; + // Many trials so we exercise both signs and the full range. + for _ in 0..1_000 { + let mut server = Server::default(); + server.apply_lifetime_jitter(base, jitter); + let sampled = server + .max_age + .expect("apply_lifetime_jitter with non-zero jitter sets max_age"); + assert!( + sampled >= lower && sampled <= upper, + "sampled {sampled:?} outside [{lower:?}, {upper:?}]", + ); + if sampled < base { + saw_below_base = true; + } + if sampled > base { + saw_above_base = true; + } + } + // 1000 uniform samples should cover both sides. + assert!(saw_below_base, "never sampled below base"); + assert!(saw_above_base, "never sampled above base"); + } + + #[test] + fn test_apply_lifetime_jitter_saturates_at_zero() { + // base < jitter forces the lower bound to clamp at zero + // rather than wrap. Run enough trials that we very likely + // hit the negative half of the range at least once. + let base = Duration::from_millis(10); + let jitter = Duration::from_millis(100); + for _ in 0..1_000 { + let mut server = Server::default(); + server.apply_lifetime_jitter(base, jitter); + let sampled = server.max_age.expect("max_age set"); + assert!( + sampled <= base + jitter, + "sampled {sampled:?} above upper bound", + ); + // Must never go below zero (Duration is unsigned anyway, + // but saturation must not panic). + } + } }