1use crate::RpcHandler;
2use anvil_rpc::{
3 error::RpcError,
4 request::{Request, RpcCall},
5 response::{Response, RpcResponse},
6};
7use axum::{
8 Json,
9 extract::{State, rejection::JsonRejection},
10 http::StatusCode,
11 response::{IntoResponse, Response as AxumResponse},
12};
13use futures::{FutureExt, future};
14
15pub async fn handle<Http: RpcHandler, Ws>(
18 State((handler, _)): State<(Http, Ws)>,
19 request: Result<Json<Request>, JsonRejection>,
20) -> AxumResponse {
21 match request {
22 Ok(Json(req)) => handle_request(req, handler)
23 .await
24 .map(Json)
25 .map(IntoResponse::into_response)
26 .unwrap_or_else(|| StatusCode::NO_CONTENT.into_response()),
27 Err(err) => {
28 warn!(target: "rpc", ?err, "invalid request");
29 Json(Response::error(RpcError::invalid_request())).into_response()
30 }
31 }
32}
33
34pub async fn handle_request<Handler: RpcHandler>(
39 req: Request,
40 handler: Handler,
41) -> Option<Response> {
42 fn responses_as_batch(outs: Vec<Option<RpcResponse>>) -> Option<Response> {
44 let batch: Vec<_> = outs.into_iter().flatten().collect();
45 (!batch.is_empty()).then_some(Response::Batch(batch))
46 }
47
48 match req {
49 Request::Single(call) => handle_call(call, handler).await.map(Response::Single),
50 Request::Batch(calls) => {
51 if calls.is_empty() {
52 return Some(Response::error(RpcError::invalid_request()));
53 }
54 future::join_all(calls.into_iter().map(move |call| handle_call(call, handler.clone())))
55 .map(responses_as_batch)
56 .await
57 }
58 }
59}
60
61async fn handle_call<Handler: RpcHandler>(call: RpcCall, handler: Handler) -> Option<RpcResponse> {
63 match call {
64 RpcCall::MethodCall(call) => {
65 trace!(target: "rpc", id = ?call.id , method = ?call.method, "handling call");
66 Some(handler.on_call(call).await)
67 }
68 RpcCall::Notification(notification) => {
69 trace!(target: "rpc", method = ?notification.method, "received rpc notification");
70 None
71 }
72 RpcCall::Invalid { id } => {
73 warn!(target: "rpc", ?id, "invalid rpc call");
74 Some(RpcResponse::invalid_request(id))
75 }
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use super::*;
82 use anvil_rpc::{
83 request::{RequestParams, RpcNotification, Version},
84 response::ResponseResult,
85 };
86 use axum::body::to_bytes;
87 use std::{
88 pin::pin,
89 task::{Context, Poll, Waker},
90 };
91
92 #[derive(Clone)]
93 struct TestHandler;
94
95 #[async_trait::async_trait]
96 impl RpcHandler for TestHandler {
97 type Request = serde_json::Value;
98
99 async fn on_request(&self, request: Self::Request) -> ResponseResult {
100 ResponseResult::success(request)
101 }
102 }
103
104 fn notification() -> RpcCall {
105 RpcCall::Notification(RpcNotification {
106 jsonrpc: Some(Version::V2),
107 method: "eth_subscribe".to_owned(),
108 params: RequestParams::None,
109 })
110 }
111
112 fn run_ready<F: Future>(future: F) -> F::Output {
113 let waker = Waker::noop();
114 let mut cx = Context::from_waker(waker);
115 let mut future = pin!(future);
116 match future.as_mut().poll(&mut cx) {
117 Poll::Ready(output) => output,
118 Poll::Pending => panic!("future unexpectedly pending"),
119 }
120 }
121
122 #[test]
123 fn empty_batch_returns_invalid_request() {
124 let response = run_ready(handle_request(Request::Batch(vec![]), TestHandler));
125
126 assert_eq!(response, Some(Response::error(RpcError::invalid_request())));
127 }
128
129 #[test]
130 fn notification_only_batch_returns_no_response() {
131 let response = run_ready(handle_request(Request::Batch(vec![notification()]), TestHandler));
132
133 assert_eq!(response, None);
134 }
135
136 #[test]
137 fn http_notification_only_batch_returns_no_content() {
138 let response = run_ready(handle(
139 State((TestHandler, ())),
140 Ok(Json(Request::Batch(vec![notification()]))),
141 ));
142
143 assert_eq!(response.status(), StatusCode::NO_CONTENT);
144 assert!(run_ready(to_bytes(response.into_body(), usize::MAX)).unwrap().is_empty());
145 }
146}