anvil/server/
handler.rs

1//! Contains RPC handlers
2use crate::{
3    EthApi,
4    eth::error::to_rpc_result,
5    pubsub::{EthSubscription, LogsSubscription},
6};
7use alloy_rpc_types::{
8    FilteredParams,
9    pubsub::{Params, SubscriptionKind},
10};
11use anvil_core::eth::{EthPubSub, EthRequest, EthRpcCall, subscription::SubscriptionId};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14
15/// A `RpcHandler` that expects `EthRequest` rpc calls via http
16#[derive(Clone)]
17pub struct HttpEthRpcHandler {
18    /// Access to the node
19    api: EthApi,
20}
21
22impl HttpEthRpcHandler {
23    /// Creates a new instance of the handler using the given `EthApi`
24    pub fn new(api: EthApi) -> Self {
25        Self { api }
26    }
27}
28
29#[async_trait::async_trait]
30impl RpcHandler for HttpEthRpcHandler {
31    type Request = EthRequest;
32
33    async fn on_request(&self, request: Self::Request) -> ResponseResult {
34        self.api.execute(request).await
35    }
36}
37
38/// A `RpcHandler` that expects `EthRequest` rpc calls and `EthPubSub` via pubsub connection
39#[derive(Clone)]
40pub struct PubSubEthRpcHandler {
41    /// Access to the node
42    api: EthApi,
43}
44
45impl PubSubEthRpcHandler {
46    /// Creates a new instance of the handler using the given `EthApi`
47    pub fn new(api: EthApi) -> Self {
48        Self { api }
49    }
50
51    /// Invoked for an ethereum pubsub rpc call
52    async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
53        let id = SubscriptionId::random_hex();
54        trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
55        match pubsub {
56            EthPubSub::EthUnSubscribe(id) => {
57                trace!(target: "rpc::ws", "canceling subscription {:?}", id);
58                let canceled = cx.remove_subscription(&id).is_some();
59                ResponseResult::Success(canceled.into())
60            }
61            EthPubSub::EthSubscribe(kind, raw_params) => {
62                let filter = match &*raw_params {
63                    Params::None => None,
64                    Params::Logs(filter) => Some(filter.clone()),
65                    Params::Bool(_) => None,
66                };
67                let params = FilteredParams::new(filter.map(|b| *b));
68
69                let subscription = match kind {
70                    SubscriptionKind::Logs => {
71                        if raw_params.is_bool() {
72                            return ResponseResult::Error(RpcError::invalid_params(
73                                "Expected params for logs subscription",
74                            ));
75                        }
76
77                        trace!(target: "rpc::ws", "received logs subscription {:?}", params);
78                        let blocks = self.api.new_block_notifications();
79                        let storage = self.api.storage_info();
80                        EthSubscription::Logs(Box::new(LogsSubscription {
81                            blocks,
82                            storage,
83                            filter: params,
84                            queued: Default::default(),
85                            id: id.clone(),
86                        }))
87                    }
88                    SubscriptionKind::NewHeads => {
89                        trace!(target: "rpc::ws", "received header subscription");
90                        let blocks = self.api.new_block_notifications();
91                        let storage = self.api.storage_info();
92                        EthSubscription::Header(blocks, storage, id.clone())
93                    }
94                    SubscriptionKind::NewPendingTransactions => {
95                        trace!(target: "rpc::ws", "received pending transactions subscription");
96                        match *raw_params {
97                            Params::Bool(true) => EthSubscription::FullPendingTransactions(
98                                self.api.full_pending_transactions(),
99                                id.clone(),
100                            ),
101                            Params::Bool(false) | Params::None => {
102                                EthSubscription::PendingTransactions(
103                                    self.api.new_ready_transactions(),
104                                    id.clone(),
105                                )
106                            }
107                            _ => {
108                                return ResponseResult::Error(RpcError::invalid_params(
109                                    "Expected boolean parameter for newPendingTransactions",
110                                ));
111                            }
112                        }
113                    }
114                    SubscriptionKind::Syncing => {
115                        return RpcError::internal_error_with("Not implemented").into();
116                    }
117                };
118
119                cx.add_subscription(id.clone(), subscription);
120
121                trace!(target: "rpc::ws", "created new subscription: {:?}", id);
122                to_rpc_result(id)
123            }
124        }
125    }
126}
127
128#[async_trait::async_trait]
129impl PubSubRpcHandler for PubSubEthRpcHandler {
130    type Request = EthRpcCall;
131    type SubscriptionId = SubscriptionId;
132    type Subscription = EthSubscription;
133
134    async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
135        trace!(target: "rpc", "received pubsub request {:?}", request);
136        match request {
137            EthRpcCall::Request(request) => self.api.execute(*request).await,
138            EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
139        }
140    }
141}