anvil/
pubsub.rs

1use crate::{
2    eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
3    StorageInfo,
4};
5use alloy_network::AnyRpcTransaction;
6use alloy_primitives::{TxHash, B256};
7use alloy_rpc_types::{pubsub::SubscriptionResult, FilteredParams, Log, Transaction};
8use anvil_core::eth::{block::Block, subscription::SubscriptionId, transaction::TypedReceipt};
9use anvil_rpc::{request::Version, response::ResponseResult};
10use futures::{channel::mpsc::Receiver, ready, Stream, StreamExt};
11use serde::Serialize;
12use std::{
13    collections::VecDeque,
14    pin::Pin,
15    task::{Context, Poll},
16};
17use tokio::sync::mpsc::UnboundedReceiver;
18
/// Listens for new blocks and matching logs emitted in that block
///
/// For every block notification the block and its receipts are looked up in `storage`, run
/// through `filter`, and the matching logs are buffered in `queued` until they are yielded one
/// at a time.
#[derive(Debug)]
pub struct LogsSubscription {
    /// Stream of new-block notifications; polled to learn which block to scan next.
    pub blocks: NewBlockNotifications,
    /// Used to look up the block and its receipts by the notified block hash.
    pub storage: StorageInfo,
    /// Filter handed to [`filter_logs`] to decide which logs are emitted.
    pub filter: FilteredParams,
    /// Matching logs that were collected but not yet yielded; drained from the front.
    pub queued: VecDeque<Log>,
    /// Identifier cloned into every response produced by this subscription.
    pub id: SubscriptionId,
}
28
29impl LogsSubscription {
30    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
31        loop {
32            if let Some(log) = self.queued.pop_front() {
33                let params = EthSubscriptionParams {
34                    subscription: self.id.clone(),
35                    result: to_rpc_result(log),
36                };
37                return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
38            }
39
40            if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
41                let b = self.storage.block(block.hash);
42                let receipts = self.storage.receipts(block.hash);
43                if let (Some(receipts), Some(block)) = (receipts, b) {
44                    let logs = filter_logs(block, receipts, &self.filter);
45                    if logs.is_empty() {
46                        // this ensures we poll the receiver until it is pending, in which case the
47                        // underlying `UnboundedReceiver` will register the new waker, see
48                        // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
49                        continue;
50                    }
51                    self.queued.extend(logs)
52                }
53            } else {
54                return Poll::Ready(None);
55            }
56
57            if self.queued.is_empty() {
58                return Poll::Pending;
59            }
60        }
61    }
62}
63
/// Message sent to a subscriber for a single subscription event.
///
/// Instances are built via [`EthSubscriptionResponse::new`], which fills in the protocol version
/// and the `eth_subscription` method name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct EthSubscriptionResponse {
    /// JSON-RPC protocol version of the message.
    jsonrpc: Version,
    /// Method name of the notification.
    method: &'static str,
    /// Subscription id together with the event payload.
    params: EthSubscriptionParams,
}
70
71impl EthSubscriptionResponse {
72    pub fn new(params: EthSubscriptionParams) -> Self {
73        Self { jsonrpc: Version::V2, method: "eth_subscription", params }
74    }
75}
76
/// Represents the `params` field of an `eth_subscription` event
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct EthSubscriptionParams {
    /// Id of the subscription this event belongs to.
    subscription: SubscriptionId,
    /// Outcome of the event. Flattened, so its serialized fields appear directly next to
    /// `subscription` rather than under a nested `result` object.
    // NOTE(review): the exact serialized shape depends on `ResponseResult` — confirm.
    #[serde(flatten)]
    result: ResponseResult,
}
84
/// Represents an ethereum Websocket subscription
///
/// Each variant carries the event source it listens on plus, directly or via
/// [`LogsSubscription`], the [`SubscriptionId`] that is attached to every emitted response.
#[derive(Debug)]
pub enum EthSubscription {
    /// Emits logs of new blocks that match a filter; boxed [`LogsSubscription`].
    Logs(Box<LogsSubscription>),
    /// Emits the block looked up in storage for every new-block notification.
    Header(NewBlockNotifications, StorageInfo, SubscriptionId),
    /// Emits the hash of every transaction received on the channel.
    PendingTransactions(Receiver<TxHash>, SubscriptionId),
    /// Emits the full transaction object of every transaction received on the channel.
    FullPendingTransactions(UnboundedReceiver<AnyRpcTransaction>, SubscriptionId),
}
93
94impl EthSubscription {
95    fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
96        match self {
97            Self::Logs(listener) => listener.poll(cx),
98            Self::Header(blocks, storage, id) => {
99                // this loop ensures we poll the receiver until it is pending, in which case the
100                // underlying `UnboundedReceiver` will register the new waker, see
101                // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
102                loop {
103                    if let Some(block) = ready!(blocks.poll_next_unpin(cx)) {
104                        if let Some(block) = storage.eth_block(block.hash) {
105                            let params = EthSubscriptionParams {
106                                subscription: id.clone(),
107                                result: to_rpc_result(block),
108                            };
109                            return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
110                        }
111                    } else {
112                        return Poll::Ready(None);
113                    }
114                }
115            }
116            Self::PendingTransactions(tx, id) => {
117                let res = ready!(tx.poll_next_unpin(cx))
118                    .map(SubscriptionResult::<Transaction>::TransactionHash)
119                    .map(to_rpc_result)
120                    .map(|result| {
121                        let params = EthSubscriptionParams { subscription: id.clone(), result };
122                        EthSubscriptionResponse::new(params)
123                    });
124                Poll::Ready(res)
125            }
126            Self::FullPendingTransactions(tx, id) => {
127                let res = ready!(tx.poll_recv(cx)).map(to_rpc_result).map(|result| {
128                    let params = EthSubscriptionParams { subscription: id.clone(), result };
129                    EthSubscriptionResponse::new(params)
130                });
131                Poll::Ready(res)
132            }
133        }
134    }
135}
136
137impl Stream for EthSubscription {
138    type Item = serde_json::Value;
139
140    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141        let pin = self.get_mut();
142        match ready!(pin.poll_response(cx)) {
143            None => Poll::Ready(None),
144            Some(res) => Poll::Ready(Some(serde_json::to_value(res).expect("can't fail;"))),
145        }
146    }
147}
148
149/// Returns all the logs that match the given filter
150pub fn filter_logs(block: Block, receipts: Vec<TypedReceipt>, filter: &FilteredParams) -> Vec<Log> {
151    /// Determines whether to add this log
152    fn add_log(
153        block_hash: B256,
154        l: &alloy_primitives::Log,
155        block: &Block,
156        params: &FilteredParams,
157    ) -> bool {
158        if params.filter.is_some() {
159            let block_number = block.header.number;
160            if !params.filter_block_range(block_number) ||
161                !params.filter_block_hash(block_hash) ||
162                !params.filter_address(&l.address) ||
163                !params.filter_topics(l.topics())
164            {
165                return false;
166            }
167        }
168        true
169    }
170
171    let block_hash = block.header.hash_slow();
172    let mut logs = vec![];
173    let mut log_index: u32 = 0;
174    for (receipt_index, receipt) in receipts.into_iter().enumerate() {
175        let transaction_hash = block.transactions[receipt_index].hash();
176        for log in receipt.logs() {
177            if add_log(block_hash, log, &block, filter) {
178                logs.push(Log {
179                    inner: log.clone(),
180                    block_hash: Some(block_hash),
181                    block_number: Some(block.header.number),
182                    transaction_hash: Some(transaction_hash),
183                    transaction_index: Some(receipt_index as u64),
184                    log_index: Some(log_index as u64),
185                    removed: false,
186                    block_timestamp: Some(block.header.timestamp),
187                });
188            }
189            log_index += 1;
190        }
191    }
192    logs
193}