1use crate::{
6 DEFAULT_USER_AGENT, REQUEST_TIMEOUT,
7 provider::mpp::{keys::discover_mpp_key, transport::LazyMppHttpTransport, ws::MppWsConnect},
8};
9use alloy_json_rpc::{RequestPacket, ResponsePacket};
10use alloy_pubsub::{PubSubConnect, PubSubFrontend};
11use alloy_rpc_types_engine::{Claims, JwtSecret};
12use alloy_transport::{
13 Authorization, BoxTransport, TransportError, TransportErrorKind, TransportFut,
14 utils::guess_local_url,
15};
16use alloy_transport_ipc::IpcConnect;
17use alloy_transport_ws::WsConnect;
18use reqwest::header::{HeaderName, HeaderValue};
19use std::{fmt, path::PathBuf, str::FromStr, sync::Arc};
20use thiserror::Error;
21use tokio::sync::RwLock;
22use tower::Service;
23use url::Url;
24
25const KNOWN_MPP_HOSTS: &[&str] = &[".mpp.tempo.xyz"];
30
31fn is_known_mpp_endpoint(url: &Url) -> bool {
33 url.host_str().is_some_and(|host| KNOWN_MPP_HOSTS.iter().any(|suffix| host.ends_with(suffix)))
34}
35
36#[derive(Clone, Debug)]
39pub enum InnerTransport {
40 Http(LazyMppHttpTransport),
46 Ws(PubSubFrontend),
48 Ipc(PubSubFrontend),
50}
51
52#[derive(Error, Debug)]
54pub enum RuntimeTransportError {
55 #[error("Internal transport error: {0} with {1}")]
57 TransportError(TransportError, String),
58
59 #[error("URL scheme is not supported: {0}")]
61 BadScheme(String),
62
63 #[error("Invalid HTTP header: {0}")]
65 BadHeader(String),
66
67 #[error("Invalid IPC file path: {0}")]
69 BadPath(String),
70
71 #[error(transparent)]
73 HttpConstructionError(#[from] reqwest::Error),
74
75 #[error("Invalid JWT: {0}")]
77 InvalidJwt(String),
78}
79
80#[derive(Clone, Debug)]
88pub struct RuntimeTransport {
89 inner: Arc<RwLock<Option<InnerTransport>>>,
91 url: Url,
93 headers: Vec<String>,
95 jwt: Option<String>,
97 timeout: std::time::Duration,
99 accept_invalid_certs: bool,
101 no_proxy: bool,
103}
104
105#[derive(Debug)]
107pub struct RuntimeTransportBuilder {
108 url: Url,
109 headers: Vec<String>,
110 jwt: Option<String>,
111 timeout: std::time::Duration,
112 accept_invalid_certs: bool,
113 no_proxy: bool,
114}
115
116impl RuntimeTransportBuilder {
117 pub const fn new(url: Url) -> Self {
119 Self {
120 url,
121 headers: vec![],
122 jwt: None,
123 timeout: REQUEST_TIMEOUT,
124 accept_invalid_certs: false,
125 no_proxy: false,
126 }
127 }
128
129 pub fn with_headers(mut self, headers: Vec<String>) -> Self {
131 self.headers = headers;
132 self
133 }
134
135 pub fn with_jwt(mut self, jwt: Option<String>) -> Self {
137 self.jwt = jwt;
138 self
139 }
140
141 pub const fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
143 self.timeout = timeout;
144 self
145 }
146
147 pub const fn accept_invalid_certs(mut self, accept_invalid_certs: bool) -> Self {
149 self.accept_invalid_certs = accept_invalid_certs;
150 self
151 }
152
153 pub const fn no_proxy(mut self, no_proxy: bool) -> Self {
158 self.no_proxy = no_proxy;
159 self
160 }
161
162 pub fn build(self) -> RuntimeTransport {
165 RuntimeTransport {
166 inner: Arc::new(RwLock::new(None)),
167 url: self.url,
168 headers: self.headers,
169 jwt: self.jwt,
170 timeout: self.timeout,
171 accept_invalid_certs: self.accept_invalid_certs,
172 no_proxy: self.no_proxy,
173 }
174 }
175}
176
177impl fmt::Display for RuntimeTransport {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 write!(f, "RuntimeTransport {}", self.url)
180 }
181}
182
183impl RuntimeTransport {
184 pub async fn connect(&self) -> Result<InnerTransport, RuntimeTransportError> {
186 match self.url.scheme() {
187 "http" | "https" => self.connect_http(),
188 "ws" | "wss" => self.connect_ws().await,
189 "file" => self.connect_ipc().await,
190 _ => Err(RuntimeTransportError::BadScheme(self.url.scheme().to_string())),
191 }
192 }
193
194 pub fn reqwest_client(&self) -> Result<reqwest::Client, RuntimeTransportError> {
196 let mut client_builder = reqwest::Client::builder()
197 .timeout(self.timeout)
198 .danger_accept_invalid_certs(self.accept_invalid_certs);
199
200 if self.no_proxy || guess_local_url(self.url.as_str()) {
204 client_builder = client_builder.no_proxy();
205 }
206
207 let mut headers = reqwest::header::HeaderMap::new();
208
209 if let Some(jwt) = self.jwt.clone() {
211 let auth =
212 build_auth(jwt).map_err(|e| RuntimeTransportError::InvalidJwt(e.to_string()))?;
213
214 let mut auth_value: HeaderValue =
215 HeaderValue::from_str(&auth.to_string()).expect("Header should be valid string");
216 auth_value.set_sensitive(true);
217
218 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
219 };
220
221 for header in &self.headers {
223 let make_err = || RuntimeTransportError::BadHeader(header.clone());
224
225 let (key, val) = header.split_once(':').ok_or_else(make_err)?;
226
227 headers.insert(
228 HeaderName::from_str(key.trim()).map_err(|_| make_err())?,
229 HeaderValue::from_str(val.trim()).map_err(|_| make_err())?,
230 );
231 }
232
233 if !headers.contains_key(reqwest::header::USER_AGENT) {
234 headers.insert(
235 reqwest::header::USER_AGENT,
236 HeaderValue::from_str(DEFAULT_USER_AGENT)
237 .expect("User-Agent should be valid string"),
238 );
239 }
240
241 if !headers.contains_key(HeaderName::from_static("x-api-key"))
244 && let Ok(api_key) = std::env::var("MPP_API_KEY")
245 {
246 let api_key = api_key.trim();
247 if !api_key.is_empty() {
248 let mut value = HeaderValue::from_str(api_key)
249 .map_err(|_| RuntimeTransportError::BadHeader("MPP_API_KEY".to_string()))?;
250 value.set_sensitive(true);
251 headers.insert(HeaderName::from_static("x-api-key"), value);
252 }
253 }
254
255 client_builder = client_builder.default_headers(headers);
256
257 Ok(client_builder.build()?)
258 }
259
260 fn connect_http(&self) -> Result<InnerTransport, RuntimeTransportError> {
262 let client = self.reqwest_client()?;
263 Ok(InnerTransport::Http(LazyMppHttpTransport::lazy(client, self.url.clone())))
264 }
265
266 async fn connect_ws(&self) -> Result<InnerTransport, RuntimeTransportError> {
273 let auth = self.jwt.as_ref().and_then(|jwt| build_auth(jwt.clone()).ok());
274
275 let service = if is_known_mpp_endpoint(&self.url) && discover_mpp_key().is_some() {
276 let mut ws = MppWsConnect::new(self.url.to_string());
277 if let Some(auth) = auth {
278 ws = ws.with_auth(auth);
279 }
280 ws.into_service()
281 .await
282 .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
283 } else {
284 let mut ws = WsConnect::new(self.url.to_string());
285 if let Some(auth) = auth {
286 ws = ws.with_auth(auth);
287 }
288 ws.into_service()
289 .await
290 .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
291 };
292
293 Ok(InnerTransport::Ws(service))
294 }
295
296 async fn connect_ipc(&self) -> Result<InnerTransport, RuntimeTransportError> {
298 let path = url_to_file_path(&self.url)
299 .map_err(|_| RuntimeTransportError::BadPath(self.url.to_string()))?;
300 let ipc_connector = IpcConnect::new(path.clone());
301 let ipc = ipc_connector.into_service().await.map_err(|e| {
302 RuntimeTransportError::TransportError(e, path.clone().display().to_string())
303 })?;
304 Ok(InnerTransport::Ipc(ipc))
305 }
306
307 pub fn request(&self, req: RequestPacket) -> TransportFut<'static> {
315 let this = self.clone();
316 Box::pin(async move {
317 let mut inner = this.inner.read().await;
318 if inner.is_none() {
319 drop(inner);
320 {
321 let mut inner_mut = this.inner.write().await;
322 if inner_mut.is_none() {
323 *inner_mut =
324 Some(this.connect().await.map_err(TransportErrorKind::custom)?);
325 }
326 }
327 inner = this.inner.read().await;
328 }
329
330 match inner.clone().expect("must've been initialized") {
332 InnerTransport::Http(mut http) => http.call(req),
333 InnerTransport::Ws(mut ws) => ws.call(req),
334 InnerTransport::Ipc(mut ipc) => ipc.call(req),
335 }
336 .await
337 })
338 }
339
340 pub fn boxed(self) -> BoxTransport
342 where
343 Self: Sized + Clone + Send + Sync + 'static,
344 {
345 BoxTransport::new(self)
346 }
347}
348
349impl tower::Service<RequestPacket> for RuntimeTransport {
350 type Response = ResponsePacket;
351 type Error = TransportError;
352 type Future = TransportFut<'static>;
353
354 #[inline]
355 fn poll_ready(
356 &mut self,
357 _cx: &mut std::task::Context<'_>,
358 ) -> std::task::Poll<Result<(), Self::Error>> {
359 std::task::Poll::Ready(Ok(()))
360 }
361
362 #[inline]
363 fn call(&mut self, req: RequestPacket) -> Self::Future {
364 self.request(req)
365 }
366}
367
368impl tower::Service<RequestPacket> for &RuntimeTransport {
369 type Response = ResponsePacket;
370 type Error = TransportError;
371 type Future = TransportFut<'static>;
372
373 #[inline]
374 fn poll_ready(
375 &mut self,
376 _cx: &mut std::task::Context<'_>,
377 ) -> std::task::Poll<Result<(), Self::Error>> {
378 std::task::Poll::Ready(Ok(()))
379 }
380
381 #[inline]
382 fn call(&mut self, req: RequestPacket) -> Self::Future {
383 self.request(req)
384 }
385}
386
387fn build_auth(jwt: String) -> eyre::Result<Authorization> {
388 let secret = JwtSecret::from_hex(jwt)?;
390 let claims = Claims::default();
391 let token = secret.encode(&claims)?;
392
393 let auth = Authorization::Bearer(token);
394
395 Ok(auth)
396}
397
/// Converts a `file` URL into a path (Windows).
///
/// URLs starting with `file:///pipe/` are mapped to the named pipe `\\.\pipe\<name>`;
/// every other URL goes through [`Url::to_file_path`].
#[cfg(windows)]
fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
    const PREFIX: &str = "file:///pipe/";

    match url.as_str().strip_prefix(PREFIX) {
        Some(pipe_name) => Ok(PathBuf::from(format!(r"\\.\pipe\{pipe_name}"))),
        None => url.to_file_path(),
    }
}
411
412#[cfg(not(windows))]
413fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
414 url.to_file_path()
415}
416
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::header::HeaderMap;

    /// A user-supplied `User-Agent` header must reach the server instead of the default one.
    #[tokio::test]
    async fn test_user_agent_header() {
        // Bind to an ephemeral port so the test never collides with a running service.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = Url::parse(&format!("http://{addr}")).unwrap();

        // The assertion runs server-side, on the headers the client actually sent.
        let http_handler = axum::routing::get(|actual_headers: HeaderMap| {
            let expected = HeaderValue::from_str("test-agent").unwrap();
            assert_eq!(actual_headers[reqwest::header::USER_AGENT], expected);

            async { "" }
        });

        let server_task = tokio::spawn(async move {
            axum::serve(listener, http_handler.into_make_service()).await.unwrap()
        });

        let transport = RuntimeTransportBuilder::new(url.clone())
            .with_headers(vec!["User-Agent: test-agent".to_string()])
            .build();

        let InnerTransport::Http(http) = transport.connect_http().unwrap() else {
            unreachable!()
        };
        let _ = http.client().get(url).send().await.unwrap();

        server_task.abort();
    }
}