Skip to content
Open
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ impl MuxKeysLoader {
"Mux keys URL {url} is insecure; consider using HTTPS if possible instead"
);
}
let url = url.as_str();
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
let response = client.get(url).send().await?;
let pubkey_bytes = safe_read_http_response(response, MUXER_HTTP_MAX_LENGTH).await?;
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/pbs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum PbsError {
#[error("json decode error: {err:?}, raw: {raw}")]
JsonDecode { err: serde_json::Error, raw: String },

#[error("ssz decode error: {err:?}, fork: {fork}")]
SSZDecode { err: String, fork: ForkName },

#[error("{0}")]
ReadResponse(#[from] ResponseReadError),

Expand Down
133 changes: 91 additions & 42 deletions crates/common/src/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use bytes::Bytes;
use futures::StreamExt;
use headers_accept::Accept;
use lh_types::{BeaconBlock, ForkName};
use mediatype::{MediaType, ReadParams};
use mediatype::{MediaType, ReadParams, names};
use reqwest::{
Response,
header::{ACCEPT, CONTENT_TYPE, HeaderMap},
header::{ACCEPT, CONTENT_TYPE, HeaderMap, ToStrError},
};
use thiserror::Error;

Expand All @@ -37,6 +37,18 @@ pub enum ResponseReadError {
NonSuccess { status_code: u16, error_msg: String, request_url: String },
}

#[derive(Debug, Error)]
pub enum AcceptedEncodingsError {
#[error("invalid header string: {0}")]
InvalidString(#[from] ToStrError),

#[error("invalid accept header")]
InvalidAccept,

#[error("unsupported accept type")]
UnsupportedAcceptType,
}

#[cfg(feature = "testing-flags")]
thread_local! {
static IGNORE_CONTENT_LENGTH: Cell<bool> = const { Cell::new(false) };
Expand Down Expand Up @@ -108,7 +120,7 @@ pub async fn read_chunked_body_with_max(
/// Reads an HTTP response body with a size limit, erroring on non-success
/// status or read failure.
pub async fn safe_read_http_response(
response: reqwest::Response,
response: Response,
max_size: usize,
) -> Result<Vec<u8>, ResponseReadError> {
let status_code = response.status();
Expand Down Expand Up @@ -140,12 +152,6 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<Head
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?)
}

/// Deterministic outbound `Accept` header used when PBS asks a relay for a
/// response it will itself decode (validation mode On/Extra). SSZ is preferred
/// for wire efficiency. Emitted verbatim so packet captures and support
/// tickets are reproducible.
pub const OUTBOUND_ACCEPT: &str = "application/octet-stream;q=1.0,application/json;q=0.9";

/// Default encoding used when the caller does not express a format
/// preference. This covers both `Accept: */*` (see `get_accept_types`) and
/// a missing Content-Type header on inbound or relay responses (see
Expand Down Expand Up @@ -190,24 +196,32 @@ impl IntoIterator for AcceptedEncodings {
/// Parse the ACCEPT header into a q-value ordered [`AcceptedEncodings`]
/// (highest preference first, deduplicated), defaulting to the request's
/// Content-Type when no Accept header is present. Returns an error only if
/// every media type in the header is malformed or unsupported. Supports
/// requests with multiple ACCEPT headers or headers with multiple media
/// types. `q=0` entries are treated as explicit rejections per RFC 7231
/// every media type in the header is malformed or unsupported.
/// Multiple Accept header fields are combined before parsing so q-value
/// ordering is applied globally across all media ranges.
/// `q=0` entries are treated as explicit rejections per RFC 7231
/// §5.3.1 and are skipped.
///
/// The returned order honors the RFC 9110 §12.5.1 precedence rules already
/// applied by `headers_accept::Accept::media_types()` (specificity, then
/// q-value, then original order).
pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<AcceptedEncodings> {
pub fn get_accept_types(
req_headers: &HeaderMap,
) -> Result<AcceptedEncodings, AcceptedEncodingsError> {
// Only two supported media types, so the ordered set is at most two
// entries: primary + optional fallback.
let mut primary: Option<EncodingType> = None;
let mut fallback: Option<EncodingType> = None;
let mut saw_any = false;
let mut had_supported = false;
let mut accept_values = Vec::new();

for header in req_headers.get_all(ACCEPT).iter() {
let accept = Accept::from_str(header.to_str()?)
.map_err(|e| eyre::eyre!("invalid accept header: {e}"))?;
accept_values.push(header.to_str()?);
}
if !accept_values.is_empty() {
let accept_str = accept_values.join(",");
let accept =
Accept::from_str(&accept_str).map_err(|_| AcceptedEncodingsError::InvalidAccept)?;
for mt in accept.media_types() {
saw_any = true;

Expand All @@ -221,13 +235,7 @@ pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<AcceptedEncodin
continue;
}

let parsed = match mt.essence().to_string().as_str() {
APPLICATION_OCTET_STREAM => Some(EncodingType::Ssz),
APPLICATION_JSON => Some(EncodingType::Json),
WILDCARD => Some(NO_PREFERENCE_DEFAULT),
_ => None,
};
if let Some(enc) = parsed {
if let Some(enc) = essence_encoding(&mt.essence()) {
had_supported = true;
match primary {
None => primary = Some(enc),
Expand All @@ -243,14 +251,30 @@ pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<AcceptedEncodin
}

if saw_any && !had_supported {
eyre::bail!("unsupported accept type");
return Err(AcceptedEncodingsError::UnsupportedAcceptType)
}

// No accept header (or only q=0 rejections): fall back to the request
// Content-Type, which mirrors the historical behavior.
Ok(AcceptedEncodings::single(get_content_type(req_headers)))
}

fn essence_encoding(mt: &MediaType) -> Option<EncodingType> {
if mt.suffix.is_some() {
return None;
}

match () {
_ if mt.ty == names::_STAR && mt.subty == names::_STAR => Some(NO_PREFERENCE_DEFAULT),
_ if mt.ty == names::APPLICATION && mt.subty == names::OCTET_STREAM => {
Some(EncodingType::Ssz)
}
_ if mt.ty == names::APPLICATION && mt.subty == names::JSON => Some(EncodingType::Json),
_ if mt.ty == names::APPLICATION && mt.subty == names::_STAR => Some(NO_PREFERENCE_DEFAULT),
_ => None,
Comment thread
JasonVranek marked this conversation as resolved.
}
}

/// Compute the q-value for the `index`-th preferred encoding when building an
/// outbound `Accept` header. The first entry gets q=1.0, each subsequent entry
/// decreases by 0.1, and the value is clamped to a minimum of 0.1 so we never
Expand All @@ -269,17 +293,19 @@ fn format_accept_entry(enc: EncodingType, q: f32) -> String {
format!("{};q={:.1}", enc.content_type(), q)
}

/// Build an `Accept` header string that mirrors the caller's preference order
/// Build an `Accept` header that mirrors the caller's preference order
/// so the relay sees the same priority the beacon node asked us for. Each
/// subsequent entry receives a q-value 0.1 lower than the previous one,
/// starting at 1.0.
pub fn build_outbound_accept(preferred: AcceptedEncodings) -> String {
preferred
/// starting at 1.0. Returns a ready-to-use `HeaderValue` — the output is
/// always valid ASCII, so infallible.
pub fn build_outbound_accept(preferred: AcceptedEncodings) -> HeaderValue {
let s = preferred
.iter()
.enumerate()
.map(|(i, enc)| format_accept_entry(enc, accept_q_value_for_index(i)))
.collect::<Vec<_>>()
.join(",")
.join(",");
HeaderValue::from_str(&s).expect("build_outbound_accept produces valid header value")
}

pub fn get_content_type(req_headers: &HeaderMap) -> EncodingType {
Expand Down Expand Up @@ -345,11 +371,7 @@ impl FromStr for EncodingType {
// (e.g. `application/json; charset=utf-8`). Compare essence only.
let parsed =
MediaType::parse(value).map_err(|e| format!("invalid content type {value}: {e}"))?;
match parsed.essence().to_string().to_ascii_lowercase().as_str() {
APPLICATION_JSON => Ok(EncodingType::Json),
APPLICATION_OCTET_STREAM => Ok(EncodingType::Ssz),
_ => Err(format!("unsupported encoding type: {value}")),
}
essence_encoding(&parsed).ok_or_else(|| format!("unsupported encoding type: {value}"))
}
}

Expand Down Expand Up @@ -430,7 +452,7 @@ mod test {

use super::{
APPLICATION_JSON, APPLICATION_OCTET_STREAM, AcceptedEncodings, BodyDeserializeError,
CONSENSUS_VERSION_HEADER, EncodingType, NO_PREFERENCE_DEFAULT, OUTBOUND_ACCEPT, WILDCARD,
CONSENSUS_VERSION_HEADER, EncodingType, NO_PREFERENCE_DEFAULT, WILDCARD,
accept_q_value_for_index, build_outbound_accept, deserialize_body, format_accept_entry,
get_accept_types, get_consensus_version_header, get_content_type,
parse_response_encoding_and_fork,
Expand Down Expand Up @@ -525,6 +547,15 @@ mod test {
assert!(result.is_err());
}

/// Test rejecting an unknown Accept: / type
#[test]
fn test_invalid_accept_header_type_slash() {
let mut headers = HeaderMap::new();
headers.append(ACCEPT, HeaderValue::from_str("/").unwrap());
let result = get_accept_types(&headers);
assert!(result.is_err());
}

/// Test accepting one header with multiple values
#[test]
fn test_accept_header_invalid_parse() {
Expand Down Expand Up @@ -689,6 +720,24 @@ mod test {
);
}

/// Multiple Accept header fields must be combined before parsing so
/// q-values are ordered globally across all media ranges, not per
/// header field.
#[test]
fn test_multiple_accept_headers_q_value_ordering() {
let mut headers = HeaderMap::new();

// SSZ appears in the first header field but has a lower q-value.
// JSON appears in the second header field and should win globally.
headers.append(ACCEPT, HeaderValue::from_str("application/octet-stream;q=0.1").unwrap());
headers.append(ACCEPT, HeaderValue::from_str("application/json;q=1.0").unwrap());

assert_eq!(get_accept_types(&headers).unwrap(), AcceptedEncodings {
primary: EncodingType::Json,
fallback: Some(EncodingType::Ssz),
});
}

/// Once primary and fallback are filled, further supported entries must
/// not overwrite fallback. (Belt-and-suspenders — only two supported
/// variants exist today, so this is mostly a guard against future
Expand Down Expand Up @@ -746,13 +795,6 @@ mod test {
);
}

/// Snapshot test: constant emits exactly what we document in
/// OUTBOUND_ACCEPT.
#[test]
fn test_outbound_accept_constant_snapshot() {
assert_eq!(OUTBOUND_ACCEPT, "application/octet-stream;q=1.0,application/json;q=0.9");
}

/// q-value ladder: first entry is 1.0, each subsequent entry drops by 0.1.
#[test]
fn test_accept_q_value_for_index_ladder() {
Expand Down Expand Up @@ -810,6 +852,13 @@ mod test {
assert_eq!(get_content_type(&headers), EncodingType::Json);
}

#[test]
fn test_content_type_invalid_defaults_to_json() {
let mut headers = HeaderMap::new();
headers.append(CONTENT_TYPE, HeaderValue::from_str("/").unwrap());
assert_eq!(get_content_type(&headers), EncodingType::Json);
}

// ── get_consensus_version_header ─────────────────────────────────────────

#[test]
Expand Down
13 changes: 10 additions & 3 deletions crates/pbs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use axum::{http::StatusCode, response::IntoResponse};
use cb_common::wire::BodyDeserializeError;
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use cb_common::wire::{AcceptedEncodingsError, BodyDeserializeError};
use thiserror::Error;

#[derive(Debug, Error)]
Expand All @@ -13,6 +16,8 @@ pub enum PbsClientError {
Internal,
#[error("failed to deserialize body: {0}")]
DecodeError(#[from] BodyDeserializeError),
#[error("invalid accept types: {0}")]
HeaderError(#[from] AcceptedEncodingsError),
}

impl PbsClientError {
Expand All @@ -22,17 +27,19 @@ impl PbsClientError {
PbsClientError::NoPayload => StatusCode::BAD_GATEWAY,
PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR,
PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST,
PbsClientError::HeaderError(_) => StatusCode::NOT_ACCEPTABLE,
}
}
}

impl IntoResponse for PbsClientError {
fn into_response(self) -> axum::response::Response {
fn into_response(self) -> Response {
let msg = match &self {
PbsClientError::NoResponse => "no response from relays".to_string(),
PbsClientError::NoPayload => "no payload from relays".to_string(),
PbsClientError::Internal => "internal server error".to_string(),
PbsClientError::DecodeError(e) => format!("error decoding request: {e}"),
PbsClientError::HeaderError(e) => format!("header error: {e}"),
};

(self.status_code(), msg).into_response()
Expand Down
Loading
Loading