1use crate::{
2 StorageInfo,
3 eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
4};
5use alloy_network::AnyRpcTransaction;
6use alloy_primitives::{B256, TxHash};
7use alloy_rpc_types::{FilteredParams, Log, Transaction, pubsub::SubscriptionResult};
8use anvil_core::eth::{block::Block, subscription::SubscriptionId};
9use anvil_rpc::{request::Version, response::ResponseResult};
10use foundry_primitives::FoundryReceiptEnvelope;
11use futures::{Stream, StreamExt, channel::mpsc::Receiver, ready};
12use serde::Serialize;
13use std::{
14 collections::VecDeque,
15 pin::Pin,
16 task::{Context, Poll},
17};
18use tokio::sync::mpsc::UnboundedReceiver;
19
/// State of a single logs subscription.
///
/// Polling the subscription yields one response per log found in newly notified blocks that
/// passes `filter`.
#[derive(Debug)]
pub struct LogsSubscription {
    /// Stream of new block notifications; polled whenever no logs are queued.
    pub blocks: NewBlockNotifications,
    /// Storage used to look up the block and its receipts by the notified block hash.
    pub storage: StorageInfo,
    /// Filter handed to [`filter_logs`] to select the logs to deliver.
    pub filter: FilteredParams,
    /// Logs that matched the filter but have not been delivered yet; delivered front to back.
    pub queued: VecDeque<Log>,
    /// Subscription id that is attached to every response of this subscription.
    pub id: SubscriptionId,
}
29
30impl LogsSubscription {
31 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
32 loop {
33 if let Some(log) = self.queued.pop_front() {
34 let params = EthSubscriptionParams {
35 subscription: self.id.clone(),
36 result: to_rpc_result(log),
37 };
38 return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
39 }
40
41 if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
42 let b = self.storage.block(block.hash);
43 let receipts = self.storage.receipts(block.hash);
44 if let (Some(receipts), Some(block)) = (receipts, b) {
45 let logs = filter_logs(block, receipts, &self.filter);
46 if logs.is_empty() {
47 continue;
51 }
52 self.queued.extend(logs)
53 }
54 } else {
55 return Poll::Ready(None);
56 }
57
58 if self.queued.is_empty() {
59 return Poll::Pending;
60 }
61 }
62 }
63}
64
/// A subscription notification as it is serialized and sent to the subscriber.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct EthSubscriptionResponse {
    /// JSON-RPC version field of the notification.
    jsonrpc: Version,
    /// Method name of the notification.
    method: &'static str,
    /// Subscription id and payload of the notification.
    params: EthSubscriptionParams,
}
71
72impl EthSubscriptionResponse {
73 pub fn new(params: EthSubscriptionParams) -> Self {
74 Self { jsonrpc: Version::V2, method: "eth_subscription", params }
75 }
76}
77
/// The `params` part of a subscription notification.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct EthSubscriptionParams {
    /// Id of the subscription this notification belongs to.
    subscription: SubscriptionId,
    /// Payload of the notification; its fields are flattened into this object when
    /// serialized.
    #[serde(flatten)]
    result: ResponseResult,
}
85
/// The different kinds of subscriptions, each carrying the source it is fed from and
/// (directly or inside the boxed state) the subscription id used in its responses.
#[derive(Debug)]
pub enum EthSubscription {
    /// Logs subscription; yields one response per matching log of newly notified blocks.
    Logs(Box<LogsSubscription>),
    /// New heads subscription; yields the block looked up in storage for each block
    /// notification.
    Header(NewBlockNotifications, StorageInfo, SubscriptionId),
    /// Pending transactions subscription that yields transaction hashes.
    PendingTransactions(Receiver<TxHash>, SubscriptionId),
    /// Pending transactions subscription that yields full transaction objects.
    FullPendingTransactions(UnboundedReceiver<AnyRpcTransaction>, SubscriptionId),
}
94
95impl EthSubscription {
96 fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
97 match self {
98 Self::Logs(listener) => listener.poll(cx),
99 Self::Header(blocks, storage, id) => {
100 loop {
104 if let Some(block) = ready!(blocks.poll_next_unpin(cx)) {
105 if let Some(block) = storage.eth_block(block.hash) {
106 let params = EthSubscriptionParams {
107 subscription: id.clone(),
108 result: to_rpc_result(block),
109 };
110 return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
111 }
112 } else {
113 return Poll::Ready(None);
114 }
115 }
116 }
117 Self::PendingTransactions(tx, id) => {
118 let res = ready!(tx.poll_next_unpin(cx))
119 .map(SubscriptionResult::<Transaction>::TransactionHash)
120 .map(to_rpc_result)
121 .map(|result| {
122 let params = EthSubscriptionParams { subscription: id.clone(), result };
123 EthSubscriptionResponse::new(params)
124 });
125 Poll::Ready(res)
126 }
127 Self::FullPendingTransactions(tx, id) => {
128 let res = ready!(tx.poll_recv(cx)).map(to_rpc_result).map(|result| {
129 let params = EthSubscriptionParams { subscription: id.clone(), result };
130 EthSubscriptionResponse::new(params)
131 });
132 Poll::Ready(res)
133 }
134 }
135 }
136}
137
138impl Stream for EthSubscription {
139 type Item = serde_json::Value;
140
141 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142 let pin = self.get_mut();
143 match ready!(pin.poll_response(cx)) {
144 None => Poll::Ready(None),
145 Some(res) => Poll::Ready(Some(serde_json::to_value(res).expect("can't fail;"))),
146 }
147 }
148}
149
150pub fn filter_logs(
152 block: Block,
153 receipts: Vec<FoundryReceiptEnvelope>,
154 filter: &FilteredParams,
155) -> Vec<Log> {
156 fn add_log(
158 block_hash: B256,
159 l: &alloy_primitives::Log,
160 block: &Block,
161 params: &FilteredParams,
162 ) -> bool {
163 if params.filter.is_some() {
164 let block_number = block.header.number;
165 if !params.filter_block_range(block_number)
166 || !params.filter_block_hash(block_hash)
167 || !params.filter_address(&l.address)
168 || !params.filter_topics(l.topics())
169 {
170 return false;
171 }
172 }
173 true
174 }
175
176 let block_hash = block.header.hash_slow();
177 let mut logs = vec![];
178 let mut log_index: u32 = 0;
179 for (receipt_index, receipt) in receipts.into_iter().enumerate() {
180 let transaction_hash = block.body.transactions[receipt_index].hash();
181 for log in receipt.logs() {
182 if add_log(block_hash, log, &block, filter) {
183 logs.push(Log {
184 inner: log.clone(),
185 block_hash: Some(block_hash),
186 block_number: Some(block.header.number),
187 transaction_hash: Some(transaction_hash),
188 transaction_index: Some(receipt_index as u64),
189 log_index: Some(log_index as u64),
190 removed: false,
191 block_timestamp: Some(block.header.timestamp),
192 });
193 }
194 log_index += 1;
195 }
196 }
197 logs
198}