1use crate::{
2 eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
3 StorageInfo,
4};
5use alloy_network::AnyRpcTransaction;
6use alloy_primitives::{TxHash, B256};
7use alloy_rpc_types::{pubsub::SubscriptionResult, FilteredParams, Log, Transaction};
8use anvil_core::eth::{block::Block, subscription::SubscriptionId, transaction::TypedReceipt};
9use anvil_rpc::{request::Version, response::ResponseResult};
10use futures::{channel::mpsc::Receiver, ready, Stream, StreamExt};
11use serde::Serialize;
12use std::{
13 collections::VecDeque,
14 pin::Pin,
15 task::{Context, Poll},
16};
17use tokio::sync::mpsc::UnboundedReceiver;
18
19#[derive(Debug)]
21pub struct LogsSubscription {
22 pub blocks: NewBlockNotifications,
23 pub storage: StorageInfo,
24 pub filter: FilteredParams,
25 pub queued: VecDeque<Log>,
26 pub id: SubscriptionId,
27}
28
29impl LogsSubscription {
30 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
31 loop {
32 if let Some(log) = self.queued.pop_front() {
33 let params = EthSubscriptionParams {
34 subscription: self.id.clone(),
35 result: to_rpc_result(log),
36 };
37 return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
38 }
39
40 if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
41 let b = self.storage.block(block.hash);
42 let receipts = self.storage.receipts(block.hash);
43 if let (Some(receipts), Some(block)) = (receipts, b) {
44 let logs = filter_logs(block, receipts, &self.filter);
45 if logs.is_empty() {
46 continue;
50 }
51 self.queued.extend(logs)
52 }
53 } else {
54 return Poll::Ready(None);
55 }
56
57 if self.queued.is_empty() {
58 return Poll::Pending;
59 }
60 }
61 }
62}
63
64#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
65pub struct EthSubscriptionResponse {
66 jsonrpc: Version,
67 method: &'static str,
68 params: EthSubscriptionParams,
69}
70
71impl EthSubscriptionResponse {
72 pub fn new(params: EthSubscriptionParams) -> Self {
73 Self { jsonrpc: Version::V2, method: "eth_subscription", params }
74 }
75}
76
77#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
79pub struct EthSubscriptionParams {
80 subscription: SubscriptionId,
81 #[serde(flatten)]
82 result: ResponseResult,
83}
84
85#[derive(Debug)]
87pub enum EthSubscription {
88 Logs(Box<LogsSubscription>),
89 Header(NewBlockNotifications, StorageInfo, SubscriptionId),
90 PendingTransactions(Receiver<TxHash>, SubscriptionId),
91 FullPendingTransactions(UnboundedReceiver<AnyRpcTransaction>, SubscriptionId),
92}
93
94impl EthSubscription {
95 fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
96 match self {
97 Self::Logs(listener) => listener.poll(cx),
98 Self::Header(blocks, storage, id) => {
99 loop {
103 if let Some(block) = ready!(blocks.poll_next_unpin(cx)) {
104 if let Some(block) = storage.eth_block(block.hash) {
105 let params = EthSubscriptionParams {
106 subscription: id.clone(),
107 result: to_rpc_result(block),
108 };
109 return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
110 }
111 } else {
112 return Poll::Ready(None);
113 }
114 }
115 }
116 Self::PendingTransactions(tx, id) => {
117 let res = ready!(tx.poll_next_unpin(cx))
118 .map(SubscriptionResult::<Transaction>::TransactionHash)
119 .map(to_rpc_result)
120 .map(|result| {
121 let params = EthSubscriptionParams { subscription: id.clone(), result };
122 EthSubscriptionResponse::new(params)
123 });
124 Poll::Ready(res)
125 }
126 Self::FullPendingTransactions(tx, id) => {
127 let res = ready!(tx.poll_recv(cx)).map(to_rpc_result).map(|result| {
128 let params = EthSubscriptionParams { subscription: id.clone(), result };
129 EthSubscriptionResponse::new(params)
130 });
131 Poll::Ready(res)
132 }
133 }
134 }
135}
136
137impl Stream for EthSubscription {
138 type Item = serde_json::Value;
139
140 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141 let pin = self.get_mut();
142 match ready!(pin.poll_response(cx)) {
143 None => Poll::Ready(None),
144 Some(res) => Poll::Ready(Some(serde_json::to_value(res).expect("can't fail;"))),
145 }
146 }
147}
148
149pub fn filter_logs(block: Block, receipts: Vec<TypedReceipt>, filter: &FilteredParams) -> Vec<Log> {
151 fn add_log(
153 block_hash: B256,
154 l: &alloy_primitives::Log,
155 block: &Block,
156 params: &FilteredParams,
157 ) -> bool {
158 if params.filter.is_some() {
159 let block_number = block.header.number;
160 if !params.filter_block_range(block_number) ||
161 !params.filter_block_hash(block_hash) ||
162 !params.filter_address(&l.address) ||
163 !params.filter_topics(l.topics())
164 {
165 return false;
166 }
167 }
168 true
169 }
170
171 let block_hash = block.header.hash_slow();
172 let mut logs = vec![];
173 let mut log_index: u32 = 0;
174 for (receipt_index, receipt) in receipts.into_iter().enumerate() {
175 let transaction_hash = block.transactions[receipt_index].hash();
176 for log in receipt.logs() {
177 if add_log(block_hash, log, &block, filter) {
178 logs.push(Log {
179 inner: log.clone(),
180 block_hash: Some(block_hash),
181 block_number: Some(block.header.number),
182 transaction_hash: Some(transaction_hash),
183 transaction_index: Some(receipt_index as u64),
184 log_index: Some(log_index as u64),
185 removed: false,
186 block_timestamp: Some(block.header.timestamp),
187 });
188 }
189 log_index += 1;
190 }
191 }
192 logs
193}