Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ impl MuxKeysLoader {
"Mux keys URL {url} is insecure; consider using HTTPS if possible instead"
);
}
let url = url.as_str();
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
let response = client.get(url).send().await?;
let pubkey_bytes = safe_read_http_response(response, MUXER_HTTP_MAX_LENGTH).await?;
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/pbs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum PbsError {
#[error("json decode error: {err:?}, raw: {raw}")]
JsonDecode { err: serde_json::Error, raw: String },

#[error("ssz decode error: {err:?}, fork: {fork}")]
SSZDecode { err: String, fork: ForkName },

#[error("{0}")]
ReadResponse(#[from] ResponseReadError),

Expand Down
133 changes: 91 additions & 42 deletions crates/common/src/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use bytes::Bytes;
use futures::StreamExt;
use headers_accept::Accept;
use lh_types::{BeaconBlock, ForkName};
use mediatype::{MediaType, ReadParams};
use mediatype::{MediaType, ReadParams, names};
use reqwest::{
Response,
header::{ACCEPT, CONTENT_TYPE, HeaderMap},
header::{ACCEPT, CONTENT_TYPE, HeaderMap, ToStrError},
};
use thiserror::Error;

Expand All @@ -37,6 +37,18 @@ pub enum ResponseReadError {
NonSuccess { status_code: u16, error_msg: String, request_url: String },
}

#[derive(Debug, Error)]
pub enum AcceptedEncodingsError {
#[error("invalid header string: {0}")]
InvalidString(#[from] ToStrError),

#[error("invalid accept header")]
InvalidAccept,

#[error("unsupported accept type")]
UnsupportedAcceptType,
}

#[cfg(feature = "testing-flags")]
thread_local! {
static IGNORE_CONTENT_LENGTH: Cell<bool> = const { Cell::new(false) };
Expand Down Expand Up @@ -108,7 +120,7 @@ pub async fn read_chunked_body_with_max(
/// Reads an HTTP response body with a size limit, erroring on non-success
/// status or read failure.
pub async fn safe_read_http_response(
response: reqwest::Response,
response: Response,
max_size: usize,
) -> Result<Vec<u8>, ResponseReadError> {
let status_code = response.status();
Expand Down Expand Up @@ -140,12 +152,6 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<Head
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?)
}

/// Deterministic outbound `Accept` header used when PBS asks a relay for a
/// response it will itself decode (validation mode On/Extra). SSZ is preferred
/// for wire efficiency. Emitted verbatim so packet captures and support
/// tickets are reproducible.
pub const OUTBOUND_ACCEPT: &str = "application/octet-stream;q=1.0,application/json;q=0.9";

/// Default encoding used when the caller does not express a format
/// preference. This covers both `Accept: */*` (see `get_accept_types`) and
/// a missing Content-Type header on inbound or relay responses (see
Expand Down Expand Up @@ -190,24 +196,32 @@ impl IntoIterator for AcceptedEncodings {
/// Parse the ACCEPT header into a q-value ordered [`AcceptedEncodings`]
/// (highest preference first, deduplicated), defaulting to the request's
/// Content-Type when no Accept header is present. Returns an error only if
/// every media type in the header is malformed or unsupported. Supports
/// requests with multiple ACCEPT headers or headers with multiple media
/// types. `q=0` entries are treated as explicit rejections per RFC 7231
/// every media type in the header is malformed or unsupported.
/// Multiple Accept header fields are combined before parsing so q-value
/// ordering is applied globally across all media ranges.
/// `q=0` entries are treated as explicit rejections per RFC 7231
/// §5.3.1 and are skipped.
///
/// The returned order honors the RFC 9110 §12.5.1 precedence rules already
/// applied by `headers_accept::Accept::media_types()` (specificity, then
/// q-value, then original order).
pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<AcceptedEncodings> {
pub fn get_accept_types(
req_headers: &HeaderMap,
) -> Result<AcceptedEncodings, AcceptedEncodingsError> {
// Only two supported media types, so the ordered set is at most two
// entries: primary + optional fallback.
let mut primary: Option<EncodingType> = None;
let mut fallback: Option<EncodingType> = None;
let mut saw_any = false;
let mut had_supported = false;
let mut accept_values = Vec::new();

for header in req_headers.get_all(ACCEPT).iter() {
let accept = Accept::from_str(header.to_str()?)
.map_err(|e| eyre::eyre!("invalid accept header: {e}"))?;
accept_values.push(header.to_str()?);
}
if !accept_values.is_empty() {
let accept_str = accept_values.join(",");
let accept =
Accept::from_str(&accept_str).map_err(|_| AcceptedEncodingsError::InvalidAccept)?;
for mt in accept.media_types() {
saw_any = true;

Expand All @@ -221,13 +235,7 @@ pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<AcceptedEncodin
continue;
}

let parsed = match mt.essence().to_string().as_str() {
APPLICATION_OCTET_STREAM => Some(EncodingType::Ssz),
APPLICATION_JSON => Some(EncodingType::Json),
WILDCARD => Some(NO_PREFERENCE_DEFAULT),
_ => None,
};
if let Some(enc) = parsed {
if let Some(enc) = essence_encoding(&mt.essence()) {
had_supported = true;
match primary {
None => primary = Some(enc),
Expand All @@ -243,14 +251,30 @@ pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<AcceptedEncodin
}

if saw_any && !had_supported {
eyre::bail!("unsupported accept type");
return Err(AcceptedEncodingsError::UnsupportedAcceptType)
}

// No accept header (or only q=0 rejections): fall back to the request
// Content-Type, which mirrors the historical behavior.
Ok(AcceptedEncodings::single(get_content_type(req_headers)))
}

fn essence_encoding(mt: &MediaType) -> Option<EncodingType> {
if mt.suffix.is_some() {
return None;
}

match () {
_ if mt.ty == names::_STAR && mt.subty == names::_STAR => Some(NO_PREFERENCE_DEFAULT),
_ if mt.ty == names::APPLICATION && mt.subty == names::OCTET_STREAM => {
Some(EncodingType::Ssz)
}
_ if mt.ty == names::APPLICATION && mt.subty == names::JSON => Some(EncodingType::Json),
_ if mt.ty == names::APPLICATION && mt.subty == names::_STAR => Some(NO_PREFERENCE_DEFAULT),
_ => None,
Comment thread
JasonVranek marked this conversation as resolved.
}
}

/// Compute the q-value for the `index`-th preferred encoding when building an
/// outbound `Accept` header. The first entry gets q=1.0, each subsequent entry
/// decreases by 0.1, and the value is clamped to a minimum of 0.1 so we never
Expand All @@ -269,17 +293,19 @@ fn format_accept_entry(enc: EncodingType, q: f32) -> String {
format!("{};q={:.1}", enc.content_type(), q)
}

/// Build an `Accept` header string that mirrors the caller's preference order
/// Build an `Accept` header that mirrors the caller's preference order
/// so the relay sees the same priority the beacon node asked us for. Each
/// subsequent entry receives a q-value 0.1 lower than the previous one,
/// starting at 1.0.
pub fn build_outbound_accept(preferred: AcceptedEncodings) -> String {
preferred
/// starting at 1.0. Returns a ready-to-use `HeaderValue` — the output is
/// always valid ASCII, so infallible.
pub fn build_outbound_accept(preferred: AcceptedEncodings) -> HeaderValue {
let s = preferred
.iter()
.enumerate()
.map(|(i, enc)| format_accept_entry(enc, accept_q_value_for_index(i)))
.collect::<Vec<_>>()
.join(",")
.join(",");
HeaderValue::from_str(&s).expect("build_outbound_accept produces valid header value")
}

pub fn get_content_type(req_headers: &HeaderMap) -> EncodingType {
Expand Down Expand Up @@ -345,11 +371,7 @@ impl FromStr for EncodingType {
// (e.g. `application/json; charset=utf-8`). Compare essence only.
let parsed =
MediaType::parse(value).map_err(|e| format!("invalid content type {value}: {e}"))?;
match parsed.essence().to_string().to_ascii_lowercase().as_str() {
APPLICATION_JSON => Ok(EncodingType::Json),
APPLICATION_OCTET_STREAM => Ok(EncodingType::Ssz),
_ => Err(format!("unsupported encoding type: {value}")),
}
essence_encoding(&parsed).ok_or_else(|| format!("unsupported encoding type: {value}"))
}
}

Expand Down Expand Up @@ -430,7 +452,7 @@ mod test {

use super::{
APPLICATION_JSON, APPLICATION_OCTET_STREAM, AcceptedEncodings, BodyDeserializeError,
CONSENSUS_VERSION_HEADER, EncodingType, NO_PREFERENCE_DEFAULT, OUTBOUND_ACCEPT, WILDCARD,
CONSENSUS_VERSION_HEADER, EncodingType, NO_PREFERENCE_DEFAULT, WILDCARD,
accept_q_value_for_index, build_outbound_accept, deserialize_body, format_accept_entry,
get_accept_types, get_consensus_version_header, get_content_type,
parse_response_encoding_and_fork,
Expand Down Expand Up @@ -525,6 +547,15 @@ mod test {
assert!(result.is_err());
}

/// Test rejecting an unknown Accept: / type
#[test]
fn test_invalid_accept_header_type_slash() {
let mut headers = HeaderMap::new();
headers.append(ACCEPT, HeaderValue::from_str("/").unwrap());
let result = get_accept_types(&headers);
assert!(result.is_err());
}

/// Test accepting one header with multiple values
#[test]
fn test_accept_header_invalid_parse() {
Expand Down Expand Up @@ -689,6 +720,24 @@ mod test {
);
}

/// Multiple Accept header fields must be combined before parsing so
/// q-values are ordered globally across all media ranges, not per
/// header field.
#[test]
fn test_multiple_accept_headers_q_value_ordering() {
let mut headers = HeaderMap::new();

// SSZ appears in the first header field but has a lower q-value.
// JSON appears in the second header field and should win globally.
headers.append(ACCEPT, HeaderValue::from_str("application/octet-stream;q=0.1").unwrap());
headers.append(ACCEPT, HeaderValue::from_str("application/json;q=1.0").unwrap());

assert_eq!(get_accept_types(&headers).unwrap(), AcceptedEncodings {
primary: EncodingType::Json,
fallback: Some(EncodingType::Ssz),
});
}

/// Once primary and fallback are filled, further supported entries must
/// not overwrite fallback. (Belt-and-suspenders — only two supported
/// variants exist today, so this is mostly a guard against future
Expand Down Expand Up @@ -746,13 +795,6 @@ mod test {
);
}

/// Snapshot test: constant emits exactly what we document in
/// OUTBOUND_ACCEPT.
#[test]
fn test_outbound_accept_constant_snapshot() {
assert_eq!(OUTBOUND_ACCEPT, "application/octet-stream;q=1.0,application/json;q=0.9");
}

/// q-value ladder: first entry is 1.0, each subsequent entry drops by 0.1.
#[test]
fn test_accept_q_value_for_index_ladder() {
Expand Down Expand Up @@ -810,6 +852,13 @@ mod test {
assert_eq!(get_content_type(&headers), EncodingType::Json);
}

#[test]
fn test_content_type_invalid_defaults_to_json() {
let mut headers = HeaderMap::new();
headers.append(CONTENT_TYPE, HeaderValue::from_str("/").unwrap());
assert_eq!(get_content_type(&headers), EncodingType::Json);
}

// ── get_consensus_version_header ─────────────────────────────────────────

#[test]
Expand Down
13 changes: 10 additions & 3 deletions crates/pbs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use axum::{http::StatusCode, response::IntoResponse};
use cb_common::wire::BodyDeserializeError;
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use cb_common::wire::{AcceptedEncodingsError, BodyDeserializeError};
use thiserror::Error;

#[derive(Debug, Error)]
Expand All @@ -13,6 +16,8 @@ pub enum PbsClientError {
Internal,
#[error("failed to deserialize body: {0}")]
DecodeError(#[from] BodyDeserializeError),
#[error("invalid accept types: {0}")]
HeaderError(#[from] AcceptedEncodingsError),
}

impl PbsClientError {
Expand All @@ -22,17 +27,19 @@ impl PbsClientError {
PbsClientError::NoPayload => StatusCode::BAD_GATEWAY,
PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR,
PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST,
PbsClientError::HeaderError(_) => StatusCode::NOT_ACCEPTABLE,
}
}
}

impl IntoResponse for PbsClientError {
fn into_response(self) -> axum::response::Response {
fn into_response(self) -> Response {
let msg = match &self {
PbsClientError::NoResponse => "no response from relays".to_string(),
PbsClientError::NoPayload => "no payload from relays".to_string(),
PbsClientError::Internal => "internal server error".to_string(),
PbsClientError::DecodeError(e) => format!("error decoding request: {e}"),
PbsClientError::HeaderError(e) => format!("header error: {e}"),
};

(self.status_code(), msg).into_response()
Expand Down
Loading
Loading