anvil/server/
handler.rs
1use crate::{
3 EthApi,
4 eth::error::to_rpc_result,
5 pubsub::{EthSubscription, LogsSubscription},
6};
7use alloy_rpc_types::{
8 FilteredParams,
9 pubsub::{Params, SubscriptionKind},
10};
11use anvil_core::eth::{EthPubSub, EthRequest, EthRpcCall, subscription::SubscriptionId};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14
15#[derive(Clone)]
17pub struct HttpEthRpcHandler {
18 api: EthApi,
20}
21
22impl HttpEthRpcHandler {
23 pub fn new(api: EthApi) -> Self {
25 Self { api }
26 }
27}
28
29#[async_trait::async_trait]
30impl RpcHandler for HttpEthRpcHandler {
31 type Request = EthRequest;
32
33 async fn on_request(&self, request: Self::Request) -> ResponseResult {
34 self.api.execute(request).await
35 }
36}
37
38#[derive(Clone)]
40pub struct PubSubEthRpcHandler {
41 api: EthApi,
43}
44
45impl PubSubEthRpcHandler {
46 pub fn new(api: EthApi) -> Self {
48 Self { api }
49 }
50
51 async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
53 let id = SubscriptionId::random_hex();
54 trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
55 match pubsub {
56 EthPubSub::EthUnSubscribe(id) => {
57 trace!(target: "rpc::ws", "canceling subscription {:?}", id);
58 let canceled = cx.remove_subscription(&id).is_some();
59 ResponseResult::Success(canceled.into())
60 }
61 EthPubSub::EthSubscribe(kind, raw_params) => {
62 let filter = match &*raw_params {
63 Params::None => None,
64 Params::Logs(filter) => Some(filter.clone()),
65 Params::Bool(_) => None,
66 };
67 let params = FilteredParams::new(filter.map(|b| *b));
68
69 let subscription = match kind {
70 SubscriptionKind::Logs => {
71 if raw_params.is_bool() {
72 return ResponseResult::Error(RpcError::invalid_params(
73 "Expected params for logs subscription",
74 ));
75 }
76
77 trace!(target: "rpc::ws", "received logs subscription {:?}", params);
78 let blocks = self.api.new_block_notifications();
79 let storage = self.api.storage_info();
80 EthSubscription::Logs(Box::new(LogsSubscription {
81 blocks,
82 storage,
83 filter: params,
84 queued: Default::default(),
85 id: id.clone(),
86 }))
87 }
88 SubscriptionKind::NewHeads => {
89 trace!(target: "rpc::ws", "received header subscription");
90 let blocks = self.api.new_block_notifications();
91 let storage = self.api.storage_info();
92 EthSubscription::Header(blocks, storage, id.clone())
93 }
94 SubscriptionKind::NewPendingTransactions => {
95 trace!(target: "rpc::ws", "received pending transactions subscription");
96 match *raw_params {
97 Params::Bool(true) => EthSubscription::FullPendingTransactions(
98 self.api.full_pending_transactions(),
99 id.clone(),
100 ),
101 Params::Bool(false) | Params::None => {
102 EthSubscription::PendingTransactions(
103 self.api.new_ready_transactions(),
104 id.clone(),
105 )
106 }
107 _ => {
108 return ResponseResult::Error(RpcError::invalid_params(
109 "Expected boolean parameter for newPendingTransactions",
110 ));
111 }
112 }
113 }
114 SubscriptionKind::Syncing => {
115 return RpcError::internal_error_with("Not implemented").into();
116 }
117 };
118
119 cx.add_subscription(id.clone(), subscription);
120
121 trace!(target: "rpc::ws", "created new subscription: {:?}", id);
122 to_rpc_result(id)
123 }
124 }
125 }
126}
127
128#[async_trait::async_trait]
129impl PubSubRpcHandler for PubSubEthRpcHandler {
130 type Request = EthRpcCall;
131 type SubscriptionId = SubscriptionId;
132 type Subscription = EthSubscription;
133
134 async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
135 trace!(target: "rpc", "received pubsub request {:?}", request);
136 match request {
137 EthRpcCall::Request(request) => self.api.execute(*request).await,
138 EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
139 }
140 }
141}