Skip to main content

foundry_common/provider/
runtime_transport.rs

//! Runtime transport that connects on first request, using an HTTP, WebSocket, or IPC
//! transport depending on the URL. Retries are handled by a client layer (e.g.,
//! `RetryBackoffLayer`) when used.
4
5use crate::{
6    DEFAULT_USER_AGENT, REQUEST_TIMEOUT,
7    provider::mpp::{keys::discover_mpp_key, transport::LazyMppHttpTransport, ws::MppWsConnect},
8};
9use alloy_json_rpc::{RequestPacket, ResponsePacket};
10use alloy_pubsub::{PubSubConnect, PubSubFrontend};
11use alloy_rpc_types_engine::{Claims, JwtSecret};
12use alloy_transport::{
13    Authorization, BoxTransport, TransportError, TransportErrorKind, TransportFut,
14};
15use alloy_transport_ipc::IpcConnect;
16use alloy_transport_ws::WsConnect;
17use reqwest::header::{HeaderName, HeaderValue};
18use std::{fmt, path::PathBuf, str::FromStr, sync::Arc};
19use thiserror::Error;
20use tokio::sync::RwLock;
21use tower::Service;
22use url::Url;
23
24/// Known MPP-enabled RPC host suffixes.
25///
26/// Endpoints matching these patterns are always connected via [`MppWsConnect`],
27/// regardless of whether local MPP keys have been discovered.
28const KNOWN_MPP_HOSTS: &[&str] = &[".mpp.tempo.xyz"];
29
30/// Returns `true` if `url` points to a known MPP-enabled RPC service.
31fn is_known_mpp_endpoint(url: &Url) -> bool {
32    url.host_str().is_some_and(|host| KNOWN_MPP_HOSTS.iter().any(|suffix| host.ends_with(suffix)))
33}
34
35/// An enum representing the different transports that can be used to connect to a runtime.
36/// Only meant to be used internally by [RuntimeTransport].
37#[derive(Clone, Debug)]
38pub enum InnerTransport {
39    /// HTTP transport with lazy MPP 402 handling.
40    ///
41    /// For known Tempo endpoints, the MPP layer additionally runs the
42    /// `wallet.tempo.xyz` device-code flow on a 402 when no local access key
43    /// is configured (see [`crate::tempo::ensure_access_key`]).
44    Http(LazyMppHttpTransport),
45    /// WebSocket transport
46    Ws(PubSubFrontend),
47    /// IPC transport
48    Ipc(PubSubFrontend),
49}
50
51/// Error type for the runtime transport.
52#[derive(Error, Debug)]
53pub enum RuntimeTransportError {
54    /// Internal transport error
55    #[error("Internal transport error: {0} with {1}")]
56    TransportError(TransportError, String),
57
58    /// Invalid URL scheme
59    #[error("URL scheme is not supported: {0}")]
60    BadScheme(String),
61
62    /// Invalid HTTP header
63    #[error("Invalid HTTP header: {0}")]
64    BadHeader(String),
65
66    /// Invalid file path
67    #[error("Invalid IPC file path: {0}")]
68    BadPath(String),
69
70    /// Invalid construction of Http provider
71    #[error(transparent)]
72    HttpConstructionError(#[from] reqwest::Error),
73
74    /// Invalid JWT
75    #[error("Invalid JWT: {0}")]
76    InvalidJwt(String),
77}
78
79/// Runtime transport that only connects on first request.
80///
81/// A runtime transport is a custom [`alloy_transport::Transport`] that only connects when the
82/// *first* request is made. When the first request is made, it will connect to the runtime using
83/// either an HTTP WebSocket, or IPC transport depending on the URL used.
84/// Retries for rate-limiting and timeout-related errors are handled by an external
85/// client layer (e.g., `RetryBackoffLayer`) when configured.
86#[derive(Clone, Debug)]
87pub struct RuntimeTransport {
88    /// The inner actual transport used.
89    inner: Arc<RwLock<Option<InnerTransport>>>,
90    /// The URL to connect to.
91    url: Url,
92    /// The headers to use for requests.
93    headers: Vec<String>,
94    /// The JWT to use for requests.
95    jwt: Option<String>,
96    /// The timeout for requests.
97    timeout: std::time::Duration,
98    /// Whether to accept invalid certificates.
99    accept_invalid_certs: bool,
100    /// Whether to disable automatic proxy detection.
101    no_proxy: bool,
102}
103
104/// A builder for [RuntimeTransport].
105#[derive(Debug)]
106pub struct RuntimeTransportBuilder {
107    url: Url,
108    headers: Vec<String>,
109    jwt: Option<String>,
110    timeout: std::time::Duration,
111    accept_invalid_certs: bool,
112    no_proxy: bool,
113}
114
115impl RuntimeTransportBuilder {
116    /// Create a new builder with the given URL.
117    pub const fn new(url: Url) -> Self {
118        Self {
119            url,
120            headers: vec![],
121            jwt: None,
122            timeout: REQUEST_TIMEOUT,
123            accept_invalid_certs: false,
124            no_proxy: false,
125        }
126    }
127
128    /// Set the URL for the transport.
129    pub fn with_headers(mut self, headers: Vec<String>) -> Self {
130        self.headers = headers;
131        self
132    }
133
134    /// Set the JWT for the transport.
135    pub fn with_jwt(mut self, jwt: Option<String>) -> Self {
136        self.jwt = jwt;
137        self
138    }
139
140    /// Set the timeout for the transport.
141    pub const fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
142        self.timeout = timeout;
143        self
144    }
145
146    /// Set whether to accept invalid certificates.
147    pub const fn accept_invalid_certs(mut self, accept_invalid_certs: bool) -> Self {
148        self.accept_invalid_certs = accept_invalid_certs;
149        self
150    }
151
152    /// Set whether to disable automatic proxy detection.
153    ///
154    /// This can help in sandboxed environments (e.g., Cursor IDE sandbox, macOS App Sandbox)
155    /// where system proxy detection via SCDynamicStore causes crashes.
156    pub const fn no_proxy(mut self, no_proxy: bool) -> Self {
157        self.no_proxy = no_proxy;
158        self
159    }
160
161    /// Builds the [RuntimeTransport] and returns it in a disconnected state.
162    /// The runtime transport will then connect when the first request happens.
163    pub fn build(self) -> RuntimeTransport {
164        RuntimeTransport {
165            inner: Arc::new(RwLock::new(None)),
166            url: self.url,
167            headers: self.headers,
168            jwt: self.jwt,
169            timeout: self.timeout,
170            accept_invalid_certs: self.accept_invalid_certs,
171            no_proxy: self.no_proxy,
172        }
173    }
174}
175
176impl fmt::Display for RuntimeTransport {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        write!(f, "RuntimeTransport {}", self.url)
179    }
180}
181
182impl RuntimeTransport {
183    /// Connects the underlying transport, depending on the URL scheme.
184    pub async fn connect(&self) -> Result<InnerTransport, RuntimeTransportError> {
185        match self.url.scheme() {
186            "http" | "https" => self.connect_http(),
187            "ws" | "wss" => self.connect_ws().await,
188            "file" => self.connect_ipc().await,
189            _ => Err(RuntimeTransportError::BadScheme(self.url.scheme().to_string())),
190        }
191    }
192
193    /// Creates a new reqwest client from this transport.
194    pub fn reqwest_client(&self) -> Result<reqwest::Client, RuntimeTransportError> {
195        let mut client_builder = reqwest::Client::builder()
196            .timeout(self.timeout)
197            .danger_accept_invalid_certs(self.accept_invalid_certs);
198
199        // Disable automatic proxy detection if requested. This helps in sandboxed environments
200        // (e.g., Cursor IDE sandbox, macOS App Sandbox) where system proxy detection via
201        // SCDynamicStore causes crashes. See: https://github.com/foundry-rs/foundry/issues/12733
202        if self.no_proxy {
203            client_builder = client_builder.no_proxy();
204        }
205
206        let mut headers = reqwest::header::HeaderMap::new();
207
208        // If there's a JWT, add it to the headers if we can decode it.
209        if let Some(jwt) = self.jwt.clone() {
210            let auth =
211                build_auth(jwt).map_err(|e| RuntimeTransportError::InvalidJwt(e.to_string()))?;
212
213            let mut auth_value: HeaderValue =
214                HeaderValue::from_str(&auth.to_string()).expect("Header should be valid string");
215            auth_value.set_sensitive(true);
216
217            headers.insert(reqwest::header::AUTHORIZATION, auth_value);
218        };
219
220        // Add any custom headers.
221        for header in &self.headers {
222            let make_err = || RuntimeTransportError::BadHeader(header.clone());
223
224            let (key, val) = header.split_once(':').ok_or_else(make_err)?;
225
226            headers.insert(
227                HeaderName::from_str(key.trim()).map_err(|_| make_err())?,
228                HeaderValue::from_str(val.trim()).map_err(|_| make_err())?,
229            );
230        }
231
232        if !headers.contains_key(reqwest::header::USER_AGENT) {
233            headers.insert(
234                reqwest::header::USER_AGENT,
235                HeaderValue::from_str(DEFAULT_USER_AGENT)
236                    .expect("User-Agent should be valid string"),
237            );
238        }
239
240        // If MPP_API_KEY is set, attach it as x-api-key for gated MPP proxies.
241        // Does not override an explicit x-api-key header from the user.
242        if !headers.contains_key(HeaderName::from_static("x-api-key"))
243            && let Ok(api_key) = std::env::var("MPP_API_KEY")
244        {
245            let api_key = api_key.trim();
246            if !api_key.is_empty() {
247                let mut value = HeaderValue::from_str(api_key)
248                    .map_err(|_| RuntimeTransportError::BadHeader("MPP_API_KEY".to_string()))?;
249                value.set_sensitive(true);
250                headers.insert(HeaderName::from_static("x-api-key"), value);
251            }
252        }
253
254        client_builder = client_builder.default_headers(headers);
255
256        Ok(client_builder.build()?)
257    }
258
259    /// Connects to an HTTP transport with lazy MPP 402 handling.
260    fn connect_http(&self) -> Result<InnerTransport, RuntimeTransportError> {
261        let client = self.reqwest_client()?;
262        Ok(InnerTransport::Http(LazyMppHttpTransport::lazy(client, self.url.clone())))
263    }
264
265    /// Connects to a WS transport.
266    ///
267    /// Uses [`MppWsConnect`] (which performs the MPP challenge/credential
268    /// handshake at connect time) when the endpoint is a known MPP service or
269    /// when MPP keys are discoverable. Otherwise falls back to alloy's plain
270    /// [`WsConnect`] with zero overhead.
271    async fn connect_ws(&self) -> Result<InnerTransport, RuntimeTransportError> {
272        let auth = self.jwt.as_ref().and_then(|jwt| build_auth(jwt.clone()).ok());
273
274        let service = if is_known_mpp_endpoint(&self.url) && discover_mpp_key().is_some() {
275            let mut ws = MppWsConnect::new(self.url.to_string());
276            if let Some(auth) = auth {
277                ws = ws.with_auth(auth);
278            }
279            ws.into_service()
280                .await
281                .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
282        } else {
283            let mut ws = WsConnect::new(self.url.to_string());
284            if let Some(auth) = auth {
285                ws = ws.with_auth(auth);
286            }
287            ws.into_service()
288                .await
289                .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
290        };
291
292        Ok(InnerTransport::Ws(service))
293    }
294
295    /// Connects to an IPC transport.
296    async fn connect_ipc(&self) -> Result<InnerTransport, RuntimeTransportError> {
297        let path = url_to_file_path(&self.url)
298            .map_err(|_| RuntimeTransportError::BadPath(self.url.to_string()))?;
299        let ipc_connector = IpcConnect::new(path.clone());
300        let ipc = ipc_connector.into_service().await.map_err(|e| {
301            RuntimeTransportError::TransportError(e, path.clone().display().to_string())
302        })?;
303        Ok(InnerTransport::Ipc(ipc))
304    }
305
306    /// Sends a request using the underlying transport.
307    /// If this is the first request, it will connect to the appropriate transport depending on the
308    /// URL scheme. Retries are performed by an external client layer (e.g., `RetryBackoffLayer`),
309    /// if such a layer is configured by the caller.
310    /// For sending the actual request, this action is delegated down to the
311    /// underlying transport through Tower's [tower::Service::call]. See tower's [tower::Service]
312    /// trait for more information.
313    pub fn request(&self, req: RequestPacket) -> TransportFut<'static> {
314        let this = self.clone();
315        Box::pin(async move {
316            let mut inner = this.inner.read().await;
317            if inner.is_none() {
318                drop(inner);
319                {
320                    let mut inner_mut = this.inner.write().await;
321                    if inner_mut.is_none() {
322                        *inner_mut =
323                            Some(this.connect().await.map_err(TransportErrorKind::custom)?);
324                    }
325                }
326                inner = this.inner.read().await;
327            }
328
329            // SAFETY: We just checked that the inner transport exists.
330            match inner.clone().expect("must've been initialized") {
331                InnerTransport::Http(mut http) => http.call(req),
332                InnerTransport::Ws(mut ws) => ws.call(req),
333                InnerTransport::Ipc(mut ipc) => ipc.call(req),
334            }
335            .await
336        })
337    }
338
339    /// Convert this transport into a boxed trait object.
340    pub fn boxed(self) -> BoxTransport
341    where
342        Self: Sized + Clone + Send + Sync + 'static,
343    {
344        BoxTransport::new(self)
345    }
346}
347
348impl tower::Service<RequestPacket> for RuntimeTransport {
349    type Response = ResponsePacket;
350    type Error = TransportError;
351    type Future = TransportFut<'static>;
352
353    #[inline]
354    fn poll_ready(
355        &mut self,
356        _cx: &mut std::task::Context<'_>,
357    ) -> std::task::Poll<Result<(), Self::Error>> {
358        std::task::Poll::Ready(Ok(()))
359    }
360
361    #[inline]
362    fn call(&mut self, req: RequestPacket) -> Self::Future {
363        self.request(req)
364    }
365}
366
367impl tower::Service<RequestPacket> for &RuntimeTransport {
368    type Response = ResponsePacket;
369    type Error = TransportError;
370    type Future = TransportFut<'static>;
371
372    #[inline]
373    fn poll_ready(
374        &mut self,
375        _cx: &mut std::task::Context<'_>,
376    ) -> std::task::Poll<Result<(), Self::Error>> {
377        std::task::Poll::Ready(Ok(()))
378    }
379
380    #[inline]
381    fn call(&mut self, req: RequestPacket) -> Self::Future {
382        self.request(req)
383    }
384}
385
386fn build_auth(jwt: String) -> eyre::Result<Authorization> {
387    // Decode jwt from hex, then generate claims (iat with current timestamp)
388    let secret = JwtSecret::from_hex(jwt)?;
389    let claims = Claims::default();
390    let token = secret.encode(&claims)?;
391
392    let auth = Authorization::Bearer(token);
393
394    Ok(auth)
395}
396
/// Converts an IPC `url` into a file-system path (Windows).
///
/// URLs of the form `file:///pipe/<name>` are mapped to the named pipe `\\.\pipe\<name>`;
/// anything else goes through [`Url::to_file_path`].
#[cfg(windows)]
fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
    const PREFIX: &str = "file:///pipe/";

    match url.as_str().strip_prefix(PREFIX) {
        Some(pipe_name) => Ok(PathBuf::from(format!(r"\\.\pipe\{pipe_name}"))),
        None => url.to_file_path(),
    }
}
410
411#[cfg(not(windows))]
412fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
413    url.to_file_path()
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::header::HeaderMap;

    /// A `User-Agent` supplied as a custom header must be the one the server receives.
    #[tokio::test]
    async fn test_user_agent_header() {
        // Bind to an ephemeral port and point the transport at it.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = Url::parse(&format!("http://{addr}")).unwrap();

        // The actual check happens server-side, on the headers of the incoming request.
        let http_handler = axum::routing::get(|actual_headers: HeaderMap| {
            assert_eq!(
                actual_headers[reqwest::header::USER_AGENT],
                HeaderValue::from_static("test-agent")
            );

            async { "" }
        });

        let server_task = tokio::spawn(async move {
            axum::serve(listener, http_handler.into_make_service()).await.unwrap()
        });

        let transport = RuntimeTransportBuilder::new(url.clone())
            .with_headers(vec!["User-Agent: test-agent".to_string()])
            .build();

        // An `http` URL always produces the HTTP variant.
        let InnerTransport::Http(http) = transport.connect_http().unwrap() else {
            unreachable!()
        };

        // assert inside http_handler
        let _ = http.client().get(url).send().await.unwrap();

        server_task.abort();
    }
}
453}