From 4c76f84d921a9189117dba9c6f00d04b4b004785 Mon Sep 17 00:00:00 2001 From: Giuseppe D'Anna Date: Wed, 10 Jun 2026 15:25:37 +0200 Subject: [PATCH] wip --- .schema/pgdog.schema.json | 87 ++++++ .schema/users.schema.json | 22 ++ README.md | 38 ++- example.pgdog.toml | 17 ++ example.users.toml | 8 + pgdog-config/src/core.rs | 8 +- pgdog-config/src/lib.rs | 2 + pgdog-config/src/users.rs | 77 +++++- pgdog-config/src/vault.rs | 130 +++++++++ pgdog/Cargo.toml | 2 +- .../backend/auth/azure_workload_identity.rs | 2 + pgdog/src/backend/auth/mod.rs | 1 + pgdog/src/backend/auth/rds_iam.rs | 2 + pgdog/src/backend/auth/vault.rs | 255 ++++++++++++++++++ pgdog/src/backend/error.rs | 3 + pgdog/src/backend/pool/address.rs | 107 +++++++- pgdog/src/backend/pool/monitor.rs | 22 +- pgdog/src/backend/pool/password.rs | 2 + pgdog/src/backend/pool/token_cache.rs | 214 ++++++++++++++- pgdog/src/backend/server.rs | 17 +- 20 files changed, 989 insertions(+), 27 deletions(-) create mode 100644 pgdog-config/src/vault.rs create mode 100644 pgdog/src/backend/auth/vault.rs diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 2a715e8d3..99f26a3e5 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -251,6 +251,17 @@ "time": null, "user_timeout": null } + }, + "vault": { + "description": "HashiCorp Vault settings, required for users configured with `server_auth = \"vault\"`.", + "anyOf": [ + { + "$ref": "#/$defs/Vault" + }, + { + "type": "null" + } + ] } }, "additionalProperties": false, @@ -2069,6 +2080,82 @@ } ] }, + "Vault": { + "description": "HashiCorp Vault settings, used by pools configured with `server_auth = \"vault\"`.\n\nPgDog logs into Vault using the configured auth method and fetches\ndynamic database credentials from the per-user `vault_path`.", + "type": "object", + "properties": { + "approle_role_id": { + "description": "AppRole auth: role ID.", + "type": [ + "string", + "null" + ] + }, + "approle_secret_id_file": { + "description": "AppRole auth: path to a file containing the secret ID.\n\n**Note:** If not set, the secret ID is read from the `VAULT_SECRET_ID`\nenvironment variable.", + "type": [ + "string", + "null" + ] + }, + "auth_method": { + "description": "Auth method used to log into Vault.", + "$ref": "#/$defs/VaultAuthMethod" + }, + "auth_mount": { + "description": "Mount path of the auth method.\n\n_Default:_ `kubernetes` for Kubernetes auth, `approle` for AppRole auth.", + "type": [ + "string", + "null" + ] + }, + "kubernetes_jwt_path": { + "description": "Kubernetes auth: path to the service account JWT.\n\n_Default:_ `/var/run/secrets/kubernetes.io/serviceaccount/token`", + "type": [ + "string", + "null" + ] + }, + "kubernetes_role": { + "description": "Kubernetes auth: name of the Vault role to log in as.", + "type": [ + "string", + "null" + ] + }, + "namespace": { + "description": "Vault namespace (Vault Enterprise). Sent as the `X-Vault-Namespace` header.", + "type": [ + "string", + "null" + ] + }, + "url": { + "description": "Vault server URL, e.g. `https://vault.internal:8200`.", + "type": "string" + } + }, + "additionalProperties": false, + "required": [ + "url", + "auth_method" + ] + }, + "VaultAuthMethod": { + "description": "How PgDog authenticates to Vault.", + "oneOf": [ + { + "description": "Kubernetes auth: log in with the pod's service account JWT.", + "type": "string", + "const": "kubernetes" + }, + { + "description": "AppRole auth: log in with a role ID and secret ID.", + "type": "string", + "const": "approle" + } + ] + }, "Vector": { "type": "object", "properties": { diff --git a/.schema/users.schema.json b/.schema/users.schema.json index 4f64800ee..48c4b4b57 100644 --- a/.schema/users.schema.json +++ b/.schema/users.schema.json @@ -85,6 +85,11 @@ "description": "Generate an Azure Workload Identity auth token per connection attempt.", "type": "string", "const": "azure_workload_identity" + }, + { + "description": "Fetch dynamic credentials from HashiCorp Vault.", + "type": "string", + "const": "vault" } ] }, @@ -287,6 +292,23 @@ "boolean", "null" ] + }, + "vault_path": { + "description": "Vault path to fetch dynamic database credentials from, e.g. `database/creds/my-role`.\nRequired when `server_auth` is set to `vault`.", + "type": [ + "string", + "null" + ] + }, + "vault_refresh_percent": { + "description": "Percentage of the Vault credential lease after which credentials are refreshed.\n\n_Default:_ `80`", + "type": [ + "integer", + "null" + ], + "format": "uint8", + "maximum": 255, + "minimum": 0 } }, "additionalProperties": false, diff --git a/README.md b/README.md index dd8ab8ac0..ad535ba62 100644 --- a/README.md +++ b/README.md @@ -198,11 +198,12 @@ role = "auto" 📘 **[Authentication](https://docs.pgdog.dev/features/authentication/)** -PgDog supports three authentication methods: +PgDog supports four authentication methods: 1. Password-based 2. AWS RDS IAM 3. Azure Workload Identity +4. HashiCorp Vault dynamic credentials #### Password-based authentication @@ -252,6 +253,41 @@ When any user has `server_auth = "azure_workload_identity"`, the following setti - `tls_verify` must **not** be `"disabled"`. - `passthrough_auth` must be `"disabled"`. +#### HashiCorp Vault authentication + +PgDog can fetch dynamic database credentials (username and password) from HashiCorp Vault's database secrets engine, while keeping client-to-PgDog authentication unchanged. Credentials are cached and rotated automatically after a configured percentage of the Vault lease has elapsed. + +**Example** + +In `users.toml`: + +```toml +[[users]] +name = "alice" +database = "pgdog" +password = "client-password" +server_auth = "vault" +vault_path = "database/creds/pgdog" +# Refresh credentials after 80% of the lease has elapsed (default). +# vault_refresh_percent = 80 +``` + +In `pgdog.toml`: + +```toml +[vault] +url = "https://vault.internal:8200" +auth_method = "kubernetes" # or "approle" +kubernetes_role = "pgdog" +``` + +PgDog logs into Vault with Kubernetes auth (using the pod's service account JWT) or AppRole (`approle_role_id` plus `approle_secret_id_file` or the `VAULT_SECRET_ID` environment variable). + +When any user has `server_auth = "vault"`, the following settings must be configured as well: + +- `tls_verify` must **not** be `"disabled"`. +- `passthrough_auth` must be `"disabled"`. + ### Sharding 📘 **[Sharding](https://docs.pgdog.dev/features/sharding/)** diff --git a/example.pgdog.toml b/example.pgdog.toml index f9ddd22af..5ebbcd6f4 100644 --- a/example.pgdog.toml +++ b/example.pgdog.toml @@ -474,3 +474,20 @@ fingerprint = "2d9944fc9caeaadd" # [3285733254894627549] # destination_db = "pgdog_mirror" # queue_length = 256 # Optional: overrides general.mirror_queue # exposure = 0.5 # Optional: overrides general.mirror_exposure + +# HashiCorp Vault settings, required when any user in users.toml +# sets `server_auth = "vault"`. +# +# [vault] +# url = "https://vault.internal:8200" +# Auth method used to log into Vault: "kubernetes" or "approle". +# auth_method = "kubernetes" +# auth_mount = "kubernetes" # optional; defaults to the auth method name +# kubernetes_role = "pgdog" +# kubernetes_jwt_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" # optional +# For AppRole auth: +# auth_method = "approle" +# approle_role_id = "..." +# approle_secret_id_file = "/etc/pgdog/vault-secret-id" # or set VAULT_SECRET_ID env var +# Vault namespace (Vault Enterprise), optional. +# namespace = "my-namespace" diff --git a/example.users.toml b/example.users.toml index 8bb78cfa5..6fb74814f 100644 --- a/example.users.toml +++ b/example.users.toml @@ -22,3 +22,11 @@ password = "pgdog" # PgDog still authenticates the client as configured by `general.auth_type`; # this only affects how PgDog authenticates to PostgreSQL servers. # server_auth = "azure_workload_identiy" + +# Example: backend authentication with HashiCorp Vault dynamic credentials. +# Requires the [vault] section in pgdog.toml. PgDog fetches a generated +# username and password from `vault_path` and rotates them after +# `vault_refresh_percent` of the lease has elapsed. +# server_auth = "vault" +# vault_path = "database/creds/pgdog" +# vault_refresh_percent = 80 # optional; default 80 diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 536ee522f..a7e6985cd 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -23,6 +23,7 @@ use super::replication::{MirrorConfig, Mirroring, MirroringLevel, ReplicaLag, Re use super::rewrite::Rewrite; use super::sharding::{ManualQuery, OmnishardedTables, ShardedMapping, ShardedTable}; use super::users::{Admin, Plugin, Users}; +use super::vault::Vault; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct ConfigAndUsers { @@ -124,13 +125,13 @@ impl ConfigAndUsers { if self.config.general.passthrough_auth != PassthroughAuth::Disabled { return Err(Error::ParseError( - "\"passthrough_auth\" must be \"disabled\" when any user has \"server_auth = \\\"rds_iam\\\"\" or \"server_auth = \\\"azure_workload_identity\\\"\"".into(), + "\"passthrough_auth\" must be \"disabled\" when any user has \"server_auth = \\\"rds_iam\\\"\", \"server_auth = \\\"azure_workload_identity\\\"\" or \"server_auth = \\\"vault\\\"\"".into(), )); } if self.config.general.tls_verify == TlsVerifyMode::Disabled { return Err(Error::ParseError( - "\"tls_verify\" cannot be \"disabled\" when any user has \"server_auth = \\\"rds_iam\\\"\" or \"server_auth = \\\"azure_workload_identity\\\"\"".into(), + "\"tls_verify\" cannot be \"disabled\" when any user has \"server_auth = \\\"rds_iam\\\"\", \"server_auth = \\\"azure_workload_identity\\\"\" or \"server_auth = \\\"vault\\\"\"".into(), )); } @@ -275,6 +276,9 @@ pub struct Config { /// https://docs.pgdog.dev/configuration/pgdog.toml/otel/ #[serde(default)] pub otel: Otel, + + /// HashiCorp Vault settings, required for users configured with `server_auth = "vault"`. + pub vault: Option, } impl Config { diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 401978868..ebfd83988 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -20,6 +20,7 @@ pub(crate) mod test_utils; pub mod url; pub mod users; pub mod util; +pub mod vault; pub use auth::{AuthType, PassthroughAuth}; pub use core::{Config, ConfigAndUsers}; @@ -39,6 +40,7 @@ pub use rewrite::{Rewrite, RewriteMode}; pub use sharding::*; pub use system_catalogs::system_catalogs; pub use users::{Admin, Plugin, ServerAuth, User, Users}; +pub use vault::{Vault, VaultAuthMethod}; use std::time::Duration; diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index 7c8a87b27..6d32af4cb 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -88,6 +88,32 @@ impl Users { ); } + if user.server_auth == ServerAuth::Vault { + if user.vault_path.is_none() { + warn!( + r#"user "{}" (database "{}") uses Vault server auth but "vault_path" is not set"#, + user.name, user.database + ); + } + + if config.vault.is_none() { + warn!( + r#"user "{}" (database "{}") uses Vault server auth but the [vault] section is missing from pgdog.toml"#, + user.name, user.database + ); + } + + if let Some(percent) = user.vault_refresh_percent + && (percent == 0 || percent > 100) + { + warn!( + r#"user "{}" (database "{}") has "vault_refresh_percent" of {}, expected 1-100, using default"#, + user.name, user.database, percent + ); + user.vault_refresh_percent = None; + } + } + if !user.database.is_empty() && !user.databases.is_empty() { warn!( r#"user "{}" is configured for both "{}" and "{:?}", defaulting to "{}""#, @@ -153,11 +179,16 @@ pub enum ServerAuth { RdsIam, /// Generate an Azure Workload Identity auth token per connection attempt. AzureWorkloadIdentity, + /// Fetch dynamic credentials from HashiCorp Vault. + Vault, } impl ServerAuth { pub fn is_external_identity(&self) -> bool { - matches!(self, Self::RdsIam | Self::AzureWorkloadIdentity) + matches!( + self, + Self::RdsIam | Self::AzureWorkloadIdentity | Self::Vault + ) } } @@ -251,6 +282,13 @@ pub struct User { pub server_auth: ServerAuth, /// Optional region override for RDS IAM token generation. pub server_iam_region: Option, + /// Vault path to fetch dynamic database credentials from, e.g. `database/creds/my-role`. + /// Required when `server_auth` is set to `vault`. + pub vault_path: Option, + /// Percentage of the Vault credential lease after which credentials are refreshed. + /// + /// _Default:_ `80` + pub vault_refresh_percent: Option, /// Statement timeout. /// /// Sets the `statement_timeout` on all server connections at connection creation. This allows you to set a reasonable default for each user without modifying `postgresql.conf` or using `ALTER USER`. @@ -631,4 +669,41 @@ server_auth = "azure_workload_identity" let user = users.users.first().unwrap(); assert_eq!(user.server_auth, ServerAuth::AzureWorkloadIdentity); } + + #[test] + fn test_user_server_auth_vault() { + let source = r#" +[[users]] +name = "alice" +database = "db" +server_auth = "vault" +vault_path = "database/creds/pgdog" +vault_refresh_percent = 75 +"#; + + let users: Users = toml::from_str(source).unwrap(); + let user = users.users.first().unwrap(); + assert_eq!(user.server_auth, ServerAuth::Vault); + assert!(user.server_auth.is_external_identity()); + assert_eq!(user.vault_path.as_deref(), Some("database/creds/pgdog")); + assert_eq!(user.vault_refresh_percent, Some(75)); + } + + #[test] + fn test_vault_refresh_percent_out_of_range_resets_to_default() { + let mut users = Users { + users: vec![User { + name: "alice".into(), + database: "db".into(), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/pgdog".into()), + vault_refresh_percent: Some(150), + ..Default::default() + }], + ..Default::default() + }; + + users.check(&crate::Config::default()); + assert!(users.users.first().unwrap().vault_refresh_percent.is_none()); + } } diff --git a/pgdog-config/src/vault.rs b/pgdog-config/src/vault.rs new file mode 100644 index 000000000..94306c75a --- /dev/null +++ b/pgdog-config/src/vault.rs @@ -0,0 +1,130 @@ +//! HashiCorp Vault settings. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Default Kubernetes auth mount path. +pub const DEFAULT_KUBERNETES_MOUNT: &str = "kubernetes"; +/// Default AppRole auth mount path. +pub const DEFAULT_APPROLE_MOUNT: &str = "approle"; +/// Default Kubernetes service account JWT path. +pub const DEFAULT_KUBERNETES_JWT_PATH: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token"; +/// Default percentage of a credential lease after which it's refreshed. +pub const DEFAULT_REFRESH_PERCENT: u8 = 80; + +/// How PgDog authenticates to Vault. +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum VaultAuthMethod { + /// Kubernetes auth: log in with the pod's service account JWT. + Kubernetes, + /// AppRole auth: log in with a role ID and secret ID. + Approle, +} + +/// HashiCorp Vault settings, used by pools configured with `server_auth = "vault"`. +/// +/// PgDog logs into Vault using the configured auth method and fetches +/// dynamic database credentials from the per-user `vault_path`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Vault { + /// Vault server URL, e.g. `https://vault.internal:8200`. + pub url: String, + /// Vault namespace (Vault Enterprise). Sent as the `X-Vault-Namespace` header. + pub namespace: Option, + /// Auth method used to log into Vault. + pub auth_method: VaultAuthMethod, + /// Mount path of the auth method. + /// + /// _Default:_ `kubernetes` for Kubernetes auth, `approle` for AppRole auth. + pub auth_mount: Option, + /// Kubernetes auth: name of the Vault role to log in as. + pub kubernetes_role: Option, + /// Kubernetes auth: path to the service account JWT. + /// + /// _Default:_ `/var/run/secrets/kubernetes.io/serviceaccount/token` + pub kubernetes_jwt_path: Option, + /// AppRole auth: role ID. + pub approle_role_id: Option, + /// AppRole auth: path to a file containing the secret ID. + /// + /// **Note:** If not set, the secret ID is read from the `VAULT_SECRET_ID` + /// environment variable. + pub approle_secret_id_file: Option, +} + +impl Vault { + /// Mount path of the auth method, applying defaults. + pub fn auth_mount(&self) -> &str { + match self.auth_mount.as_deref() { + Some(mount) => mount, + None => match self.auth_method { + VaultAuthMethod::Kubernetes => DEFAULT_KUBERNETES_MOUNT, + VaultAuthMethod::Approle => DEFAULT_APPROLE_MOUNT, + }, + } + } + + /// Path to the service account JWT, applying the default. + pub fn kubernetes_jwt_path(&self) -> &str { + self.kubernetes_jwt_path + .as_deref() + .unwrap_or(DEFAULT_KUBERNETES_JWT_PATH) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kubernetes_defaults() { + let vault: Vault = toml::from_str( + r#" +url = "https://vault.internal:8200" +auth_method = "kubernetes" +kubernetes_role = "pgdog" +"#, + ) + .unwrap(); + + assert_eq!(vault.auth_mount(), "kubernetes"); + assert_eq!( + vault.kubernetes_jwt_path(), + "/var/run/secrets/kubernetes.io/serviceaccount/token" + ); + assert_eq!(vault.kubernetes_role.as_deref(), Some("pgdog")); + } + + #[test] + fn test_approle_defaults() { + let vault: Vault = toml::from_str( + r#" +url = "https://vault.internal:8200" +auth_method = "approle" +approle_role_id = "abc-123" +"#, + ) + .unwrap(); + + assert_eq!(vault.auth_mount(), "approle"); + assert_eq!(vault.approle_role_id.as_deref(), Some("abc-123")); + assert!(vault.approle_secret_id_file.is_none()); + } + + #[test] + fn test_mount_override() { + let vault: Vault = toml::from_str( + r#" +url = "https://vault.internal:8200" +auth_method = "kubernetes" +auth_mount = "k8s-prod" +kubernetes_role = "pgdog" +"#, + ) + .unwrap(); + + assert_eq!(vault.auth_mount(), "k8s-prod"); + } +} diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 27da196e3..6c8f821f7 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -78,7 +78,7 @@ azure_core = "0.34.0" crc32c = "0.6.8" bit-vec = "0.8" smallvec = "1" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider", "json"] } hex = "0.4" x509-parser = "0.18" diff --git a/pgdog/src/backend/auth/azure_workload_identity.rs b/pgdog/src/backend/auth/azure_workload_identity.rs index 1e235e434..a566cf16e 100644 --- a/pgdog/src/backend/auth/azure_workload_identity.rs +++ b/pgdog/src/backend/auth/azure_workload_identity.rs @@ -60,6 +60,8 @@ mod tests { database_number: 0, server_auth: ServerAuth::AzureWorkloadIdentity, server_iam_region: None, + vault_path: None, + vault_refresh_percent: None, configured_role: Role::Auto, }; diff --git a/pgdog/src/backend/auth/mod.rs b/pgdog/src/backend/auth/mod.rs index cda4083ee..c89d0c9c9 100644 --- a/pgdog/src/backend/auth/mod.rs +++ b/pgdog/src/backend/auth/mod.rs @@ -1,2 +1,3 @@ pub mod azure_workload_identity; pub mod rds_iam; +pub mod vault; diff --git a/pgdog/src/backend/auth/rds_iam.rs b/pgdog/src/backend/auth/rds_iam.rs index 696a64eb2..590ca0adc 100644 --- a/pgdog/src/backend/auth/rds_iam.rs +++ b/pgdog/src/backend/auth/rds_iam.rs @@ -101,6 +101,8 @@ mod tests { database_number: 0, server_auth: ServerAuth::RdsIam, server_iam_region: Some("us-east-1".into()), + vault_path: None, + vault_refresh_percent: None, configured_role: Role::Auto, } } diff --git a/pgdog/src/backend/auth/vault.rs b/pgdog/src/backend/auth/vault.rs new file mode 100644 index 000000000..ce5fc5dd7 --- /dev/null +++ b/pgdog/src/backend/auth/vault.rs @@ -0,0 +1,255 @@ +//! HashiCorp Vault dynamic database credentials. +//! +//! Pools configured with `server_auth = "vault"` fetch their username and +//! password from the Vault path configured on the user +//! (e.g. `database/creds/my-role`). Credentials are cached in the global +//! [`TokenCache`](crate::backend::pool::token_cache::TokenCache) and +//! refreshed by the pool monitor after a configured percentage of the +//! lease has elapsed. + +use std::time::{Duration, SystemTime}; + +use once_cell::sync::Lazy; +use parking_lot::Mutex; +use serde::Deserialize; +use serde_json::json; +use tracing::debug; + +use crate::backend::pool::token_cache::{Credentials, FetchedCredentials}; +use crate::backend::{Error, pool::Address}; +use crate::config::config; +use pgdog_config::vault::{DEFAULT_REFRESH_PERCENT, Vault, VaultAuthMethod}; + +/// Re-login this long before the Vault client token expires. +const TOKEN_EXPIRY_BUFFER: Duration = Duration::from_secs(60); + +#[derive(Clone)] +struct VaultToken { + token: String, + expires_at: SystemTime, +} + +/// Cached Vault client token, shared by all pools. +static VAULT_TOKEN: Lazy>> = Lazy::new(|| Mutex::new(None)); + +#[derive(Deserialize)] +struct AuthResponse { + auth: AuthData, +} + +#[derive(Deserialize)] +struct AuthData { + client_token: String, + lease_duration: u64, +} + +#[derive(Deserialize)] +struct SecretResponse { + lease_duration: u64, + data: SecretData, +} + +#[derive(Deserialize)] +struct SecretData { + username: String, + password: String, +} + +fn error(message: impl std::fmt::Display) -> Error { + Error::VaultCredentials(message.to_string()) +} + +fn client(vault: &Vault) -> Result { + let mut builder = reqwest::Client::builder(); + + if let Some(namespace) = vault.namespace.as_deref() { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + "X-Vault-Namespace", + namespace + .parse() + .map_err(|_| error("invalid Vault namespace"))?, + ); + builder = builder.default_headers(headers); + } + + builder.build().map_err(error) +} + +async fn login(vault: &Vault) -> Result { + let mount = vault.auth_mount(); + let url = format!( + "{}/v1/auth/{}/login", + vault.url.trim_end_matches('/'), + mount + ); + + let payload = match vault.auth_method { + VaultAuthMethod::Kubernetes => { + let role = vault.kubernetes_role.as_deref().ok_or_else(|| { + error(r#""kubernetes_role" is required for Vault Kubernetes auth"#) + })?; + let jwt = tokio::fs::read_to_string(vault.kubernetes_jwt_path()) + .await + .map_err(|err| { + error(format!( + "failed to read service account JWT from \"{}\": {}", + vault.kubernetes_jwt_path(), + err + )) + })?; + json!({ "jwt": jwt.trim(), "role": role }) + } + + VaultAuthMethod::Approle => { + let role_id = vault + .approle_role_id + .as_deref() + .ok_or_else(|| error(r#""approle_role_id" is required for Vault AppRole auth"#))?; + let secret_id = match vault.approle_secret_id_file.as_deref() { + Some(path) => tokio::fs::read_to_string(path) + .await + .map(|secret| secret.trim().to_owned()) + .map_err(|err| { + error(format!( + "failed to read AppRole secret ID from \"{}\": {}", + path, err + )) + })?, + None => std::env::var("VAULT_SECRET_ID").map_err(|_| { + error( + r#"set "approle_secret_id_file" or the VAULT_SECRET_ID environment variable"#, + ) + })?, + }; + json!({ "role_id": role_id, "secret_id": secret_id }) + } + }; + + let response = client(vault)? + .post(&url) + .json(&payload) + .send() + .await + .map_err(|err| { + error(format!( + "Vault login request to \"{}\" failed: {}", + url, err + )) + })?; + + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(error(format!( + "Vault login at \"{}\" returned {}: {}", + url, status, body + ))); + } + + let auth: AuthResponse = response + .json() + .await + .map_err(|err| error(format!("invalid Vault login response: {}", err)))?; + + Ok(VaultToken { + token: auth.auth.client_token, + expires_at: SystemTime::now() + Duration::from_secs(auth.auth.lease_duration), + }) +} + +/// Get a valid Vault client token, logging in if the cached one is +/// missing or about to expire. +async fn vault_token(vault: &Vault) -> Result { + if let Some(cached) = VAULT_TOKEN.lock().clone() + && SystemTime::now() + TOKEN_EXPIRY_BUFFER < cached.expires_at + { + return Ok(cached.token); + } + + let token = login(vault).await?; + let secret = token.token.clone(); + *VAULT_TOKEN.lock() = Some(token); + debug!("logged into Vault"); + + Ok(secret) +} + +/// Fetch fresh dynamic database credentials for `addr` from Vault. +/// +/// This is the raw fetcher passed to [`TokenCache::credentials_or_fetch`] +/// and called by the monitor's refresh loop. Callers should never invoke +/// it directly — go through +/// [`TokenCache::global`](crate::backend::pool::token_cache::TokenCache::global) +/// instead. +pub(crate) async fn credentials(addr: Address) -> Result { + let vault = config() + .config + .vault + .clone() + .ok_or_else(|| error("[vault] section is missing from pgdog.toml"))?; + + let path = addr.vault_path.as_deref().ok_or_else(|| { + error(format!( + r#""vault_path" is not configured for {}@{}:{}"#, + addr.user, addr.host, addr.port + )) + })?; + + let token = vault_token(&vault).await?; + let url = format!( + "{}/v1/{}", + vault.url.trim_end_matches('/'), + path.trim_start_matches('/') + ); + + let response = client(&vault)? + .get(&url) + .header("X-Vault-Token", token) + .send() + .await + .map_err(|err| { + error(format!( + "Vault credentials request to \"{}\" failed: {}", + url, err + )) + })?; + + let status = response.status(); + + // The cached Vault token may have been revoked — drop it so the next + // attempt logs in again. + if status == reqwest::StatusCode::FORBIDDEN { + *VAULT_TOKEN.lock() = None; + } + + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(error(format!( + "Vault credentials read at \"{}\" returned {}: {}", + url, status, body + ))); + } + + let secret: SecretResponse = response + .json() + .await + .map_err(|err| error(format!("invalid Vault credentials response: {}", err)))?; + + let lease = Duration::from_secs(secret.lease_duration); + let refresh_percent = addr + .vault_refresh_percent + .unwrap_or(DEFAULT_REFRESH_PERCENT) + .clamp(1, 100); + let now = SystemTime::now(); + let refresh_at = now + lease.mul_f64(refresh_percent as f64 / 100.0); + + Ok(FetchedCredentials { + credentials: Credentials { + username: Some(secret.data.username), + secret: secret.data.password, + }, + expires_at: now + lease, + refresh_at: Some(refresh_at), + }) +} diff --git a/pgdog/src/backend/error.rs b/pgdog/src/backend/error.rs index 3b385d2c0..1e5e9eb51 100644 --- a/pgdog/src/backend/error.rs +++ b/pgdog/src/backend/error.rs @@ -120,6 +120,9 @@ pub enum Error { #[error("Azure Workload Identity token generation failed: {0}")] AzureWorkloadIdentityToken(String), + #[error("Vault credentials fetch failed: {0}")] + VaultCredentials(String), + #[error("pub/sub channel disabled")] PubSubDisabled, diff --git a/pgdog/src/backend/pool/address.rs b/pgdog/src/backend/pool/address.rs index f9ecdb6b5..f28d51419 100644 --- a/pgdog/src/backend/pool/address.rs +++ b/pgdog/src/backend/pool/address.rs @@ -9,7 +9,7 @@ use url::Url; use super::{Password, password::PasswordSource}; use crate::backend::Error; -use crate::backend::auth::{azure_workload_identity, rds_iam}; +use crate::backend::auth::{azure_workload_identity, rds_iam, vault}; use crate::backend::pool::dns_cache::DnsCache; use crate::backend::pool::token_cache::TokenCache; use crate::config::{Database, ServerAuth, User, config}; @@ -32,6 +32,12 @@ pub struct Address { pub server_auth: ServerAuth, /// Optional IAM region override. pub server_iam_region: Option, + /// Vault path to fetch dynamic credentials from. + #[serde(default)] + pub vault_path: Option, + /// Percentage of the Vault lease after which credentials are refreshed. + #[serde(default)] + pub vault_refresh_percent: Option, /// Database number (in the config). pub database_number: usize, /// Role given to the database at configuration time. @@ -89,6 +95,8 @@ impl Address { }, server_auth, server_iam_region: user.server_iam_region.clone(), + vault_path: user.vault_path.clone(), + vault_refresh_percent: user.vault_refresh_percent, database_number, configured_role: database.role, } @@ -101,6 +109,17 @@ impl Address { /// only blocks on the very first connection before the monitor has had /// a chance to prime the cache. pub async fn auth_secrets(&self) -> Result, Error> { + Ok(self.auth_credentials().await?.1) + } + + /// Get the username and passwords to log into the server with. + /// + /// The username is the configured one, except for Vault-backed pools + /// where the database secrets engine generates it together with the + /// password. + pub async fn auth_credentials(&self) -> Result<(String, Vec), Error> { + let mut user = self.user.clone(); + let mut secrets = match self.server_auth { ServerAuth::Password => self.passwords.clone(), @@ -117,12 +136,22 @@ impl Address { .await?; vec![Password::new(&token, PasswordSource::AzureIdentity)] } + + ServerAuth::Vault => { + let credentials = TokenCache::global() + .credentials_or_fetch(self, vault::credentials) + .await?; + if let Some(username) = credentials.username { + user = username; + } + vec![Password::new(&credentials.secret, PasswordSource::Vault)] + } }; // Give the valid password first. secrets.sort_by_cached_key(|p| !p.is_valid()); - Ok(secrets) + Ok((user, secrets)) } pub async fn addr(&self) -> Result { @@ -153,6 +182,8 @@ impl Address { database_name: "pgdog".into(), server_auth: ServerAuth::Password, server_iam_region: None, + vault_path: None, + vault_refresh_percent: None, database_number: 0, configured_role: Role::Primary, } @@ -452,6 +483,78 @@ mod test { assert_eq!(secret, "azure-token"); } + #[tokio::test] + async fn test_auth_credentials_vault_serves_username_and_password_from_cache() { + use crate::backend::pool::token_cache::{Credentials, FetchedCredentials}; + + let addr = Address { + host: "auth-secrets-vault.internal".into(), + port: 15435, + user: "configured_user".into(), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/pgdog".into()), + ..Default::default() + }; + + let now = SystemTime::now(); + TokenCache::global().set_credentials( + &addr, + FetchedCredentials { + credentials: Credentials { + username: Some("v-generated-user".into()), + secret: "v-generated-pass".into(), + }, + expires_at: now + Duration::from_secs(3600), + refresh_at: Some(now + Duration::from_secs(2880)), + }, + ); + + let (user, secrets) = addr.auth_credentials().await.unwrap(); + + TokenCache::global().evict(&addr); + + // Vault generates the username — it must override the configured one. + assert_eq!(user, "v-generated-user"); + assert_eq!(secrets.first().unwrap(), "v-generated-pass"); + } + + #[tokio::test] + async fn test_auth_credentials_password_mode_uses_configured_user() { + let addr = Address::new_test(); + let (user, secrets) = addr.auth_credentials().await.unwrap(); + assert_eq!(user, "pgdog"); + assert_eq!(secrets.first().unwrap(), "pgdog"); + } + + #[test] + fn test_vault_fields_from_config() { + let database = Database { + name: "pgdog".into(), + host: "127.0.0.1".into(), + port: 6432, + ..Default::default() + }; + + let user = User { + name: "pgdog".into(), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/pgdog".into()), + vault_refresh_percent: Some(50), + password: Some("ignored".into()), + database: "pgdog".into(), + ..Default::default() + }; + + let address = Address::new(&database, &user, 0); + assert!( + address.passwords.is_empty(), + "Vault addresses must not carry static passwords" + ); + assert_eq!(address.server_auth, ServerAuth::Vault); + assert_eq!(address.vault_path.as_deref(), Some("database/creds/pgdog")); + assert_eq!(address.vault_refresh_percent, Some(50)); + } + #[tokio::test] async fn test_auth_secret_stale_token_still_returned() { // A stale token (past its expiry) is still handed to the server. diff --git a/pgdog/src/backend/pool/monitor.rs b/pgdog/src/backend/pool/monitor.rs index e6382b65f..ad82f8025 100644 --- a/pgdog/src/backend/pool/monitor.rs +++ b/pgdog/src/backend/pool/monitor.rs @@ -46,7 +46,7 @@ use std::time::Duration; use super::{Error, Guard, Healtcheck, Oids, Pool, Request}; -use crate::backend::auth::{azure_workload_identity, rds_iam}; +use crate::backend::auth::{azure_workload_identity, rds_iam, vault}; use crate::backend::pool::inner::ShouldCreate; use crate::backend::pool::token_cache::TokenCache; use crate::backend::{ConnectReason, DisconnectReason, Server}; @@ -192,9 +192,22 @@ impl Monitor { select! { _ = sleep(sleep_duration) => { let result = match addr.server_auth { - ServerAuth::RdsIam => rds_iam::token(addr.clone()).await, + ServerAuth::RdsIam => rds_iam::token(addr.clone()).await.map( + |(token, expires_at)| { + TokenCache::global().set(&addr, token, expires_at) + }, + ), ServerAuth::AzureWorkloadIdentity => { - azure_workload_identity::token(addr.clone()).await + azure_workload_identity::token(addr.clone()).await.map( + |(token, expires_at)| { + TokenCache::global().set(&addr, token, expires_at) + }, + ) + } + ServerAuth::Vault => { + vault::credentials(addr.clone()).await.map(|credentials| { + TokenCache::global().set_credentials(&addr, credentials) + }) } // Guard in spawn() ensures we only reach here for // external identity pools. @@ -202,9 +215,8 @@ impl Monitor { }; match result { - Ok((token, expires_at)) => { + Ok(()) => { debug!("token refreshed [{}]", addr); - TokenCache::global().set(&addr, token, expires_at); } Err(err) => { warn!("token refresh failed, evicting cache entry: {err} [{}]", addr); diff --git a/pgdog/src/backend/pool/password.rs b/pgdog/src/backend/pool/password.rs index d398cf375..7ee6d06d4 100644 --- a/pgdog/src/backend/pool/password.rs +++ b/pgdog/src/backend/pool/password.rs @@ -15,6 +15,7 @@ pub enum PasswordSource { Config, RdsIam, AzureIdentity, + Vault, } impl Display for PasswordSource { @@ -23,6 +24,7 @@ impl Display for PasswordSource { Self::Config => write!(f, "config"), Self::RdsIam => write!(f, "rds iam"), Self::AzureIdentity => write!(f, "azure workload identity"), + Self::Vault => write!(f, "vault"), } } } diff --git a/pgdog/src/backend/pool/token_cache.rs b/pgdog/src/backend/pool/token_cache.rs index e53322145..e92f337b5 100644 --- a/pgdog/src/backend/pool/token_cache.rs +++ b/pgdog/src/backend/pool/token_cache.rs @@ -11,15 +11,57 @@ use crate::backend::{Error, pool::Address}; /// know or re-apply this value. const EXPIRY_BUFFER: Duration = Duration::from_secs(45); +/// Credentials fetched from an external identity provider. +/// +/// Token-based providers (RDS IAM, Azure Workload Identity) only produce a +/// secret; the username comes from the configuration. Vault's database +/// secrets engine generates both, so `username` overrides the configured +/// one when present. +#[derive(Clone, Debug)] +pub struct Credentials { + pub username: Option, + pub secret: String, +} + +/// Credentials plus their lifetime, as returned by a fetcher. +/// +/// `refresh_at`, when set, tells the monitor exactly when to fetch a +/// replacement (e.g. a percentage of a Vault lease). When `None`, the +/// monitor refreshes [`EXPIRY_BUFFER`] before `expires_at`. +#[derive(Clone, Debug)] +pub struct FetchedCredentials { + pub credentials: Credentials, + pub expires_at: SystemTime, + pub refresh_at: Option, +} + #[derive(Clone)] struct CachedToken { - token: String, + credentials: Credentials, expires_at: SystemTime, + refresh_at: Option, } impl CachedToken { fn new(token: String, expires_at: SystemTime) -> Self { - Self { token, expires_at } + Self { + credentials: Credentials { + username: None, + secret: token, + }, + expires_at, + refresh_at: None, + } + } +} + +impl From for CachedToken { + fn from(fetched: FetchedCredentials) -> Self { + Self { + credentials: fetched.credentials, + expires_at: fetched.expires_at, + refresh_at: fetched.refresh_at, + } } } @@ -96,15 +138,23 @@ impl TokenCache { /// instant is already in the past, returns [`Duration::ZERO`] so the /// monitor fires immediately. pub fn refresh_in(&self, addr: &Address) -> Duration { - let Some(expires_at) = self + let Some((expires_at, refresh_at)) = self .inner .lock() .get(&CacheKey::from(addr)) - .map(|c| c.expires_at) + .map(|c| (c.expires_at, c.refresh_at)) else { return Duration::ZERO; // cold start or eviction — fetch immediately }; + // An explicit refresh instant (e.g. a percentage of a Vault lease) + // takes precedence over the expiry buffer. + if let Some(refresh_at) = refresh_at { + return refresh_at + .duration_since(SystemTime::now()) + .unwrap_or(Duration::ZERO); + } + // If the token is already expired or expires within the buffer, // fetch immediately. expires_at @@ -122,6 +172,14 @@ impl TokenCache { .insert(CacheKey::from(addr), CachedToken::new(token, expires_at)); } + /// Store freshly fetched credentials for `addr`, including a + /// generated username and an explicit refresh instant when present. + pub fn set_credentials(&self, addr: &Address, fetched: FetchedCredentials) { + self.inner + .lock() + .insert(CacheKey::from(addr), fetched.into()); + } + /// Remove the cached token for `addr`. /// /// Called by the monitor when a refresh fails, so the next @@ -142,7 +200,7 @@ impl TokenCache { Fut: Future>, { if let Some(cached) = self.inner.lock().get(&CacheKey::from(addr)).cloned() { - return Ok(cached.token); + return Ok(cached.credentials.secret); } // Cold miss — block once to prime the cache. @@ -151,6 +209,30 @@ impl TokenCache { self.set(addr, token.clone(), expires_at); Ok(token) } + + /// Like [`get_or_fetch`](Self::get_or_fetch), but for providers that + /// generate full credentials (username and password), e.g. Vault's + /// database secrets engine. + pub async fn credentials_or_fetch( + &self, + addr: &Address, + fetcher: F, + ) -> Result + where + F: Fn(Address) -> Fut + Send + Sync, + Fut: Future>, + { + if let Some(cached) = self.inner.lock().get(&CacheKey::from(addr)).cloned() { + return Ok(cached.credentials); + } + + // Cold miss — block once to prime the cache. + // After this the monitor's refresh loop takes over. + let fetched = fetcher(addr.clone()).await?; + let credentials = fetched.credentials.clone(); + self.set_credentials(addr, fetched); + Ok(credentials) + } } #[cfg(test)] @@ -237,6 +319,128 @@ mod tests { cache().evict(&a); } + #[test] + fn refresh_in_uses_explicit_refresh_at_when_set() { + let a = addr(9919); + let now = SystemTime::now(); + cache().set_credentials( + &a, + FetchedCredentials { + credentials: Credentials { + username: Some("vault-user".into()), + secret: "vault-pass".into(), + }, + expires_at: now + Duration::from_secs(3600), + // Refresh well before the expiry buffer would. + refresh_at: Some(now + Duration::from_secs(100)), + }, + ); + let d = cache().refresh_in(&a); + assert!(d > Duration::from_secs(95), "expected ~100s, got {d:?}"); + assert!(d <= Duration::from_secs(100), "expected ~100s, got {d:?}"); + cache().evict(&a); + } + + #[test] + fn refresh_in_returns_zero_for_past_refresh_at() { + let a = addr(9920); + let now = SystemTime::now(); + cache().set_credentials( + &a, + FetchedCredentials { + credentials: Credentials { + username: None, + secret: "tok".into(), + }, + expires_at: now + Duration::from_secs(3600), + refresh_at: Some(now - Duration::from_secs(10)), + }, + ); + assert_eq!(cache().refresh_in(&a), Duration::ZERO); + cache().evict(&a); + } + + #[test] + fn credentials_or_fetch_returns_cached_username() { + let rt = tokio::runtime::Runtime::new().unwrap(); + let a = addr(9921); + let now = SystemTime::now(); + cache().set_credentials( + &a, + FetchedCredentials { + credentials: Credentials { + username: Some("v-user".into()), + secret: "v-pass".into(), + }, + expires_at: now + Duration::from_secs(3600), + refresh_at: None, + }, + ); + + let credentials = rt + .block_on(cache().credentials_or_fetch(&a, |_| async { + panic!("fetcher must not be called on a cache hit"); + })) + .unwrap(); + + assert_eq!(credentials.username.as_deref(), Some("v-user")); + assert_eq!(credentials.secret, "v-pass"); + cache().evict(&a); + } + + #[test] + fn credentials_or_fetch_cold_miss_primes_cache() { + let rt = tokio::runtime::Runtime::new().unwrap(); + let a = addr(9922); + cache().evict(&a); + + let now = SystemTime::now(); + let credentials = rt + .block_on(cache().credentials_or_fetch(&a, move |_| async move { + Ok(FetchedCredentials { + credentials: Credentials { + username: Some("fresh-user".into()), + secret: "fresh-pass".into(), + }, + expires_at: now + Duration::from_secs(3600), + refresh_at: Some(now + Duration::from_secs(2880)), + }) + })) + .unwrap(); + + assert_eq!(credentials.username.as_deref(), Some("fresh-user")); + assert!(cache().expires_at(&a).is_some()); + cache().evict(&a); + } + + #[test] + fn get_or_fetch_returns_secret_of_cached_credentials() { + // A pool whose cache was primed via set_credentials still serves + // the secret through the token-only accessor. + let rt = tokio::runtime::Runtime::new().unwrap(); + let a = addr(9923); + cache().set_credentials( + &a, + FetchedCredentials { + credentials: Credentials { + username: Some("u".into()), + secret: "s".into(), + }, + expires_at: SystemTime::now() + Duration::from_secs(3600), + refresh_at: None, + }, + ); + + let token = rt.block_on(cache().get_or_fetch(&a, |_| async { + panic!("fetcher must not be called on a cache hit"); + #[allow(unreachable_code)] + Ok(("unreachable".into(), SystemTime::now())) + })); + + assert_eq!(token.unwrap(), "s"); + cache().evict(&a); + } + #[test] fn refresh_in_returns_duration_until_refresh_window() { let a = addr(9918); diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 5d9356b3e..18b31ebd5 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -97,11 +97,12 @@ impl Server { options: ServerOptions, connect_reason: ConnectReason, ) -> Result { - let auth_secrets = addr.auth_secrets().await?; + let (user, auth_secrets) = addr.auth_credentials().await?; let total = auth_secrets.len(); for (idx, auth_secret) in auth_secrets.into_iter().enumerate() { match Self::connect_with_auth_secret( addr, + &user, options.clone(), connect_reason, &auth_secret, @@ -142,6 +143,7 @@ impl Server { /// Create new PostgreSQL server connection with the given auth secret (e.g. password). async fn connect_with_auth_secret( addr: &Address, + user: &str, options: ServerOptions, connect_reason: ConnectReason, auth_secret: &super::pool::Password, @@ -220,14 +222,12 @@ impl Server { } stream - .write_all( - &Startup::new(&addr.user, &addr.database_name, options.params.clone()).to_bytes(), - ) + .write_all(&Startup::new(user, &addr.database_name, options.params.clone()).to_bytes()) .await?; stream.flush().await?; // Perform authentication. - let mut scram = Client::new(&addr.user, auth_secret); + let mut scram = Client::new(user, auth_secret); let mut auth_type = AuthType::Trust; loop { let message = stream.read().await?; @@ -263,11 +263,8 @@ impl Server { } Authentication::Md5(salt) => { auth_type = AuthType::Md5; - let client = md5::Client::new_salt( - &addr.user, - &[auth_secret.to_string()], - &salt, - )?; + let client = + md5::Client::new_salt(user, &[auth_secret.to_string()], &salt)?; stream.send_flush(&client.response()?).await?; } }