anvil/server/
rpc_handlers.rs1use crate::{
3 EthApi,
4 eth::error::to_rpc_result,
5 pubsub::{EthSubscription, LogsSubscription},
6};
7use alloy_rpc_types::{
8 FilteredParams,
9 pubsub::{Params, SubscriptionKind},
10};
11use anvil_core::eth::{EthPubSub, EthRequest, EthRpcCall, subscription::SubscriptionId};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14use foundry_primitives::FoundryNetwork;
15
16#[derive(Clone)]
18pub struct HttpEthRpcHandler {
19 api: EthApi<FoundryNetwork>,
21}
22
23impl HttpEthRpcHandler {
24 pub fn new(api: EthApi<FoundryNetwork>) -> Self {
26 Self { api }
27 }
28}
29
30#[async_trait::async_trait]
31impl RpcHandler for HttpEthRpcHandler {
32 type Request = EthRequest;
33
34 async fn on_request(&self, request: Self::Request) -> ResponseResult {
35 self.api.execute(request).await
36 }
37}
38
39#[derive(Clone)]
41pub struct PubSubEthRpcHandler {
42 api: EthApi<FoundryNetwork>,
44}
45
46impl PubSubEthRpcHandler {
47 pub fn new(api: EthApi<FoundryNetwork>) -> Self {
49 Self { api }
50 }
51
52 async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
54 let id = SubscriptionId::random_hex();
55 trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
56 match pubsub {
57 EthPubSub::EthUnSubscribe(id) => {
58 trace!(target: "rpc::ws", "canceling subscription {:?}", id);
59 let canceled = cx.remove_subscription(&id).is_some();
60 ResponseResult::Success(canceled.into())
61 }
62 EthPubSub::EthSubscribe(kind, raw_params) => {
63 let filter = match &*raw_params {
64 Params::None => None,
65 Params::Logs(filter) => Some(filter.clone()),
66 Params::Bool(_) => None,
67 };
68 let params = FilteredParams::new(filter.map(|b| *b));
69
70 let subscription = match kind {
71 SubscriptionKind::Logs => {
72 if raw_params.is_bool() {
73 return ResponseResult::Error(RpcError::invalid_params(
74 "Expected params for logs subscription",
75 ));
76 }
77
78 trace!(target: "rpc::ws", "received logs subscription {:?}", params);
79 let blocks = self.api.new_block_notifications();
80 let storage = self.api.storage_info();
81 EthSubscription::Logs(Box::new(LogsSubscription {
82 blocks,
83 storage,
84 filter: params,
85 queued: Default::default(),
86 id: id.clone(),
87 }))
88 }
89 SubscriptionKind::NewHeads => {
90 trace!(target: "rpc::ws", "received header subscription");
91 let blocks = self.api.new_block_notifications();
92 let storage = self.api.storage_info();
93 EthSubscription::Header(blocks, storage, id.clone())
94 }
95 SubscriptionKind::NewPendingTransactions => {
96 trace!(target: "rpc::ws", "received pending transactions subscription");
97 match *raw_params {
98 Params::Bool(true) => EthSubscription::FullPendingTransactions(
99 self.api.full_pending_transactions(),
100 id.clone(),
101 ),
102 Params::Bool(false) | Params::None => {
103 EthSubscription::PendingTransactions(
104 self.api.new_ready_transactions(),
105 id.clone(),
106 )
107 }
108 _ => {
109 return ResponseResult::Error(RpcError::invalid_params(
110 "Expected boolean parameter for newPendingTransactions",
111 ));
112 }
113 }
114 }
115 SubscriptionKind::Syncing => {
116 return RpcError::internal_error_with("Not implemented").into();
117 }
118 };
119
120 cx.add_subscription(id.clone(), subscription);
121
122 trace!(target: "rpc::ws", "created new subscription: {:?}", id);
123 to_rpc_result(id)
124 }
125 }
126 }
127}
128
129#[async_trait::async_trait]
130impl PubSubRpcHandler for PubSubEthRpcHandler {
131 type Request = EthRpcCall;
132 type SubscriptionId = SubscriptionId;
133 type Subscription = EthSubscription<FoundryNetwork>;
134
135 async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
136 trace!(target: "rpc", "received pubsub request {:?}", request);
137 match request {
138 EthRpcCall::Request(request) => self.api.execute(*request).await,
139 EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
140 }
141 }
142}