Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pgdog-stats/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct Counts {
pub reads: usize,
/// Number of write transactions.
pub writes: usize,
/// Password attempts.
pub auth_attempts: usize,
}

impl Sub for Counts {
Expand Down Expand Up @@ -91,6 +93,7 @@ impl Sub for Counts {
connect_count: self.connect_count.saturating_sub(rhs.connect_count),
reads: self.reads.saturating_sub(rhs.reads),
writes: self.writes.saturating_sub(rhs.writes),
auth_attempts: self.auth_attempts.saturating_sub(rhs.auth_attempts),
}
}
}
Expand Down Expand Up @@ -124,6 +127,7 @@ impl Add for Counts {
connect_time: self.connect_time.saturating_add(rhs.connect_time),
reads: self.reads.saturating_add(rhs.reads),
writes: self.writes.saturating_add(rhs.writes),
auth_attempts: self.auth_attempts.saturating_add(rhs.auth_attempts),
}
}
}
Expand Down Expand Up @@ -161,6 +165,7 @@ impl Div<usize> for Counts {
connect_count: self.connect_count.checked_div(rhs).unwrap_or(0),
reads: self.reads.checked_div(rhs).unwrap_or(0),
writes: self.writes.checked_div(rhs).unwrap_or(0),
auth_attempts: self.auth_attempts.checked_div(rhs).unwrap_or(0),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pgdog-stats/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ impl Add<Counts> for PoolCounts {
errors: self.errors + rhs.errors,
cleaned: self.cleaned + rhs.cleaned,
prepared_sync: self.prepared_sync + rhs.prepared_sync,
// These are not counted by each server stats.
connect_count: self.connect_count,
connect_time: self.connect_time,
writes: self.writes,
reads: self.reads,
auth_attempts: self.auth_attempts,
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pgdog/src/admin/show_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl Command for ShowStats {
Field::numeric(&format!("{}_connect_count", prefix)),
Field::numeric(&format!("{}_reads", prefix)),
Field::numeric(&format!("{}_writes", prefix)),
Field::numeric(&format!("{}_auth_attempts", prefix)),
]
})
.collect::<Vec<Field>>(),
Expand Down Expand Up @@ -99,7 +100,8 @@ impl Command for ShowStats {
.add(millis(stat.connect_time))
.add(stat.connect_count)
.add(stat.reads)
.add(stat.writes);
.add(stat.writes)
.add(stat.auth_attempts);
}

messages.push(dr.message()?);
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/auth/azure_workload_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mod tests {
port: 5432,
database_name: "postgres".into(),
user: "db_user".into(),
passwords: vec![String::new()],
passwords: vec![String::new().into()],
database_number: 0,
server_auth: ServerAuth::AzureWorkloadIdentity,
server_iam_region: None,
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/auth/rds_iam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ mod tests {
port: 5432,
database_name: "postgres".into(),
user: "db_user".into(),
passwords: vec![String::new()],
passwords: vec![String::new().into()],
database_number: 0,
server_auth: ServerAuth::RdsIam,
server_iam_region: Some("us-east-1".into()),
Expand Down
8 changes: 8 additions & 0 deletions pgdog/src/backend/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,12 @@ impl Error {
_ => false,
}
}

pub fn is_auth(&self) -> bool {
match self {
Self::Auth(_) => true,
Self::ConnectionError(err) => err.code == "28000" || err.is_bad_password(),
_ => false,
}
}
}
111 changes: 97 additions & 14 deletions pgdog/src/backend/pool/address.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! Server address.
use std::net::{SocketAddr, ToSocketAddrs};
use std::ops::Deref;

use pgdog_config::users::PasswordKind;
use pgdog_config::Role;
use serde::{Deserialize, Serialize};
use url::Url;

use super::Password;
use crate::backend::{pool::dns_cache::DnsCache, Error};
use crate::config::{config, Database, ServerAuth, User};

