diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index f0d777879..ed2ecb838 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -23,7 +23,7 @@ use actix_web::{ Error, HttpMessage, HttpRequest, Route, dev::{Service, ServiceRequest, ServiceResponse, Transform, forward_ready}, error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized}, - http::header::{self, HeaderName, HeaderValue}, + http::header::{self, HeaderMap, HeaderName, HeaderValue}, }; use argon2::{Argon2, PasswordHash, PasswordVerifier}; use chrono::{Duration, TimeDelta, Utc}; @@ -194,7 +194,7 @@ where } let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action); - + let headers = req.headers().clone(); let fut = self.service.call(req); Box::pin(async move { let Ok(key) = key else { @@ -209,7 +209,7 @@ where // if session is expired, refresh token if sessions().is_session_expired(&key) { - refresh_token(user_and_tenant_id, &key).await?; + refresh_token(user_and_tenant_id, &key, headers).await?; } match auth_result? { @@ -296,6 +296,7 @@ fn get_user_and_tenant( pub async fn refresh_token( user_and_tenant_id: Result<(Result, Option), RBACError>, key: &SessionKey, + headers: HeaderMap, ) -> Result<(), Error> { let oidc_client = OIDC_CLIENT.get(); @@ -320,8 +321,7 @@ pub async fn refresh_token( let refreshed_token = match client .read() .await - .client() - .refresh_token(&oauth_data, Some(PARSEABLE.options.scope.as_str())) + .refresh_token(&oauth_data, Some(PARSEABLE.options.scope.as_str()), headers) .await { Ok(bearer) => bearer, @@ -571,6 +571,10 @@ where header::COOKIE, HeaderValue::from_str(&format!("session={}", id)).unwrap(), ); + + // remove basic auth header + req.headers_mut().remove(header::AUTHORIZATION); + let session = SessionKey::SessionId(id); req.extensions_mut().insert(session.clone()); Users.new_session(&user, session, TimeDelta::seconds(20)); diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 471483d6d..115f8b643 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -26,7 +26,6 @@ use base64::{Engine, prelude::BASE64_STANDARD}; use bytes::Bytes; use futures::future; use once_cell::sync::OnceCell; -use openid::Discovered; use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; @@ -40,7 +39,7 @@ use crate::{ correlation::CORRELATIONS, hottier::{HotTierManager, StreamHotTier}, metastore::metastore_traits::MetastoreObject, - oidc::{Claims, DiscoveredClient}, + oauth::{OAuthProvider, connect_oidc}, option::Mode, parseable::{DEFAULT_TENANT, PARSEABLE}, storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY}, @@ -48,7 +47,7 @@ use crate::{ utils::get_node_id, }; -use super::{API_BASE_PATH, API_VERSION, cross_origin_config, health_check, resource_check}; +use super::{cross_origin_config, health_check, resource_check}; pub mod ingest; pub mod ingest_server; @@ -58,28 +57,7 @@ pub mod server; pub mod ssl_acceptor; pub mod utils; -pub type OpenIdClient = Arc>; - -pub static OIDC_CLIENT: OnceCell>> = OnceCell::new(); - -#[derive(Debug)] -pub struct GlobalClient { - client: DiscoveredClient, -} - -impl GlobalClient { - pub fn set(&mut self, client: DiscoveredClient) { - self.client = client; - } - - pub fn client(&self) -> &DiscoveredClient { - &self.client - } - - pub fn new(client: DiscoveredClient) -> Self { - Self { client } - } -} +pub static OIDC_CLIENT: OnceCell>>> = OnceCell::new(); // to be decided on what the Default version should be pub const DEFAULT_VERSION: &str = "v4"; @@ -114,10 +92,9 @@ pub trait ParseableServer { Self: Sized, { if let Some(config) = oidc_client { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - OIDC_CLIENT.get_or_init(|| Arc::new(RwLock::new(GlobalClient::new(client)))); + let gc = connect_oidc(config).await?; + OIDC_CLIENT + .get_or_init(|| Arc::new(RwLock::new(Box::new(gc) as Box))); } // get the ssl stuff diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 9b7aa4aea..29a977c3d 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -95,6 +95,7 @@ impl ParseableServer for Server { .service(Self::get_llm_webscope()) .service(Self::get_oauth_webscope()) .service(Self::get_user_role_webscope()) + .service(Self::get_roles_webscope()) .service(Self::get_counts_webscope().wrap(from_fn( resource_check::check_resource_utilization_middleware, ))) diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index 49bbb2e8c..df89e5972 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -16,7 +16,7 @@ * */ -use std::{collections::HashSet, sync::Arc}; +use std::collections::HashSet; use actix_web::http::StatusCode; use actix_web::{ @@ -26,22 +26,18 @@ use actix_web::{ web, }; use chrono::{Duration, TimeDelta}; -use openid::{Bearer, Options, Token, Userinfo}; +use openid::Bearer; use regex::Regex; use serde::Deserialize; -use tokio::sync::RwLock; use ulid::Ulid; use url::Url; use crate::{ handlers::{ COOKIE_AGE_DAYS, SESSION_COOKIE_NAME, USER_COOKIE_NAME, USER_ID_COOKIE_NAME, - http::{ - API_BASE_PATH, API_VERSION, - modal::{GlobalClient, OIDC_CLIENT}, - }, + http::modal::OIDC_CLIENT, }, - oidc::{Claims, DiscoveredClient}, + oauth::OAuthSession, parseable::{DEFAULT_TENANT, PARSEABLE}, rbac::{ self, EXPIRY_DURATION, Users, @@ -87,11 +83,15 @@ pub async fn login( let (session_key, oidc_client) = match (session_key, oidc_client) { (None, None) => return Ok(redirect_no_oauth_setup(query.redirect.clone())), (None, Some(client)) => { - return Ok(redirect_to_oidc( - query, - client.read().await.client(), - PARSEABLE.options.scope.to_string().as_str(), - )); + let redirect = query.into_inner().redirect.to_string(); + + let scope = PARSEABLE.options.scope.to_string(); + let mut auth_url: String = client.read().await.auth_url(&scope, Some(redirect)).into(); + + auth_url.push_str("&access_type=offline&prompt=consent"); + return Ok(HttpResponse::TemporaryRedirect() + .insert_header((actix_web::http::header::LOCATION, auth_url)) + .finish()); } (Some(session_key), client) => (session_key, client), }; @@ -138,11 +138,17 @@ pub async fn login( } else { Users.remove_session(&key); if let Some(oidc_client) = oidc_client { - redirect_to_oidc( - query, - oidc_client.read().await.client(), - PARSEABLE.options.scope.to_string().as_str(), - ) + let redirect = query.into_inner().redirect.to_string(); + let scope = PARSEABLE.options.scope.to_string(); + let mut auth_url: String = oidc_client + .read() + .await + .auth_url(&scope, Some(redirect)) + .into(); + auth_url.push_str("&access_type=offline&prompt=consent"); + HttpResponse::TemporaryRedirect() + .insert_header((actix_web::http::header::LOCATION, auth_url)) + .finish() } else { redirect_to_client(query.redirect.as_str(), None) } @@ -161,13 +167,7 @@ pub async fn logout(req: HttpRequest, query: web::Query) -> let tenant_id = get_tenant_id_from_key(&session); let user = Users.remove_session(&session); let logout_endpoint = if let Some(client) = oidc_client { - client - .read() - .await - .client() - .config() - .end_session_endpoint - .clone() + client.read().await.logout_url() } else { None }; @@ -195,37 +195,41 @@ pub async fn reply_login( }; let tenant_id = get_tenant_id_from_request(&req); - let (mut claims, user_info, bearer) = match request_token(oidc_client, &login_query).await { - Ok(v) => v, + let OAuthSession { + bearer, + claims, + userinfo: user_info, + } = match oidc_client + .write() + .await + .exchange_code(&login_query.code) + .await + { + Ok(session) => session, Err(e) => { - tracing::error!("reply_login call failed- {e}"); + tracing::error!("reply_login exchange_code failed: {e}"); return Ok(HttpResponse::Unauthorized().finish()); } }; - let username = user_info + + let Some(username) = user_info .name .clone() .or_else(|| user_info.email.clone()) .or_else(|| user_info.sub.clone()) - .expect("OIDC provider did not return a usable identifier (name, email or sub)"); + else { + tracing::error!("OAuth provider did not return a usable identifier (name, email or sub)"); + return Err(OIDCError::Unauthorized); + }; let user_id = match user_info.sub.clone() { Some(id) => id, None => { - tracing::error!("OIDC provider did not return a sub"); + tracing::error!("OAuth provider did not return a sub"); return Err(OIDCError::Unauthorized); } }; let user_info: user::UserInfo = user_info.into(); - - // if provider has group A, and parseable as has role A - // then user will automatically get assigned role A - // else, the default oidc role (inside parseable) will get assigned - let group: HashSet = claims - .other - .remove("groups") - .map(serde_json::from_value) - .transpose()? - .unwrap_or_default(); + let group = claims.groups.clone(); let metadata = get_metadata(&tenant_id).await?; // Find which OIDC groups match existing roles in Parseable @@ -239,7 +243,6 @@ pub async fn reply_login( let default_role = if let Some(role) = DEFAULT_ROLE .read() - // .unwrap() .get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) && let Some(role) = role { @@ -247,13 +250,8 @@ pub async fn reply_login( } else { HashSet::new() }; - // let default_role = if let Some(default_role) = DEFAULT_ROLE.lock().unwrap().clone() { - // HashSet::from([default_role]) - // } else { - // HashSet::new() - // }; - let existing_user = find_existing_user(&user_info, tenant_id); + let existing_user = find_existing_user(&user_info, tenant_id.clone()); let mut final_roles = match existing_user { Some(ref user) => { // For existing users: keep existing roles + add new valid OIDC roles @@ -274,6 +272,21 @@ pub async fn reply_login( // If no roles were found, use the default role final_roles.clone_from(&default_role); } + // If still no roles, look for a native user with the same email + // and inherit their roles (e.g. tenant owner logging in via OAuth) + if final_roles.is_empty() + && let Some(email) = &user_info.email + { + for u in &metadata.users { + if matches!(u.ty, UserType::Native(_)) + && u.userid() == email.as_str() + && !u.roles.is_empty() + { + final_roles.clone_from(&u.roles); + break; + } + } + } let expires_in = if let Some(expires_in) = bearer.expires_in.as_ref() { // need an i64 somehow @@ -289,26 +302,45 @@ pub async fn reply_login( let user = match (existing_user, final_roles) { (Some(user), roles) => update_user_if_changed(user, roles, user_info, bearer).await?, - // LET TENANT BE NONE FOR NOW!!! - (None, roles) => put_user(&user_id, roles, user_info, bearer, None).await?, + (None, roles) => put_user(&user_id, roles, user_info, bearer, tenant_id.clone()).await?, }; - let id = Ulid::new(); + let id = Ulid::new(); Users.new_session(&user, SessionKey::SessionId(id), expires_in); - let redirect_url = login_query - .state - .clone() - .unwrap_or_else(|| PARSEABLE.options.address.to_string()); - - Ok(redirect_to_client( - &redirect_url, - [ - cookie_session(id), - cookie_username(&username), - cookie_userid(&user_id), - ], - )) + let cookies = [ + cookie_session(id), + cookie_username(&username), + cookie_userid(&user_id), + ]; + + // If the request is an XHR/fetch call (e.g. from the SPA frontend), + // return 200 with cookies instead of a 301 redirect to avoid CORS issues. + let is_xhr = req.headers().contains_key("x-p-tenant") + || req + .headers() + .get("accept") + .and_then(|v| v.to_str().ok()) + .is_some_and(|v| v.contains("application/json")); + + if is_xhr { + let mut response = HttpResponse::Ok(); + for cookie in cookies { + response.cookie(cookie); + } + Ok(response.json(serde_json::json!({ + "session": id.to_string(), + "username": username, + "user_id": user_id, + }))) + } else { + let redirect_url = login_query + .state + .clone() + .unwrap_or_else(|| PARSEABLE.options.address.to_string()); + + Ok(redirect_to_client(&redirect_url, cookies)) + } } fn find_existing_user(user_info: &user::UserInfo, tenant_id: Option) -> Option { @@ -347,24 +379,6 @@ fn exchange_basic_for_cookie( cookie_session(id) } -fn redirect_to_oidc( - query: web::Query, - oidc_client: &DiscoveredClient, - scope: &str, -) -> HttpResponse { - let redirect = query.into_inner().redirect.to_string(); - let auth_url = oidc_client.auth_url(&Options { - scope: Some(scope.to_string()), - state: Some(redirect), - ..Default::default() - }); - let mut url: String = auth_url.into(); - url.push_str("&access_type=offline&prompt=consent"); - HttpResponse::TemporaryRedirect() - .insert_header((actix_web::http::header::LOCATION, url)) - .finish() -} - fn redirect_to_oidc_logout(mut logout_endpoint: Url, redirect: &Url) -> HttpResponse { logout_endpoint.set_query(Some(&format!("post_logout_redirect_uri={redirect}"))); HttpResponse::TemporaryRedirect() @@ -401,7 +415,8 @@ fn redirect_no_oauth_setup(mut url: Url) -> HttpResponse { pub fn cookie_session(id: Ulid) -> Cookie<'static> { Cookie::build(SESSION_COOKIE_NAME, id.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) - .same_site(SameSite::Strict) + .same_site(SameSite::None) + .secure(true) .path("/") .finish() } @@ -409,7 +424,8 @@ pub fn cookie_session(id: Ulid) -> Cookie<'static> { pub fn cookie_username(username: &str) -> Cookie<'static> { Cookie::build(USER_COOKIE_NAME, username.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) - .same_site(SameSite::Strict) + .same_site(SameSite::None) + .secure(true) .path("/") .finish() } @@ -417,56 +433,12 @@ pub fn cookie_username(username: &str) -> Cookie<'static> { pub fn cookie_userid(user_id: &str) -> Cookie<'static> { Cookie::build(USER_ID_COOKIE_NAME, user_id.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) - .same_site(SameSite::Strict) + .same_site(SameSite::None) + .secure(true) .path("/") .finish() } -pub async fn request_token( - oidc_client: &Arc>, - login_query: &Login, -) -> anyhow::Result<(Claims, Userinfo, Bearer)> { - let old_client = oidc_client.read().await.client().clone(); - let mut token: Token = old_client.request_token(&login_query.code).await?.into(); - - let id_token = if let Some(token) = token.id_token.as_mut() { - token - } else { - return Err(anyhow::anyhow!("No id_token provided")); - }; - - if let Err(e) = old_client.decode_token(id_token) { - tracing::error!("error while decoding the id_token- {e}"); - let new_client = PARSEABLE - .options - .openid() - .unwrap() - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - - // Reuse the already-obtained token, just decode with new client's JWKS - new_client.decode_token(id_token)?; - new_client.validate_token(id_token, None, None)?; - let claims = id_token.payload().expect("payload is decoded").clone(); - - let userinfo = new_client.request_userinfo(&token).await?; - let bearer = token.bearer; - - // replace old client with new one - drop(old_client); - - oidc_client.write().await.set(new_client); - return Ok((claims, userinfo, bearer)); - } - - old_client.validate_token(id_token, None, None)?; - let claims = id_token.payload().expect("payload is decoded").clone(); - - let userinfo = old_client.request_userinfo(&token).await?; - let bearer = token.bearer; - Ok((claims, userinfo, bearer)) -} - // put new user in metadata if does not exit // update local cache pub async fn put_user( diff --git a/src/lib.rs b/src/lib.rs index b6d15205e..75b4254be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ mod metadata; pub mod metastore; pub mod metrics; pub mod migration; +pub mod oauth; pub mod oidc; pub mod option; pub mod otel; diff --git a/src/oauth/mod.rs b/src/oauth/mod.rs new file mode 100644 index 000000000..9eff358ea --- /dev/null +++ b/src/oauth/mod.rs @@ -0,0 +1,5 @@ +pub mod oidc_client; +pub mod provider; + +pub use oidc_client::{GlobalClient, connect_oidc}; +pub use provider::{OAuthProvider, OAuthSession, ProviderClaims, ProviderUserInfo}; diff --git a/src/oauth/oidc_client.rs b/src/oauth/oidc_client.rs new file mode 100644 index 000000000..a2e492eaa --- /dev/null +++ b/src/oauth/oidc_client.rs @@ -0,0 +1,140 @@ +use actix_web::http::header::HeaderMap; +use async_trait::async_trait; +use openid::{Bearer, Options, Token}; +use url::Url; + +use crate::{ + handlers::http::{API_BASE_PATH, API_VERSION}, + oauth::provider::{OAuthProvider, OAuthSession, ProviderClaims, ProviderUserInfo}, + oidc::{Claims, DiscoveredClient, OpenidConfig}, + rbac::user::OAuth, +}; + +/// Wraps the OpenID Connect `DiscoveredClient`. +/// +/// Stores the original `OpenidConfig` and the redirect suffix so that it can +/// reconnect (rotating the JWKS) inside `exchange_code` without any outside +/// help. +#[derive(Debug)] +pub struct GlobalClient { + client: DiscoveredClient, + /// Original config – cloned and used to reconnect on JWKS rotation. + config: OpenidConfig, + /// `"api/v1/o/code"` – the path appended to the base URL for the + /// redirect URI when re-discovering. + redirect_suffix: String, +} + +impl GlobalClient { + pub fn new(client: DiscoveredClient, config: OpenidConfig, redirect_suffix: String) -> Self { + Self { + client, + config, + redirect_suffix, + } + } +} + +#[async_trait] +impl OAuthProvider for GlobalClient { + fn auth_url(&self, scope: &str, state: Option) -> Url { + self.client.auth_url(&Options { + scope: Some(scope.to_string()), + state, + ..Default::default() + }) + } + + /// Exchange an authorization code for the full session, handling JWKS + /// rotation transparently: if `decode_token` fails with the cached client, + /// a fresh discovery is performed and decoding is retried once. + async fn exchange_code(&mut self, code: &str) -> Result { + let mut token: Token = self.client.request_token(code).await?.into(); + + let id_token = token + .id_token + .as_mut() + .ok_or_else(|| anyhow::anyhow!("OIDC provider did not return an id_token"))?; + + if let Err(e) = self.client.decode_token(id_token) { + // Stale JWKS – reconnect and retry once. + tracing::warn!("id_token decode failed ({e}), rotating JWKS and retrying"); + self.client = self.config.clone().connect(&self.redirect_suffix).await?; + self.client.decode_token(id_token)?; + } + + self.client.validate_token(id_token, None, None)?; + + let raw_claims = id_token + .payload() + .expect("token is decoded at this point") + .clone(); + + // `sub` is a required non-optional String in StandardClaims. + // `email` and `name` are not in StandardClaims (they're userinfo fields) + // but providers sometimes include them as additional claims in the ID + // token; extract them from `other` if present. + let groups: std::collections::HashSet = raw_claims + .other + .get("groups") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + let claims = ProviderClaims { + sub: Some(raw_claims.standard.sub.clone()), + email: raw_claims + .other + .get("email") + .and_then(|v| v.as_str()) + .map(String::from), + name: raw_claims + .other + .get("name") + .and_then(|v| v.as_str()) + .map(String::from), + groups, + other: raw_claims.other.clone(), + }; + + let userinfo_raw = self.client.request_userinfo(&token).await?; + let userinfo = ProviderUserInfo { + sub: userinfo_raw.sub.clone(), + email: userinfo_raw.email.clone(), + name: userinfo_raw.name.clone(), + preferred_username: userinfo_raw.preferred_username.clone(), + + picture: userinfo_raw + .picture + .as_ref() + .map(|p| p.as_str().to_string()), + }; + + Ok(OAuthSession { + bearer: token.bearer, + claims, + userinfo, + }) + } + + async fn refresh_token( + &self, + oauth: &OAuth, + scope: Option<&str>, + _headers: HeaderMap, + ) -> Result { + // Box the clone so we can pass it to the openid client. + let boxed: Box = Box::new(oauth.clone()); + Ok(self.client.refresh_token(boxed, scope).await?) + } + + fn logout_url(&self) -> Option { + self.client.config().end_session_endpoint.clone() + } +} + +/// Runs OIDC discovery and wraps the result in a `GlobalClient`. +pub async fn connect_oidc(config: OpenidConfig) -> Result { + let redirect_suffix = format!("{API_BASE_PATH}/{API_VERSION}/o/code"); + let client = config.clone().connect(&redirect_suffix).await?; + Ok(GlobalClient::new(client, config, redirect_suffix)) +} diff --git a/src/oauth/provider.rs b/src/oauth/provider.rs new file mode 100644 index 000000000..668d8880c --- /dev/null +++ b/src/oauth/provider.rs @@ -0,0 +1,77 @@ +use std::{ + any::Any, + collections::{HashMap, HashSet}, +}; + +use actix_web::http::header::HeaderMap; +use async_trait::async_trait; +use openid::Bearer; +use url::Url; + +use crate::rbac::user::OAuth; + +/// Trait implemented by every OAuth provider. +/// +#[async_trait] +pub trait OAuthProvider: Send + Sync + Any { + /// Build the redirect-to-provider authorization URL. + fn auth_url(&self, scope: &str, state: Option) -> Url; + + /// Exchange an authorization code for the complete session data. + /// + /// Implementors are responsible for all internal steps: + /// - token request + /// - ID-token decoding and validation + /// - JWKS key rotation / reconnection (OIDC-specific) + /// - userinfo fetch + /// + /// Requires `&mut self` so OIDC can swap out its client on JWKS rotation + /// while holding the `write()` lock that the caller already holds. + async fn exchange_code(&mut self, code: &str) -> Result; + + /// Refresh an existing access token using the credentials stored in the + /// user's `OAuth` record. + async fn refresh_token( + &self, + oauth: &OAuth, + scope: Option<&str>, + headers: HeaderMap, + ) -> Result; + + /// Return the provider's logout / end-session URL, if one exists. + fn logout_url(&self) -> Option; +} + +// ── Output types ──────────────────────────────────────────────────────────── + +/// Everything produced by a successful code exchange. +#[derive(Debug)] +pub struct OAuthSession { + /// Stored verbatim in `OAuth::bearer` in the user model. + pub bearer: Bearer, + pub claims: ProviderClaims, + pub userinfo: ProviderUserInfo, +} + +/// Identity claims extracted from the ID token / JWT. +#[derive(Debug, Clone)] +pub struct ProviderClaims { + /// `sub` – stable unique identifier for the user at this provider. + pub sub: Option, + pub email: Option, + pub name: Option, + /// Group memberships / roles declared by the provider. + pub groups: HashSet, + /// Any additional claims not captured by the fields above. + pub other: HashMap, +} + +/// Data returned by the provider's userinfo endpoint. +#[derive(Debug, Clone)] +pub struct ProviderUserInfo { + pub sub: Option, + pub email: Option, + pub name: Option, + pub preferred_username: Option, + pub picture: Option, +} diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index c6072798d..a35791a5c 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -1073,18 +1073,18 @@ impl Parseable { pub async fn suspend_tenant_service( &self, - tenant_id: String, - service: Service, + tenant_id: &str, + service: &Service, ) -> Result<(), anyhow::Error> { - TENANT_METADATA.suspend_service(&tenant_id, service.clone()); + TENANT_METADATA.suspend_service(tenant_id, service); // write to disk - let tenant_id = &Some(tenant_id); + let tenant_id = &Some(tenant_id.to_owned()); let mut meta = get_metadata(tenant_id).await?; if let Some(sus) = meta.suspended_services.as_mut() { - sus.insert(service); + sus.insert(service.clone()); } else { - meta.suspended_services = Some(HashSet::from_iter([service])); + meta.suspended_services = Some(HashSet::from_iter([service.clone()])); } put_remote_metadata(&meta, tenant_id).await?; @@ -1093,16 +1093,16 @@ impl Parseable { pub async fn resume_tenant_service( &self, - tenant_id: String, - service: Service, + tenant_id: &str, + service: &Service, ) -> Result<(), anyhow::Error> { - TENANT_METADATA.resume_service(&tenant_id, service.clone()); + TENANT_METADATA.resume_service(tenant_id, service); // write to disk - let tenant_id = &Some(tenant_id); + let tenant_id = &Some(tenant_id.to_owned()); let mut meta = get_metadata(tenant_id).await?; if let Some(sus) = meta.suspended_services.as_mut() { - sus.remove(&service); + sus.remove(service); } put_remote_metadata(&meta, tenant_id).await?; diff --git a/src/rbac/user.rs b/src/rbac/user.rs index c438012e6..5e1a6a81f 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -253,6 +253,20 @@ impl From for UserInfo { } } +impl From for UserInfo { + fn from(info: crate::oauth::ProviderUserInfo) -> Self { + UserInfo { + sub: info.sub, + name: info.name, + preferred_username: info.preferred_username, + picture: info.picture.as_deref().and_then(|s| s.parse().ok()), + email: info.email, + gender: None, + updated_at: None, + } + } +} + /// Represents a user in a UserGroup - simplified structure for both user types #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct GroupUser { diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index 78e6106e2..5b3cc1b66 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -24,11 +24,14 @@ use crate::event::format::json; use crate::handlers::TelemetryType; use crate::handlers::http::cluster::send_query_request; use crate::handlers::http::ingest::PostError; +use crate::handlers::http::middleware::CLUSTER_SECRET; +use crate::handlers::http::middleware::CLUSTER_SECRET_HEADER; use crate::handlers::http::query::Query; use crate::handlers::http::query::QueryError; use crate::handlers::http::query::query; use crate::metadata::SchemaVersion; use crate::option::Mode; +use crate::parseable::DEFAULT_TENANT; use crate::parseable::PARSEABLE; use crate::query::QUERY_SESSION_STATE; use crate::storage::ObjectStorageError; @@ -563,13 +566,27 @@ pub async fn get_dataset_stats( serde_json::from_str(body_str).map_err(|e| QueryError::CustomError(e.to_string()))? } Mode::Prism => { - let auth = if let Some(tenant) = tenant_id.as_ref() - && let Some(header) = TENANT_METADATA.get_global_query_auth(tenant) - { - let mut map = HeaderMap::new(); + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let auth = if let Some((_, hash)) = CLUSTER_SECRET.get() { + let mut map = actix_web::http::header::HeaderMap::new(); + if let Some(header) = TENANT_METADATA.get_global_query_auth(tenant) { + map.insert( + HeaderName::from_static("authorization"), + HeaderValue::from_str(&header).unwrap(), + ); + } + let userid = get_user_from_request(&req).unwrap(); + map.insert( + HeaderName::from_static(CLUSTER_SECRET_HEADER), + HeaderValue::from_str(hash).unwrap(), + ); map.insert( - HeaderName::from_static("authorization"), - HeaderValue::from_str(&header).unwrap(), + HeaderName::from_static("intra-cluster-tenant"), + HeaderValue::from_str(tenant).unwrap(), + ); + map.insert( + HeaderName::from_static("intra-cluster-userid"), + HeaderValue::from_str(&userid).unwrap(), ); Some(map) } else { @@ -589,6 +606,7 @@ pub async fn get_dataset_stats( } Some(map) }; + let response = match send_query_request(auth, &query_request, &tenant_id).await { Ok((query_response, _)) => query_response, Err(err) => { diff --git a/src/tenants/mod.rs b/src/tenants/mod.rs index 62926cb30..80197278f 100644 --- a/src/tenants/mod.rs +++ b/src/tenants/mod.rs @@ -90,16 +90,16 @@ impl TenantMetadata { } } - pub fn suspend_service(&self, tenant_id: &str, service: Service) { + pub fn suspend_service(&self, tenant_id: &str, service: &Service) { if let Some(mut tenant) = self.tenants.get_mut(tenant_id) { - tenant.suspended_services.insert(service); + tenant.suspended_services.insert(service.clone()); tenant.meta.suspended_services = Some(tenant.suspended_services.clone()); } } - pub fn resume_service(&self, tenant_id: &str, service: Service) { + pub fn resume_service(&self, tenant_id: &str, service: &Service) { if let Some(mut tenant) = self.tenants.get_mut(tenant_id) { - tenant.suspended_services.remove(&service); + tenant.suspended_services.remove(service); tenant.meta.suspended_services = if tenant.suspended_services.is_empty() { None } else {