1use crate::{
2 eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
3 StorageInfo,
4};
5use alloy_primitives::{TxHash, B256};
6use alloy_rpc_types::{pubsub::SubscriptionResult, FilteredParams, Log, Transaction};
7use anvil_core::eth::{block::Block, subscription::SubscriptionId, transaction::TypedReceipt};
8use anvil_rpc::{request::Version, response::ResponseResult};
9use futures::{channel::mpsc::Receiver, ready, Stream, StreamExt};
10use serde::Serialize;
11use std::{
12 collections::VecDeque,
13 pin::Pin,
14 task::{Context, Poll},
15};
16
/// State of a single logs subscription.
///
/// New block notifications are turned into matching logs, which are buffered in `queued` and
/// handed out one at a time.
#[derive(Debug)]
pub struct LogsSubscription {
    /// Stream of new block notifications that is polled for fresh blocks.
    pub blocks: NewBlockNotifications,
    /// Storage used to look up the block and its receipts by the notified block hash.
    pub storage: StorageInfo,
    /// Filter that is applied to every log of a new block.
    pub filter: FilteredParams,
    /// Matching logs that have not been emitted yet; drained from the front.
    pub queued: VecDeque<Log>,
    /// Identifier of this subscription, cloned into every emitted response.
    pub id: SubscriptionId,
}
26
27impl LogsSubscription {
28 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
29 loop {
30 if let Some(log) = self.queued.pop_front() {
31 let params = EthSubscriptionParams {
32 subscription: self.id.clone(),
33 result: to_rpc_result(log),
34 };
35 return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
36 }
37
38 if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
39 let b = self.storage.block(block.hash);
40 let receipts = self.storage.receipts(block.hash);
41 if let (Some(receipts), Some(block)) = (receipts, b) {
42 let logs = filter_logs(block, receipts, &self.filter);
43 if logs.is_empty() {
44 continue;
48 }
49 self.queued.extend(logs)
50 }
51 } else {
52 return Poll::Ready(None);
53 }
54
55 if self.queued.is_empty() {
56 return Poll::Pending;
57 }
58 }
59 }
60}
61
/// Serializable notification that is sent to a subscriber.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct EthSubscriptionResponse {
    /// JSON-RPC version of the message.
    jsonrpc: Version,
    /// Name of the notification method.
    method: &'static str,
    /// Subscription id together with the result payload.
    params: EthSubscriptionParams,
}
68
69impl EthSubscriptionResponse {
70 pub fn new(params: EthSubscriptionParams) -> Self {
71 Self { jsonrpc: Version::V2, method: "eth_subscription", params }
72 }
73}
74
/// The `params` object of a subscription notification.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct EthSubscriptionParams {
    /// Identifier of the subscription this payload belongs to.
    subscription: SubscriptionId,
    /// The payload; its fields are flattened into this object when serialized.
    #[serde(flatten)]
    result: ResponseResult,
}
82
/// The kinds of subscriptions that can be polled for responses.
#[derive(Debug)]
pub enum EthSubscription {
    /// Emits the logs of new blocks that match the subscription's filter.
    Logs(Box<LogsSubscription>),
    /// Emits the stored block for each new block notification.
    Header(NewBlockNotifications, StorageInfo, SubscriptionId),
    /// Emits the transaction hashes received on the channel.
    PendingTransactions(Receiver<TxHash>, SubscriptionId),
}
90
91impl EthSubscription {
92 fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
93 match self {
94 Self::Logs(listener) => listener.poll(cx),
95 Self::Header(blocks, storage, id) => {
96 loop {
100 if let Some(block) = ready!(blocks.poll_next_unpin(cx)) {
101 if let Some(block) = storage.eth_block(block.hash) {
102 let params = EthSubscriptionParams {
103 subscription: id.clone(),
104 result: to_rpc_result(block),
105 };
106 return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
107 }
108 } else {
109 return Poll::Ready(None);
110 }
111 }
112 }
113 Self::PendingTransactions(tx, id) => {
114 let res = ready!(tx.poll_next_unpin(cx))
115 .map(SubscriptionResult::<Transaction>::TransactionHash)
116 .map(to_rpc_result)
117 .map(|result| {
118 let params = EthSubscriptionParams { subscription: id.clone(), result };
119 EthSubscriptionResponse::new(params)
120 });
121 Poll::Ready(res)
122 }
123 }
124 }
125}
126
127impl Stream for EthSubscription {
128 type Item = serde_json::Value;
129
130 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
131 let pin = self.get_mut();
132 match ready!(pin.poll_response(cx)) {
133 None => Poll::Ready(None),
134 Some(res) => Poll::Ready(Some(serde_json::to_value(res).expect("can't fail;"))),
135 }
136 }
137}
138
139pub fn filter_logs(block: Block, receipts: Vec<TypedReceipt>, filter: &FilteredParams) -> Vec<Log> {
141 fn add_log(
143 block_hash: B256,
144 l: &alloy_primitives::Log,
145 block: &Block,
146 params: &FilteredParams,
147 ) -> bool {
148 if params.filter.is_some() {
149 let block_number = block.header.number;
150 if !params.filter_block_range(block_number) ||
151 !params.filter_block_hash(block_hash) ||
152 !params.filter_address(&l.address) ||
153 !params.filter_topics(l.topics())
154 {
155 return false;
156 }
157 }
158 true
159 }
160
161 let block_hash = block.header.hash_slow();
162 let mut logs = vec![];
163 let mut log_index: u32 = 0;
164 for (receipt_index, receipt) in receipts.into_iter().enumerate() {
165 let transaction_hash = block.transactions[receipt_index].hash();
166 for log in receipt.logs() {
167 if add_log(block_hash, log, &block, filter) {
168 logs.push(Log {
169 inner: log.clone(),
170 block_hash: Some(block_hash),
171 block_number: Some(block.header.number),
172 transaction_hash: Some(transaction_hash),
173 transaction_index: Some(receipt_index as u64),
174 log_index: Some(log_index as u64),
175 removed: false,
176 block_timestamp: Some(block.header.timestamp),
177 });
178 }
179 log_index += 1;
180 }
181 }
182 logs
183}