Expand All @@ -21,7 +23,7 @@ pub struct Address {
/// Username.
pub user: String,
/// Password.
pub passwords: Vec<String>,
pub passwords: Vec<Password>,
/// Server auth mode for backend connections.
#[serde(default)]
pub server_auth: ServerAuth,
Expand All @@ -41,7 +43,7 @@ impl From<Address> for pgdog_stats::Address {
port: value.port,
database_name: value.database_name,
user: value.user,
passwords: value.passwords.clone(),
passwords: value.passwords.iter().map(|p| p.deref().clone()).collect(),
server_auth: value.server_auth,
server_iam_region: value.server_iam_region,
database_number: value.database_number,
Expand Down Expand Up @@ -72,14 +74,14 @@ impl Address {
passwords: if server_auth.is_external_identity() {
vec![]
} else if let Some(password) = database.password.clone() {
vec![password]
vec![password.into()]
} else if let Some(password) = user.server_password.clone() {
vec![password]
vec![password.into()]
} else {
user.passwords()
.into_iter()
.filter(|p| matches!(p, PasswordKind::Plain(_)))
.map(|p| p.to_string())
.map(|p| p.to_string().into())
.collect()
},
server_auth,
Expand All @@ -89,14 +91,22 @@ impl Address {
}
}

pub async fn auth_secrets(&self) -> Result<Vec<String>, Error> {
match self.server_auth {
ServerAuth::Password => Ok(self.passwords.clone()),
ServerAuth::RdsIam => Ok(vec![crate::backend::auth::rds_iam::token(self).await?]),
ServerAuth::AzureWorkloadIdentity => Ok(vec![
crate::backend::auth::azure_workload_identity::token(self).await?,
]),
}
/// Get address passwords, in valid order.
pub async fn auth_secrets(&self) -> Result<Vec<Password>, Error> {
let mut secrets = match self.server_auth {
ServerAuth::Password => self.passwords.clone(),
ServerAuth::RdsIam => vec![crate::backend::auth::rds_iam::token(self).await?.into()],
ServerAuth::AzureWorkloadIdentity => {
vec![crate::backend::auth::azure_workload_identity::token(self)
.await?
.into()]
}
};

// Give the valid password first.
secrets.sort_by_cached_key(|p| !p.is_valid());

Ok(secrets)
}

pub async fn addr(&self) -> Result<SocketAddr, Error> {
Expand Down Expand Up @@ -159,7 +169,7 @@ impl TryFrom<Url> for Address {
Ok(Self {
host,
port,
passwords: vec![password],
passwords: vec![password.into()],
user,
database_name,
server_auth: ServerAuth::Password,
Expand Down Expand Up @@ -304,6 +314,79 @@ mod test {
assert_eq!(secret, "token-from-iam");
}

#[tokio::test]
async fn test_auth_secrets_returns_valid_password_first() {
let mut addr = Address::new_test();
let invalid1: Password = "invalid1".into();
let invalid2: Password = "invalid2".into();
let valid: Password = "valid".into();
invalid1.valid(false);
invalid2.valid(false);
addr.passwords = vec![invalid1, valid, invalid2];

let secrets = addr.auth_secrets().await.unwrap();
assert_eq!(secrets.len(), 3);
assert_eq!(secrets.first().unwrap(), "valid");
assert!(secrets.first().unwrap().is_valid());

// Even if the valid password is last, it should still come first.
let mut addr = Address::new_test();
let invalid1: Password = "invalid1".into();
let invalid2: Password = "invalid2".into();
let valid: Password = "valid".into();
invalid1.valid(false);
invalid2.valid(false);
addr.passwords = vec![invalid1, invalid2, valid];

let secrets = addr.auth_secrets().await.unwrap();
assert_eq!(secrets.first().unwrap(), "valid");

// With multiple valid passwords, a valid one is still first.
let mut addr = Address::new_test();
let invalid: Password = "invalid".into();
invalid.valid(false);
addr.passwords = vec![invalid, "valid_a".into(), "valid_b".into()];

let secrets = addr.auth_secrets().await.unwrap();
let head = secrets.first().unwrap();
assert!(head.is_valid());
assert!(head == "valid_a" || head == "valid_b");

// Flipping validity at runtime changes which password comes first.
let mut addr = Address::new_test();
let first: Password = "first".into();
let second: Password = "second".into();
addr.passwords = vec![first.clone(), second.clone()];

// Both valid: order is preserved (sort is stable on the !is_valid key).
let secrets = addr.auth_secrets().await.unwrap();
assert_eq!(secrets.first().unwrap(), "first");
assert_eq!(secrets.get(1).unwrap(), "second");

// Mark "first" invalid — "second" must now win.
first.valid(false);
let secrets = addr.auth_secrets().await.unwrap();
assert_eq!(secrets.first().unwrap(), "second");
assert!(secrets.first().unwrap().is_valid());
assert_eq!(secrets.get(1).unwrap(), "first");
assert!(!secrets.get(1).unwrap().is_valid());

// Mark "second" invalid too — no valid password, but order stays stable.
second.valid(false);
let secrets = addr.auth_secrets().await.unwrap();
assert_eq!(secrets.first().unwrap(), "first");
assert!(!secrets.first().unwrap().is_valid());
assert_eq!(secrets.get(1).unwrap(), "second");

// Restore "first" — it should be returned first again.
first.valid(true);
let secrets = addr.auth_secrets().await.unwrap();
assert_eq!(secrets.first().unwrap(), "first");
assert!(secrets.first().unwrap().is_valid());
assert_eq!(secrets.get(1).unwrap(), "second");
assert!(!secrets.get(1).unwrap().is_valid());
}

#[tokio::test]
async fn test_auth_secret_azure_workload_identity_mode_uses_generator() {
let mut addr = Address::new_test();
Expand Down
2 changes: 2 additions & 0 deletions pgdog/src/backend/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod mapping;
pub mod mirror_stats;
pub mod monitor;
pub mod oids;
pub mod password;
pub mod pool_impl;
pub mod request;
pub mod shard;
Expand All @@ -38,6 +39,7 @@ pub use lsn_monitor::LsnStats;
pub use mirror_stats::MirrorStats;
pub use monitor::Monitor;
pub use oids::Oids;
pub use password::Password;
pub use pool_impl::Pool;
pub use request::Request;
pub use shard::Shard;
Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/backend/pool/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,16 @@ impl Monitor {
let mut guard = pool.lock();
guard.stats.counts.connect_count += 1;
guard.stats.counts.connect_time += elapsed;
guard.stats.counts.auth_attempts += conn.password_attempts();
}
return Ok(conn);
}

Ok(Err(err)) => {
// We tried all passwords and they were all wrong.
if err.is_auth() {
pool.lock().stats.counts.auth_attempts += pool.addr().passwords.len();
}
error!(
"{}error connecting to server: {} [{}]",
if attempt > 0 {
Expand Down
79 changes: 79 additions & 0 deletions pgdog/src/backend/pool/password.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::{
hash::Hash,
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Password {
pub(crate) password: String,
pub(crate) valid: Arc<AtomicBool>,
}

impl From<String> for Password {
fn from(password: String) -> Self {
Self {
password,
valid: Arc::new(AtomicBool::new(true)),
}
}
}

impl From<&str> for Password {
fn from(password: &str) -> Self {
Self {
password: password.to_string(),
valid: Arc::new(AtomicBool::new(true)),
}
}
}

impl Hash for Password {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.password.hash(state);
self.is_valid().hash(state);
}
}

impl Eq for Password {}

impl PartialEq<&str> for Password {
fn eq(&self, other: &&str) -> bool {
self.password.as_str() == *other
}
}

impl PartialEq<str> for Password {
fn eq(&self, other: &str) -> bool {
self.password.as_str() == other
}
}

impl PartialEq for Password {
fn eq(&self, other: &Self) -> bool {
self.password == other.password
}
}

impl Deref for Password {
type Target = String;

fn deref(&self) -> &Self::Target {
&self.password
}
}

impl Password {
pub(crate) fn is_valid(&self) -> bool {
self.valid.load(Ordering::Relaxed)
}

pub(crate) fn valid(&self, valid: bool) {
self.valid.store(valid, Ordering::Relaxed)
}
}
Loading
Loading