anvil/server/
handler.rs

//! Contains the RPC handlers for HTTP and pubsub connections.
2use crate::{
3    eth::error::to_rpc_result,
4    pubsub::{EthSubscription, LogsSubscription},
5    EthApi,
6};
7use alloy_rpc_types::{
8    pubsub::{Params, SubscriptionKind},
9    FilteredParams,
10};
11use anvil_core::eth::{subscription::SubscriptionId, EthPubSub, EthRequest, EthRpcCall};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14
15/// A `RpcHandler` that expects `EthRequest` rpc calls via http
16#[derive(Clone)]
17pub struct HttpEthRpcHandler {
18    /// Access to the node
19    api: EthApi,
20}
21
22impl HttpEthRpcHandler {
23    /// Creates a new instance of the handler using the given `EthApi`
24    pub fn new(api: EthApi) -> Self {
25        Self { api }
26    }
27}
28
29#[async_trait::async_trait]
30impl RpcHandler for HttpEthRpcHandler {
31    type Request = EthRequest;
32
33    async fn on_request(&self, request: Self::Request) -> ResponseResult {
34        self.api.execute(request).await
35    }
36}
37
38/// A `RpcHandler` that expects `EthRequest` rpc calls and `EthPubSub` via pubsub connection
39#[derive(Clone)]
40pub struct PubSubEthRpcHandler {
41    /// Access to the node
42    api: EthApi,
43}
44
45impl PubSubEthRpcHandler {
46    /// Creates a new instance of the handler using the given `EthApi`
47    pub fn new(api: EthApi) -> Self {
48        Self { api }
49    }
50
51    /// Invoked for an ethereum pubsub rpc call
52    async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
53        let id = SubscriptionId::random_hex();
54        trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
55        match pubsub {
56            EthPubSub::EthUnSubscribe(id) => {
57                trace!(target: "rpc::ws", "canceling subscription {:?}", id);
58                let canceled = cx.remove_subscription(&id).is_some();
59                ResponseResult::Success(canceled.into())
60            }
61            EthPubSub::EthSubscribe(kind, params) => {
62                let filter = match *params {
63                    Params::None => None,
64                    Params::Logs(filter) => Some(*filter),
65                    Params::Bool(_) => {
66                        return ResponseResult::Error(RpcError::invalid_params(
67                            "Expected params for logs subscription",
68                        ))
69                    }
70                };
71                let params = FilteredParams::new(filter);
72
73                let subscription = match kind {
74                    SubscriptionKind::Logs => {
75                        trace!(target: "rpc::ws", "received logs subscription {:?}", params);
76                        let blocks = self.api.new_block_notifications();
77                        let storage = self.api.storage_info();
78                        EthSubscription::Logs(Box::new(LogsSubscription {
79                            blocks,
80                            storage,
81                            filter: params,
82                            queued: Default::default(),
83                            id: id.clone(),
84                        }))
85                    }
86                    SubscriptionKind::NewHeads => {
87                        trace!(target: "rpc::ws", "received header subscription");
88                        let blocks = self.api.new_block_notifications();
89                        let storage = self.api.storage_info();
90                        EthSubscription::Header(blocks, storage, id.clone())
91                    }
92                    SubscriptionKind::NewPendingTransactions => {
93                        trace!(target: "rpc::ws", "received pending transactions subscription");
94                        EthSubscription::PendingTransactions(
95                            self.api.new_ready_transactions(),
96                            id.clone(),
97                        )
98                    }
99                    SubscriptionKind::Syncing => {
100                        return RpcError::internal_error_with("Not implemented").into()
101                    }
102                };
103
104                cx.add_subscription(id.clone(), subscription);
105
106                trace!(target: "rpc::ws", "created new subscription: {:?}", id);
107                to_rpc_result(id)
108            }
109        }
110    }
111}
112
113#[async_trait::async_trait]
114impl PubSubRpcHandler for PubSubEthRpcHandler {
115    type Request = EthRpcCall;
116    type SubscriptionId = SubscriptionId;
117    type Subscription = EthSubscription;
118
119    async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
120        trace!(target: "rpc", "received pubsub request {:?}", request);
121        match request {
122            EthRpcCall::Request(request) => self.api.execute(*request).await,
123            EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
124        }
125    }
126}