anvil/server/
handler.rs
1use crate::{
3 eth::error::to_rpc_result,
4 pubsub::{EthSubscription, LogsSubscription},
5 EthApi,
6};
7use alloy_rpc_types::{
8 pubsub::{Params, SubscriptionKind},
9 FilteredParams,
10};
11use anvil_core::eth::{subscription::SubscriptionId, EthPubSub, EthRequest, EthRpcCall};
12use anvil_rpc::{error::RpcError, response::ResponseResult};
13use anvil_server::{PubSubContext, PubSubRpcHandler, RpcHandler};
14
15#[derive(Clone)]
17pub struct HttpEthRpcHandler {
18 api: EthApi,
20}
21
22impl HttpEthRpcHandler {
23 pub fn new(api: EthApi) -> Self {
25 Self { api }
26 }
27}
28
29#[async_trait::async_trait]
30impl RpcHandler for HttpEthRpcHandler {
31 type Request = EthRequest;
32
33 async fn on_request(&self, request: Self::Request) -> ResponseResult {
34 self.api.execute(request).await
35 }
36}
37
38#[derive(Clone)]
40pub struct PubSubEthRpcHandler {
41 api: EthApi,
43}
44
45impl PubSubEthRpcHandler {
46 pub fn new(api: EthApi) -> Self {
48 Self { api }
49 }
50
51 async fn on_pub_sub(&self, pubsub: EthPubSub, cx: PubSubContext<Self>) -> ResponseResult {
53 let id = SubscriptionId::random_hex();
54 trace!(target: "rpc::ws", "received pubsub request {:?}", pubsub);
55 match pubsub {
56 EthPubSub::EthUnSubscribe(id) => {
57 trace!(target: "rpc::ws", "canceling subscription {:?}", id);
58 let canceled = cx.remove_subscription(&id).is_some();
59 ResponseResult::Success(canceled.into())
60 }
61 EthPubSub::EthSubscribe(kind, params) => {
62 let filter = match *params {
63 Params::None => None,
64 Params::Logs(filter) => Some(*filter),
65 Params::Bool(_) => {
66 return ResponseResult::Error(RpcError::invalid_params(
67 "Expected params for logs subscription",
68 ))
69 }
70 };
71 let params = FilteredParams::new(filter);
72
73 let subscription = match kind {
74 SubscriptionKind::Logs => {
75 trace!(target: "rpc::ws", "received logs subscription {:?}", params);
76 let blocks = self.api.new_block_notifications();
77 let storage = self.api.storage_info();
78 EthSubscription::Logs(Box::new(LogsSubscription {
79 blocks,
80 storage,
81 filter: params,
82 queued: Default::default(),
83 id: id.clone(),
84 }))
85 }
86 SubscriptionKind::NewHeads => {
87 trace!(target: "rpc::ws", "received header subscription");
88 let blocks = self.api.new_block_notifications();
89 let storage = self.api.storage_info();
90 EthSubscription::Header(blocks, storage, id.clone())
91 }
92 SubscriptionKind::NewPendingTransactions => {
93 trace!(target: "rpc::ws", "received pending transactions subscription");
94 EthSubscription::PendingTransactions(
95 self.api.new_ready_transactions(),
96 id.clone(),
97 )
98 }
99 SubscriptionKind::Syncing => {
100 return RpcError::internal_error_with("Not implemented").into()
101 }
102 };
103
104 cx.add_subscription(id.clone(), subscription);
105
106 trace!(target: "rpc::ws", "created new subscription: {:?}", id);
107 to_rpc_result(id)
108 }
109 }
110 }
111}
112
113#[async_trait::async_trait]
114impl PubSubRpcHandler for PubSubEthRpcHandler {
115 type Request = EthRpcCall;
116 type SubscriptionId = SubscriptionId;
117 type Subscription = EthSubscription;
118
119 async fn on_request(&self, request: Self::Request, cx: PubSubContext<Self>) -> ResponseResult {
120 trace!(target: "rpc", "received pubsub request {:?}", request);
121 match request {
122 EthRpcCall::Request(request) => self.api.execute(*request).await,
123 EthRpcCall::PubSub(pubsub) => self.on_pub_sub(pubsub, cx).await,
124 }
125 }
126}