1use crate::{
6 DEFAULT_USER_AGENT, REQUEST_TIMEOUT,
7 provider::mpp::{keys::discover_mpp_key, transport::LazyMppHttpTransport, ws::MppWsConnect},
8};
9use alloy_json_rpc::{RequestPacket, ResponsePacket};
10use alloy_pubsub::{PubSubConnect, PubSubFrontend};
11use alloy_rpc_types_engine::{Claims, JwtSecret};
12use alloy_transport::{
13 Authorization, BoxTransport, TransportError, TransportErrorKind, TransportFut,
14};
15use alloy_transport_ipc::IpcConnect;
16use alloy_transport_ws::WsConnect;
17use reqwest::header::{HeaderName, HeaderValue};
18use std::{fmt, path::PathBuf, str::FromStr, sync::Arc};
19use thiserror::Error;
20use tokio::sync::RwLock;
21use tower::Service;
22use url::Url;
23
24const KNOWN_MPP_HOSTS: &[&str] = &[".mpp.tempo.xyz"];
29
30fn is_known_mpp_endpoint(url: &Url) -> bool {
32 url.host_str().is_some_and(|host| KNOWN_MPP_HOSTS.iter().any(|suffix| host.ends_with(suffix)))
33}
34
35#[derive(Clone, Debug)]
38pub enum InnerTransport {
39 Http(LazyMppHttpTransport),
45 Ws(PubSubFrontend),
47 Ipc(PubSubFrontend),
49}
50
51#[derive(Error, Debug)]
53pub enum RuntimeTransportError {
54 #[error("Internal transport error: {0} with {1}")]
56 TransportError(TransportError, String),
57
58 #[error("URL scheme is not supported: {0}")]
60 BadScheme(String),
61
62 #[error("Invalid HTTP header: {0}")]
64 BadHeader(String),
65
66 #[error("Invalid IPC file path: {0}")]
68 BadPath(String),
69
70 #[error(transparent)]
72 HttpConstructionError(#[from] reqwest::Error),
73
74 #[error("Invalid JWT: {0}")]
76 InvalidJwt(String),
77}
78
79#[derive(Clone, Debug)]
87pub struct RuntimeTransport {
88 inner: Arc<RwLock<Option<InnerTransport>>>,
90 url: Url,
92 headers: Vec<String>,
94 jwt: Option<String>,
96 timeout: std::time::Duration,
98 accept_invalid_certs: bool,
100 no_proxy: bool,
102}
103
104#[derive(Debug)]
106pub struct RuntimeTransportBuilder {
107 url: Url,
108 headers: Vec<String>,
109 jwt: Option<String>,
110 timeout: std::time::Duration,
111 accept_invalid_certs: bool,
112 no_proxy: bool,
113}
114
115impl RuntimeTransportBuilder {
116 pub const fn new(url: Url) -> Self {
118 Self {
119 url,
120 headers: vec![],
121 jwt: None,
122 timeout: REQUEST_TIMEOUT,
123 accept_invalid_certs: false,
124 no_proxy: false,
125 }
126 }
127
128 pub fn with_headers(mut self, headers: Vec<String>) -> Self {
130 self.headers = headers;
131 self
132 }
133
134 pub fn with_jwt(mut self, jwt: Option<String>) -> Self {
136 self.jwt = jwt;
137 self
138 }
139
140 pub const fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
142 self.timeout = timeout;
143 self
144 }
145
146 pub const fn accept_invalid_certs(mut self, accept_invalid_certs: bool) -> Self {
148 self.accept_invalid_certs = accept_invalid_certs;
149 self
150 }
151
152 pub const fn no_proxy(mut self, no_proxy: bool) -> Self {
157 self.no_proxy = no_proxy;
158 self
159 }
160
161 pub fn build(self) -> RuntimeTransport {
164 RuntimeTransport {
165 inner: Arc::new(RwLock::new(None)),
166 url: self.url,
167 headers: self.headers,
168 jwt: self.jwt,
169 timeout: self.timeout,
170 accept_invalid_certs: self.accept_invalid_certs,
171 no_proxy: self.no_proxy,
172 }
173 }
174}
175
176impl fmt::Display for RuntimeTransport {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 write!(f, "RuntimeTransport {}", self.url)
179 }
180}
181
182impl RuntimeTransport {
183 pub async fn connect(&self) -> Result<InnerTransport, RuntimeTransportError> {
185 match self.url.scheme() {
186 "http" | "https" => self.connect_http(),
187 "ws" | "wss" => self.connect_ws().await,
188 "file" => self.connect_ipc().await,
189 _ => Err(RuntimeTransportError::BadScheme(self.url.scheme().to_string())),
190 }
191 }
192
193 pub fn reqwest_client(&self) -> Result<reqwest::Client, RuntimeTransportError> {
195 let mut client_builder = reqwest::Client::builder()
196 .timeout(self.timeout)
197 .danger_accept_invalid_certs(self.accept_invalid_certs);
198
199 if self.no_proxy {
203 client_builder = client_builder.no_proxy();
204 }
205
206 let mut headers = reqwest::header::HeaderMap::new();
207
208 if let Some(jwt) = self.jwt.clone() {
210 let auth =
211 build_auth(jwt).map_err(|e| RuntimeTransportError::InvalidJwt(e.to_string()))?;
212
213 let mut auth_value: HeaderValue =
214 HeaderValue::from_str(&auth.to_string()).expect("Header should be valid string");
215 auth_value.set_sensitive(true);
216
217 headers.insert(reqwest::header::AUTHORIZATION, auth_value);
218 };
219
220 for header in &self.headers {
222 let make_err = || RuntimeTransportError::BadHeader(header.clone());
223
224 let (key, val) = header.split_once(':').ok_or_else(make_err)?;
225
226 headers.insert(
227 HeaderName::from_str(key.trim()).map_err(|_| make_err())?,
228 HeaderValue::from_str(val.trim()).map_err(|_| make_err())?,
229 );
230 }
231
232 if !headers.contains_key(reqwest::header::USER_AGENT) {
233 headers.insert(
234 reqwest::header::USER_AGENT,
235 HeaderValue::from_str(DEFAULT_USER_AGENT)
236 .expect("User-Agent should be valid string"),
237 );
238 }
239
240 if !headers.contains_key(HeaderName::from_static("x-api-key"))
243 && let Ok(api_key) = std::env::var("MPP_API_KEY")
244 {
245 let api_key = api_key.trim();
246 if !api_key.is_empty() {
247 let mut value = HeaderValue::from_str(api_key)
248 .map_err(|_| RuntimeTransportError::BadHeader("MPP_API_KEY".to_string()))?;
249 value.set_sensitive(true);
250 headers.insert(HeaderName::from_static("x-api-key"), value);
251 }
252 }
253
254 client_builder = client_builder.default_headers(headers);
255
256 Ok(client_builder.build()?)
257 }
258
259 fn connect_http(&self) -> Result<InnerTransport, RuntimeTransportError> {
261 let client = self.reqwest_client()?;
262 Ok(InnerTransport::Http(LazyMppHttpTransport::lazy(client, self.url.clone())))
263 }
264
265 async fn connect_ws(&self) -> Result<InnerTransport, RuntimeTransportError> {
272 let auth = self.jwt.as_ref().and_then(|jwt| build_auth(jwt.clone()).ok());
273
274 let service = if is_known_mpp_endpoint(&self.url) && discover_mpp_key().is_some() {
275 let mut ws = MppWsConnect::new(self.url.to_string());
276 if let Some(auth) = auth {
277 ws = ws.with_auth(auth);
278 }
279 ws.into_service()
280 .await
281 .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
282 } else {
283 let mut ws = WsConnect::new(self.url.to_string());
284 if let Some(auth) = auth {
285 ws = ws.with_auth(auth);
286 }
287 ws.into_service()
288 .await
289 .map_err(|e| RuntimeTransportError::TransportError(e, self.url.to_string()))?
290 };
291
292 Ok(InnerTransport::Ws(service))
293 }
294
295 async fn connect_ipc(&self) -> Result<InnerTransport, RuntimeTransportError> {
297 let path = url_to_file_path(&self.url)
298 .map_err(|_| RuntimeTransportError::BadPath(self.url.to_string()))?;
299 let ipc_connector = IpcConnect::new(path.clone());
300 let ipc = ipc_connector.into_service().await.map_err(|e| {
301 RuntimeTransportError::TransportError(e, path.clone().display().to_string())
302 })?;
303 Ok(InnerTransport::Ipc(ipc))
304 }
305
306 pub fn request(&self, req: RequestPacket) -> TransportFut<'static> {
314 let this = self.clone();
315 Box::pin(async move {
316 let mut inner = this.inner.read().await;
317 if inner.is_none() {
318 drop(inner);
319 {
320 let mut inner_mut = this.inner.write().await;
321 if inner_mut.is_none() {
322 *inner_mut =
323 Some(this.connect().await.map_err(TransportErrorKind::custom)?);
324 }
325 }
326 inner = this.inner.read().await;
327 }
328
329 match inner.clone().expect("must've been initialized") {
331 InnerTransport::Http(mut http) => http.call(req),
332 InnerTransport::Ws(mut ws) => ws.call(req),
333 InnerTransport::Ipc(mut ipc) => ipc.call(req),
334 }
335 .await
336 })
337 }
338
339 pub fn boxed(self) -> BoxTransport
341 where
342 Self: Sized + Clone + Send + Sync + 'static,
343 {
344 BoxTransport::new(self)
345 }
346}
347
348impl tower::Service<RequestPacket> for RuntimeTransport {
349 type Response = ResponsePacket;
350 type Error = TransportError;
351 type Future = TransportFut<'static>;
352
353 #[inline]
354 fn poll_ready(
355 &mut self,
356 _cx: &mut std::task::Context<'_>,
357 ) -> std::task::Poll<Result<(), Self::Error>> {
358 std::task::Poll::Ready(Ok(()))
359 }
360
361 #[inline]
362 fn call(&mut self, req: RequestPacket) -> Self::Future {
363 self.request(req)
364 }
365}
366
367impl tower::Service<RequestPacket> for &RuntimeTransport {
368 type Response = ResponsePacket;
369 type Error = TransportError;
370 type Future = TransportFut<'static>;
371
372 #[inline]
373 fn poll_ready(
374 &mut self,
375 _cx: &mut std::task::Context<'_>,
376 ) -> std::task::Poll<Result<(), Self::Error>> {
377 std::task::Poll::Ready(Ok(()))
378 }
379
380 #[inline]
381 fn call(&mut self, req: RequestPacket) -> Self::Future {
382 self.request(req)
383 }
384}
385
386fn build_auth(jwt: String) -> eyre::Result<Authorization> {
387 let secret = JwtSecret::from_hex(jwt)?;
389 let claims = Claims::default();
390 let token = secret.encode(&claims)?;
391
392 let auth = Authorization::Bearer(token);
393
394 Ok(auth)
395}
396
/// Converts a `file` URL into a path (Windows variant).
///
/// URLs starting with `file:///pipe/` are mapped to the named-pipe path `\\.\pipe\<name>`, where
/// `<name>` is everything after that prefix. Any other URL goes through `Url::to_file_path`.
#[cfg(windows)]
fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
    const PREFIX: &str = "file:///pipe/";

    match url.as_str().strip_prefix(PREFIX) {
        Some(pipe_name) => Ok(PathBuf::from(format!(r"\\.\pipe\{pipe_name}"))),
        None => url.to_file_path(),
    }
}
410
411#[cfg(not(windows))]
412fn url_to_file_path(url: &Url) -> Result<PathBuf, ()> {
413 url.to_file_path()
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::header::HeaderMap;

    /// A `User-Agent` passed through the builder's headers must be the one the server receives.
    #[tokio::test]
    async fn test_user_agent_header() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = Url::parse(&format!("http://{addr}")).unwrap();

        // The header check lives inside the handler; sending the request below triggers it.
        let http_handler = axum::routing::get(|actual_headers: HeaderMap| {
            let user_agent = HeaderName::from_str("User-Agent").unwrap();
            assert_eq!(actual_headers[user_agent], HeaderValue::from_str("test-agent").unwrap());

            async { "" }
        });

        let server_task = tokio::spawn(async move {
            axum::serve(listener, http_handler.into_make_service()).await.unwrap()
        });

        let transport = RuntimeTransportBuilder::new(url.clone())
            .with_headers(vec!["User-Agent: test-agent".to_string()])
            .build();

        // `connect_http` only ever yields the `Http` variant.
        let InnerTransport::Http(http) = transport.connect_http().unwrap() else {
            unreachable!()
        };
        let _ = http.client().get(url).send().await.unwrap();

        server_task.abort();
    }
}