Skip to main content

anvil_server/
handler.rs

1use crate::RpcHandler;
2use anvil_rpc::{
3    error::RpcError,
4    request::{Request, RpcCall},
5    response::{Response, RpcResponse},
6};
7use axum::{
8    Json,
9    extract::{State, rejection::JsonRejection},
10    http::StatusCode,
11    response::{IntoResponse, Response as AxumResponse},
12};
13use futures::{FutureExt, future};
14
15/// Handles incoming JSON-RPC Request.
16// NOTE: `handler` must come first because the `request` extractor consumes the request body.
17pub async fn handle<Http: RpcHandler, Ws>(
18    State((handler, _)): State<(Http, Ws)>,
19    request: Result<Json<Request>, JsonRejection>,
20) -> AxumResponse {
21    match request {
22        Ok(Json(req)) => handle_request(req, handler)
23            .await
24            .map(Json)
25            .map(IntoResponse::into_response)
26            .unwrap_or_else(|| StatusCode::NO_CONTENT.into_response()),
27        Err(err) => {
28            warn!(target: "rpc", ?err, "invalid request");
29            Json(Response::error(RpcError::invalid_request())).into_response()
30        }
31    }
32}
33
34/// Handle the JSON-RPC [Request]
35///
36/// This will try to deserialize the payload into the request type of the handler and if successful
37/// invoke the handler
38pub async fn handle_request<Handler: RpcHandler>(
39    req: Request,
40    handler: Handler,
41) -> Option<Response> {
42    /// processes batch calls
43    fn responses_as_batch(outs: Vec<Option<RpcResponse>>) -> Option<Response> {
44        let batch: Vec<_> = outs.into_iter().flatten().collect();
45        (!batch.is_empty()).then_some(Response::Batch(batch))
46    }
47
48    match req {
49        Request::Single(call) => handle_call(call, handler).await.map(Response::Single),
50        Request::Batch(calls) => {
51            if calls.is_empty() {
52                return Some(Response::error(RpcError::invalid_request()));
53            }
54            future::join_all(calls.into_iter().map(move |call| handle_call(call, handler.clone())))
55                .map(responses_as_batch)
56                .await
57        }
58    }
59}
60
61/// handle a single RPC method call
62async fn handle_call<Handler: RpcHandler>(call: RpcCall, handler: Handler) -> Option<RpcResponse> {
63    match call {
64        RpcCall::MethodCall(call) => {
65            trace!(target: "rpc", id = ?call.id , method = ?call.method,  "handling call");
66            Some(handler.on_call(call).await)
67        }
68        RpcCall::Notification(notification) => {
69            trace!(target: "rpc", method = ?notification.method, "received rpc notification");
70            None
71        }
72        RpcCall::Invalid { id } => {
73            warn!(target: "rpc", ?id,  "invalid rpc call");
74            Some(RpcResponse::invalid_request(id))
75        }
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use anvil_rpc::{
        request::{RequestParams, RpcNotification, Version},
        response::ResponseResult,
    };
    use axum::body::to_bytes;
    use std::{
        pin::pin,
        task::{Context, Poll, Waker},
    };

    /// Handler stub whose `on_request` echoes the received value back as a success result.
    #[derive(Clone)]
    struct TestHandler;

    #[async_trait::async_trait]
    impl RpcHandler for TestHandler {
        type Request = serde_json::Value;

        async fn on_request(&self, request: Self::Request) -> ResponseResult {
            ResponseResult::success(request)
        }
    }

    /// Builds an `eth_subscribe` notification (JSON-RPC v2, no params).
    fn notification() -> RpcCall {
        let inner = RpcNotification {
            jsonrpc: Some(Version::V2),
            method: String::from("eth_subscribe"),
            params: RequestParams::None,
        };
        RpcCall::Notification(inner)
    }

    /// Polls `future` exactly once with a no-op waker and returns its output.
    ///
    /// # Panics
    ///
    /// Panics if the future is not ready on that first poll.
    fn run_ready<F: Future>(future: F) -> F::Output {
        let mut pinned = pin!(future);
        let mut cx = Context::from_waker(Waker::noop());
        let Poll::Ready(output) = pinned.as_mut().poll(&mut cx) else {
            panic!("future unexpectedly pending");
        };
        output
    }

    #[test]
    fn empty_batch_returns_invalid_request() {
        let expected = Some(Response::error(RpcError::invalid_request()));

        let actual = run_ready(handle_request(Request::Batch(Vec::new()), TestHandler));

        assert_eq!(actual, expected);
    }

    #[test]
    fn notification_only_batch_returns_no_response() {
        let batch = Request::Batch(vec![notification()]);

        let outcome = run_ready(handle_request(batch, TestHandler));

        assert_eq!(outcome, None);
    }

    #[test]
    fn http_notification_only_batch_returns_no_content() {
        let batch = Request::Batch(vec![notification()]);

        let reply = run_ready(handle(State((TestHandler, ())), Ok(Json(batch))));

        assert_eq!(reply.status(), StatusCode::NO_CONTENT);

        // The 204 reply must not carry a body.
        let body = run_ready(to_bytes(reply.into_body(), usize::MAX)).unwrap();
        assert!(body.is_empty());
    }
}
146}