Skip to main content

anvil/server/
rpc_handlers.rs

1//! Contains RPC handlers
2use crate::{
3    EthApi,
4    eth::error::to_rpc_result,
5    pubsub::{EthSubscription, LogsSubscription},
6};
7use alloy_rpc_types::{
8    FilteredParams,
9    pubsub::{Params, SubscriptionKind},
10};
11use anvil_core::eth::{EthPubSub, EthRequest, EthRpcCall, subscription::SubscriptionId};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14use foundry_primitives::FoundryNetwork;
15
16/// A `RpcHandler` that expects `EthRequest` rpc calls via http
17#[derive(Clone)]
18pub struct HttpEthRpcHandler {
19    /// Access to the node
20    api: EthApi<FoundryNetwork>,
21}
22
23impl HttpEthRpcHandler {
24    /// Creates a new instance of the handler using the given `EthApi`
25    pub const fn new(api: EthApi<FoundryNetwork>) -> Self {
26        Self { api }
27    }
28}
29
30#[async_trait::async_trait]
31impl RpcHandler for HttpEthRpcHandler {
32    type Request = EthRequest;
33
34    async fn on_request(&self, request: Self::Request) -> ResponseResult {
35        self.api.execute(request).await
36    }
37}
38
39/// A `RpcHandler` that expects `EthRequest` rpc calls and `EthPubSub` via pubsub connection
40#[derive(Clone)]
41pub struct PubSubEthRpcHandler {
42    /// Access to the node
43    api: EthApi<FoundryNetwork>,
44}
45
46impl PubSubEthRpcHandler {
47    /// Creates a new instance of the handler using the given `EthApi`
48    pub const fn new(api: EthApi<FoundryNetwork>) -> Self {
49        Self { api }
50    }
51
52    /// Invoked for an ethereum pubsub rpc call
53    async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
54        let id = SubscriptionId::random_hex();
55        trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
56        match pubsub {
57            EthPubSub::EthUnSubscribe(id) => {
58                trace!(target: "rpc::ws", "canceling subscription {:?}", id);
59                let canceled = cx.remove_subscription(&id).is_some();
60                ResponseResult::Success(canceled.into())
61            }
62            EthPubSub::EthSubscribe(kind, raw_params) => {
63                let filter = match &*raw_params {
64                    Params::None => None,
65                    Params::Logs(filter) => Some(filter.clone()),
66                    Params::Bool(_) => None,
67                    Params::TransactionReceipts(_) => None,
68                };
69                let params = FilteredParams::new(filter.map(|b| *b));
70
71                let subscription = match kind {
72                    SubscriptionKind::Logs => {
73                        if raw_params.is_bool() {
74                            return ResponseResult::Error(RpcError::invalid_params(
75                                "Expected params for logs subscription",
76                            ));
77                        }
78
79                        trace!(target: "rpc::ws", "received logs subscription {:?}", params);
80                        let blocks = self.api.new_block_notifications();
81                        let storage = self.api.storage_info();
82                        EthSubscription::Logs(Box::new(LogsSubscription {
83                            blocks,
84                            storage,
85                            filter: params,
86                            queued: Default::default(),
87                            id: id.clone(),
88                        }))
89                    }
90                    SubscriptionKind::NewHeads => {
91                        trace!(target: "rpc::ws", "received header subscription");
92                        let blocks = self.api.new_block_notifications();
93                        let storage = self.api.storage_info();
94                        EthSubscription::Header(blocks, storage, id.clone())
95                    }
96                    SubscriptionKind::NewPendingTransactions => {
97                        trace!(target: "rpc::ws", "received pending transactions subscription");
98                        match *raw_params {
99                            Params::Bool(true) => EthSubscription::FullPendingTransactions(
100                                self.api.full_pending_transactions(),
101                                id.clone(),
102                            ),
103                            Params::Bool(false) | Params::None => {
104                                EthSubscription::PendingTransactions(
105                                    self.api.new_ready_transactions(),
106                                    id.clone(),
107                                )
108                            }
109                            _ => {
110                                return ResponseResult::Error(RpcError::invalid_params(
111                                    "Expected boolean parameter for newPendingTransactions",
112                                ));
113                            }
114                        }
115                    }
116                    SubscriptionKind::TransactionReceipts => {
117                        return RpcError::internal_error_with("Not implemented").into();
118                    }
119                    SubscriptionKind::Syncing => {
120                        return RpcError::internal_error_with("Not implemented").into();
121                    }
122                };
123
124                cx.add_subscription(id.clone(), subscription);
125
126                trace!(target: "rpc::ws", "created new subscription: {:?}", id);
127                to_rpc_result(id)
128            }
129        }
130    }
131}
132
133#[async_trait::async_trait]
134impl PubSubRpcHandler for PubSubEthRpcHandler {
135    type Request = EthRpcCall;
136    type SubscriptionId = SubscriptionId;
137    type Subscription = EthSubscription<FoundryNetwork>;
138
139    async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
140        trace!(target: "rpc", "received pubsub request {:?}", request);
141        match request {
142            EthRpcCall::Request(request) => self.api.execute(*request).await,
143            EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
144        }
145    }
146}