Skip to main content

foundry_common/provider/
runtime_transport.rs

1//! Runtime transport that connects on first request, which can take either of an HTTP,
2//! WebSocket, or IPC transport. Retries are handled by a client layer (e.g.,
3//! `RetryBackoffLayer`) when used.
4
5use crate::{
6    DEFAULT_USER_AGENT, REQUEST_TIMEOUT,
7    provider::mpp::{keys::discover_mpp_key, transport::LazyMppHttpTransport, ws::MppWsConnect},
8};
9use alloy_json_rpc::{RequestPacket, ResponsePacket};
10use alloy_pubsub::{PubSubConnect, PubSubFrontend};
11use alloy_rpc_types_engine::{Claims, JwtSecret};
12use alloy_transport::{
13    Authorization, BoxTransport, TransportError, TransportErrorKind, TransportFut,
14    utils::guess_local_url,
15};
16use alloy_transport_ipc::IpcConnect;
17use alloy_transport_ws::WsConnect;
18use reqwest::header::{HeaderName, HeaderValue};
19use std::{fmt, path::PathBuf, str::FromStr, sync::Arc};
20use thiserror::Error;
21use tokio::sync::RwLock;
22use tower::Service;
23use url::Url;
24
25/// Known MPP-enabled RPC host suffixes.
26///
27/// Endpoints matching these patterns are always connected via [`MppWsConnect`],
28/// regardless of whether local MPP keys have been discovered.
29const KNOWN_MPP_HOSTS: &[&str] = &[".mpp.tempo.xyz"];
30
31/// Returns `true` if `url` points to a known MPP-enabled RPC service.
32fn is_known_mpp_endpoint(url: &Url) -> bool {
33    url.host_str().is_some_and(|host| KNOWN_MPP_HOSTS.iter().any(|suffix| host.ends_with(suffix)))
34}
35
36/// An enum representing the different transports that can be used to connect to a runtime.
37/// Only meant to be used internally by [RuntimeTransport].
38#[derive(Clone, Debug)]
39pub enum InnerTransport {
40    /// HTTP transport with lazy MPP 402 handling.
41    ///
42    /// For known Tempo endpoints, the MPP layer additionally runs the
43    /// `wallet.tempo.xyz` device-code flow on a 402 when no local access key
44    /// is configured (see [`crate::tempo::ensure_access_key`]).
45    Http(LazyMppHttpTransport),
46    /// WebSocket transport
47    Ws(PubSubFrontend),
48    /// IPC transport
49    Ipc(PubSubFrontend),
50}
51
52/// Error type for the runtime transport.
53#[derive(Error, Debug)]
54pub enum RuntimeTransportError {
55    /// Internal transport error
56    #[error("Internal transport error: {0} with {1}")]
57    TransportError(TransportError, String),
58
59    /// Invalid URL scheme
60    #[error("URL scheme is not supported: {0}")]
61    BadScheme(String),
62
63    /// Invalid HTTP header
64    #[error("Invalid HTTP header: {0}")]
65    BadHeader(String),
66
67    /// Invalid file path
68    #[error("Invalid IPC file path: {0}")]
69    BadPath(String),
70
71    /// Invalid construction of Http provider
72    #[error(transparent)]
73    HttpConstructionError(#[from] reqwest::Error),
74
75    /// Invalid JWT
76    #[error("Invalid JWT: {0}")]
77    InvalidJwt(String),
78}
79
80/// Runtime transport that only connects on first request.
81///
82/// A runtime transport is a custom [`alloy_transport::Transport`] that only connects when the
83/// *first* request is made. When the first request is made, it will connect to the runtime using
84/// either an HTTP WebSocket, or IPC transport depending on the URL used.
85/// Retries for rate-limiting and timeout-related errors are handled by an external
86/// client layer (e.g., `RetryBackoffLayer`) when configured.
87#[derive(Clone, Debug)]
88pub struct RuntimeTransport {
89    /// The inner actual transport used.
90    inner: Arc<RwLock<Option<InnerTransport>>>,
91    /// The URL to connect to.
92    url: Url,
93    /// The headers to use for requests.
94    headers: Vec<String>,
95    /// The JWT to use for requests.
96    jwt: Option<String>,
97    /// The timeout for requests.
98    timeout: std::time::Duration,
99    /// Whether to accept invalid certificates.
100    accept_invalid_certs: bool,
101    /// Whether to disable automatic proxy detection.
102    no_proxy: bool,
103}
104
105/// A builder for [RuntimeTransport].
106#[derive(Debug)]
107pub struct RuntimeTransportBuilder {
108    url: Url,
109    headers: Vec<String>,
110    jwt: Option<String>,
111    timeout: std::time::Duration,
112    accept_invalid_certs: bool,
113    no_proxy: bool,
114}
115
116impl RuntimeTransportBuilder {
117    /// Create a new builder with the given URL.
118    pub const fn new(url: Url) -> Self {
119        Self {
120            url,
121            headers: vec![],
122            jwt: None,
123            timeout: REQUEST_TIMEOUT,
124            accept_invalid_certs: false,
125            no_proxy: false,
126        }
127    }
128
129    /// Set the URL for the transport.
130    pub fn with_headers(mut self, headers: Vec<String>) -> Self {
131        self.headers = headers;
132        self
133    }
134
135    /// Set the JWT for the transport.
136    pub fn with_jwt(mut self, jwt: Option<String>) -> Self {
137        self.jwt = jwt;
138        self
139    }
140
141    /// Set the timeout for the transport.
142    pub const fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
143        self.timeout = timeout;
144        self
145    }
146
147    /// Set whether to accept invalid certificates.
148    pub const fn accept_invalid_certs(mut self, accept_invalid_certs: bool) -> Self {
149        self.accept_invalid_certs = accept_invalid_certs;
150        self
151    }
152
153    /// Set whether to disable automatic proxy detection.
154    ///
155    /// This can help in sandboxed environments (e.g., Cursor IDE sandbox, macOS App Sandbox)
156    /// where system proxy detection via SCDynamicStore causes crashes.
157    pub const fn no_proxy(mut self, no_proxy: bool) -> Self {
158        self.no_proxy = no_proxy;
159        self
160    }
161
162    /// Builds the [RuntimeTransport] and returns it in a disconnected state.
163    /// The runtime transport will then connect when the first request happens.
164    pub fn build(self) -> RuntimeTransport {
165        RuntimeTransport {
166            inner: Arc::new(RwLock::new(None)),
167            url: self.url,
168            headers: self.headers,
169            jwt: self.jwt,
170            timeout: self.timeout,
171            accept_invalid_certs: self.accept_invalid_certs,
172            no_proxy: self.no_proxy,
173        }
174    }
175}
176
177impl fmt::Display for RuntimeTransport {
178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179        write!(f, "RuntimeTransport {}", self.url)
180    }
181}
182
183impl RuntimeTransport {
184    /// Connects the underlying transport, depending on the URL scheme.
185    pub async fn connect(&self) -> Result<InnerTransport, RuntimeTransportError> {
186        match self.url.scheme() {
187            "http" | "https" => self.connect_http(),
188            "ws" | "wss" => self.connect_ws().await,
189            "file" => self.connect_ipc().await,
190            _ => Err(RuntimeTransportError::BadScheme(self.url.scheme().to_string())),
191        }
192    }
193
194    /// Creates a new reqwest client from this transport.
195    pub fn reqwest_client(&self) -> Result<reqwest::Client, RuntimeTransportError> {
196        let mut client_builder = reqwest::Client::builder()
197            .timeout(self.timeout)
198            .danger_accept_invalid_certs(self.accept_invalid_certs);
199
200        // Disable automatic proxy detection if requested. This helps in sandboxed environments
201        // (e.g., Cursor IDE sandbox, macOS App Sandbox) where system proxy detection via
202        // SCDynamicStore causes crashes. See: https://github.com/foundry-rs/foundry/issues/12733
203        if self.no_proxy || guess_local_url(self.url.as_str()) {
204            client_builder = client_builder.no_proxy();
205        }
206
207        let mut headers = reqwest::header::HeaderMap::new();
208
209        // If there's a JWT, add it to the headers if we can decode it.
210        if let Some(jwt) = self.jwt.clone() {
211            let auth =
212                build_auth(jwt).map_err(|e| RuntimeTransportError::InvalidJwt(e.to_string()))?;
213
214            let mut auth_value: HeaderValue =
215                HeaderValue::from_str(&auth.to_string()).expect("Header should be valid string");
216            auth_value.set_sensitive(true);
217
218            headers.insert(reqwest::header::AUTHORIZATION, auth_value);
219        };
220
221        // Add any custom headers.
222        for header in &self.headers {
223            let make_err = || RuntimeTransportError::BadHeader(header.clone());
224
225            let (key, val) = header.split_once(':').ok_or_else(make_err)?;
226
227            headers.insert(
228                HeaderName::from_str(key.trim()).map_err(|_| make_err())?,
229                HeaderValue::from_str(val.trim()).map_err(|_| make_err())?,
230            );
231        }
232
233        if !headers.contains_key(reqwest::header::USER_AGENT) {
234            headers.insert(
235                reqwest::header::USER_AGENT,
236                HeaderValue::from_str(DEFAULT_USER_AGENT)
237                    .expect("User-Agent should be valid string"),
238            );
239        }
240
241        // If MPP_API_KEY is set, attach it as x-api-key for gated MPP proxies.
242        // Does not override an explicit x-api-key header from the user.
243        if !headers.contains_key(HeaderName::from_static("x-api-key"))
244            && let Ok(api_key) = std::env::var("MPP_API_KEY")
245        {
246            let api_key = api_key.trim();
247            if !api_key.is_empty() {
248                let mut value = HeaderValue::from_str(api_key)
249                    .map_err(|_| RuntimeTransportError::BadHeader("MPP_API_KEY".to_string()))?;
250                value.set_sensitive(true);
251                headers.insert(HeaderName::from_static("x-api-key"), value);
252            }
253        }
254
255        client_builder = client_builder.default_headers(headers);
256
257        Ok(client_builder.build()?)
258    }
259
260    /// Connects to an HTTP transport with lazy MPP 402 handling.
261    fn connect_http(&self) -> Result<InnerTransport, RuntimeTransportError> {
262        let client = self.reqwest_client()?;
263        Ok(InnerTransport::Http(LazyMppHttpTransport::lazy(client, self.url.clone())))
264    }
265
266    /// Connects to a WS transport.
267    ///
268    /// Uses [`MppWsConnect`] (which performs the MPP challenge/credential
269    /// handshake at connect time) when the endpoint is a known MPP service or
270    /// when MPP keys are discoverable. Otherwise falls back to alloy's plain
271    /// [`WsConnect`] with zero overhead.
272    async fn connect_ws(&self) -> Result<InnerTransport, RuntimeTransportError> {
273        let auth = self.jwt.as_ref().and_then(|jwt| build_auth(jwt.clone()).ok());
274
275        let service = if is_known_mpp_endpoint(&self.url) && discover_mpp_key().is_some() {
276            let mut ws = MppWsConnect::new(self.url.to_string());
277            if let Some(auth) = auth {
278                ws = ws.with_auth(auth);
279            }
280            ws.into_service()
281                .await
282                .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
283        } else {
284            let mut ws = WsConnect::new(self.url.to_string());
285            if let Some(auth) = auth {
286                ws = ws.with_auth(auth);
287            }
288            ws.into_service()
289                .await
290                .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
291        };
292
293        Ok(InnerTransport::Ws(service))
294    }
295
296    /// Connects to an IPC transport.
297    async fn connect_ipc(&self) -> Result<InnerTransport, RuntimeTransportError> {
298        let path = url_to_file_path(&self.url)
299            .map_err(|_| RuntimeTransportError::BadPath(self.url.to_string()))?;
300        let ipc_connector = IpcConnect::new(path.clone());
301        let ipc = ipc_connector.into_service().await.map_err(|e| {
302            RuntimeTransportError::TransportError(e, path.clone().display().to_string())
303        })?;
304        Ok(InnerTransport::Ipc(ipc))
305    }
306
307    /// Sends a request using the underlying transport.
308    /// If this is the first request, it will connect to the appropriate transport depending on the
309    /// URL scheme. Retries are performed by an external client layer (e.g., `RetryBackoffLayer`),
310    /// if such a layer is configured by the caller.
311    /// For sending the actual request, this action is delegated down to the
312    /// underlying transport through Tower's [tower::Service::call]. See tower's [tower::Service]
313    /// trait for more information.
314    pub fn request(&self, req: RequestPacket) -> TransportFut<'static> {
315        let this = self.clone();
316        Box::pin(async move {
317            let mut inner = this.inner.read().await;
318            if inner.is_none() {
319                drop(inner);
320                {
321                    let mut inner_mut = this.inner.write().await;
322                    if inner_mut.is_none() {
323                        *inner_mut =
324                            Some(this.connect().await.map_err(TransportErrorKind::custom)?);
325                    }
326                }
327                inner = this.inner.read().await;
328            }
329
330            // SAFETY: We just checked that the inner transport exists.
331            match inner.clone().expect("must've been initialized") {
332                InnerTransport::Http(mut http) => http.call(req),
333                InnerTransport::Ws(mut ws) => ws.call(req),
334                InnerTransport::Ipc(mut ipc) => ipc.call(req),
335            }
336            .await
337        })
338    }
339
340    /// Convert this transport into a boxed trait object.
341    pub fn boxed(self) -> BoxTransport
342    where
343        Self: Sized + Clone + Send + Sync + 'static,
344    {
345        BoxTransport::new(self)
346    }
347}
348
349impl tower::Service<RequestPacket> for RuntimeTransport {
350    type Response = ResponsePacket;
351    type Error = TransportError;
352    type Future = TransportFut<'static>;
353
354    #[inline]
355    fn poll_ready(
356        &mut self,
357        _cx: &mut std::task::Context<'_>,
358    ) -> std::task::Poll<Result<(), Self::Error>> {
359        std::task::Poll::Ready(Ok(()))
360    }
361
362    #[inline]
363    fn call(&mut self, req: RequestPacket) -> Self::Future {
364        self.request(req)
365    }
366}
367
368impl tower::Service<RequestPacket> for &RuntimeTransport {
369    type Response = ResponsePacket;
370    type Error = TransportError;
371    type Future = TransportFut<'static>;
372
373    #[inline]
374    fn poll_ready(
375        &mut self,
376        _cx: &mut std::task::Context<'_>,
377    ) -> std::task::Poll<Result<(), Self::Error>> {
378        std::task::Poll::Ready(Ok(()))
379    }
380
381    #[inline]
382    fn call(&mut self, req: RequestPacket) -> Self::Future {
383        self.request(req)
384    }
385}
386
387fn build_auth(jwt: String) -> eyre::Result<Authorization> {
388    // Decode jwt from hex, then generate claims (iat with current timestamp)
389    let secret = JwtSecret::from_hex(jwt)?;
390    let claims = Claims::default();
391    let token = secret.encode(&claims)?;
392
393    let auth = Authorization::Bearer(token);
394
395    Ok(auth)
396}
397
398#[cfg(windows)]
399fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
400    const PREFIX: &str = "file:///pipe/";
401
402    let url_str = url.as_str();
403
404    if let Some(pipe_name) = url_str.strip_prefix(PREFIX) {
405        let pipe_path = format!(r"\\.\pipe\{pipe_name}");
406        return Ok(PathBuf::from(pipe_path));
407    }
408
409    url.to_file_path()
410}
411
412#[cfg(not(windows))]
413fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
414    url.to_file_path()
415}
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420    use reqwest::header::HeaderMap;
421
422    #[tokio::test]
423    async fn test_user_agent_header() {
424        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
425        let url = Url::parse(&format!("http://{}", listener.local_addr().unwrap())).unwrap();
426
427        let http_handler = axum::routing::get(|actual_headers: HeaderMap| {
428            let user_agent = HeaderName::from_str("User-Agent").unwrap();
429            assert_eq!(actual_headers[user_agent], HeaderValue::from_str("test-agent").unwrap());
430
431            async { "" }
432        });
433
434        let server_task = tokio::spawn(async move {
435            axum::serve(listener, http_handler.into_make_service()).await.unwrap()
436        });
437
438        let transport = RuntimeTransportBuilder::new(url.clone())
439            .with_headers(vec!["User-Agent: test-agent".to_string()])
440            .build();
441        let inner = transport.connect_http().unwrap();
442
443        match inner {
444            InnerTransport::Http(http) => {
445                let _ = http.client().get(url).send().await.unwrap();
446
447                // assert inside http_handler
448            }
449            _ => unreachable!(),
450        }
451
452        server_task.abort();
453    }
454}