anvil_server/
handler.rs
1use crate::RpcHandler;
2use anvil_rpc::{
3 error::RpcError,
4 request::{Request, RpcCall},
5 response::{Response, RpcResponse},
6};
7use axum::{
8 extract::{rejection::JsonRejection, State},
9 Json,
10};
11use futures::{future, FutureExt};
12
13pub async fn handle<Http: RpcHandler, Ws>(
16 State((handler, _)): State<(Http, Ws)>,
17 request: Result<Json<Request>, JsonRejection>,
18) -> Json<Response> {
19 Json(match request {
20 Ok(Json(req)) => handle_request(req, handler)
21 .await
22 .unwrap_or_else(|| Response::error(RpcError::invalid_request())),
23 Err(err) => {
24 warn!(target: "rpc", ?err, "invalid request");
25 Response::error(RpcError::invalid_request())
26 }
27 })
28}
29
30pub async fn handle_request<Handler: RpcHandler>(
35 req: Request,
36 handler: Handler,
37) -> Option<Response> {
38 fn responses_as_batch(outs: Vec<Option<RpcResponse>>) -> Option<Response> {
40 let batch: Vec<_> = outs.into_iter().flatten().collect();
41 (!batch.is_empty()).then_some(Response::Batch(batch))
42 }
43
44 match req {
45 Request::Single(call) => handle_call(call, handler).await.map(Response::Single),
46 Request::Batch(calls) => {
47 future::join_all(calls.into_iter().map(move |call| handle_call(call, handler.clone())))
48 .map(responses_as_batch)
49 .await
50 }
51 }
52}
53
54async fn handle_call<Handler: RpcHandler>(call: RpcCall, handler: Handler) -> Option<RpcResponse> {
56 match call {
57 RpcCall::MethodCall(call) => {
58 trace!(target: "rpc", id = ?call.id , method = ?call.method, "handling call");
59 Some(handler.on_call(call).await)
60 }
61 RpcCall::Notification(notification) => {
62 trace!(target: "rpc", method = ?notification.method, "received rpc notification");
63 None
64 }
65 RpcCall::Invalid { id } => {
66 warn!(target: "rpc", ?id, "invalid rpc call");
67 Some(RpcResponse::invalid_request(id))
68 }
69 }
70}