anvil/server/
rpc_handlers.rs1use crate::{
3 EthApi,
4 eth::error::to_rpc_result,
5 pubsub::{EthSubscription, LogsSubscription},
6};
7use alloy_rpc_types::{
8 FilteredParams,
9 pubsub::{Params, SubscriptionKind},
10};
11use anvil_core::eth::{EthPubSub, EthRequest, EthRpcCall, subscription::SubscriptionId};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14use foundry_primitives::FoundryNetwork;
15
16#[derive(Clone)]
18pub struct HttpEthRpcHandler {
19 api: EthApi<FoundryNetwork>,
21}
22
23impl HttpEthRpcHandler {
24 pub const fn new(api: EthApi<FoundryNetwork>) -> Self {
26 Self { api }
27 }
28}
29
30#[async_trait::async_trait]
31impl RpcHandler for HttpEthRpcHandler {
32 type Request = EthRequest;
33
34 async fn on_request(&self, request: Self::Request) -> ResponseResult {
35 self.api.execute(request).await
36 }
37}
38
39#[derive(Clone)]
41pub struct PubSubEthRpcHandler {
42 api: EthApi<FoundryNetwork>,
44}
45
46impl PubSubEthRpcHandler {
47 pub const fn new(api: EthApi<FoundryNetwork>) -> Self {
49 Self { api }
50 }
51
52 async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
54 let id = SubscriptionId::random_hex();
55 trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
56 match pubsub {
57 EthPubSub::EthUnSubscribe(id) => {
58 trace!(target: "rpc::ws", "canceling subscription {:?}", id);
59 let canceled = cx.remove_subscription(&id).is_some();
60 ResponseResult::Success(canceled.into())
61 }
62 EthPubSub::EthSubscribe(kind, raw_params) => {
63 let filter = match &*raw_params {
64 Params::None => None,
65 Params::Logs(filter) => Some(filter.clone()),
66 Params::Bool(_) => None,
67 Params::TransactionReceipts(_) => None,
68 };
69 let params = FilteredParams::new(filter.map(|b| *b));
70
71 let subscription = match kind {
72 SubscriptionKind::Logs => {
73 if raw_params.is_bool() {
74 return ResponseResult::Error(RpcError::invalid_params(
75 "Expected params for logs subscription",
76 ));
77 }
78
79 trace!(target: "rpc::ws", "received logs subscription {:?}", params);
80 let blocks = self.api.new_block_notifications();
81 let storage = self.api.storage_info();
82 EthSubscription::Logs(Box::new(LogsSubscription {
83 blocks,
84 storage,
85 filter: params,
86 queued: Default::default(),
87 id: id.clone(),
88 }))
89 }
90 SubscriptionKind::NewHeads => {
91 trace!(target: "rpc::ws", "received header subscription");
92 let blocks = self.api.new_block_notifications();
93 let storage = self.api.storage_info();
94 EthSubscription::Header(blocks, storage, id.clone())
95 }
96 SubscriptionKind::NewPendingTransactions => {
97 trace!(target: "rpc::ws", "received pending transactions subscription");
98 match *raw_params {
99 Params::Bool(true) => EthSubscription::FullPendingTransactions(
100 self.api.full_pending_transactions(),
101 id.clone(),
102 ),
103 Params::Bool(false) | Params::None => {
104 EthSubscription::PendingTransactions(
105 self.api.new_ready_transactions(),
106 id.clone(),
107 )
108 }
109 _ => {
110 return ResponseResult::Error(RpcError::invalid_params(
111 "Expected boolean parameter for newPendingTransactions",
112 ));
113 }
114 }
115 }
116 SubscriptionKind::TransactionReceipts => {
117 return RpcError::internal_error_with("Not implemented").into();
118 }
119 SubscriptionKind::Syncing => {
120 return RpcError::internal_error_with("Not implemented").into();
121 }
122 };
123
124 cx.add_subscription(id.clone(), subscription);
125
126 trace!(target: "rpc::ws", "created new subscription: {:?}", id);
127 to_rpc_result(id)
128 }
129 }
130 }
131}
132
133#[async_trait::async_trait]
134impl PubSubRpcHandler for PubSubEthRpcHandler {
135 type Request = EthRpcCall;
136 type SubscriptionId = SubscriptionId;
137 type Subscription = EthSubscription<FoundryNetwork>;
138
139 async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
140 trace!(target: "rpc", "received pubsub request {:?}", request);
141 match request {
142 EthRpcCall::Request(request) => self.api.execute(*request).await,
143 EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
144 }
145 }
146}