anvil/
pubsub.rs

1use crate::{
2    StorageInfo,
3    eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
4};
5use alloy_network::AnyRpcTransaction;
6use alloy_primitives::{B256, TxHash};
7use alloy_rpc_types::{FilteredParams, Log, Transaction, pubsub::SubscriptionResult};
8use anvil_core::eth::{block::Block, subscription::SubscriptionId};
9use anvil_rpc::{request::Version, response::ResponseResult};
10use foundry_primitives::FoundryReceiptEnvelope;
11use futures::{Stream, StreamExt, channel::mpsc::Receiver, ready};
12use serde::Serialize;
13use std::{
14    collections::VecDeque,
15    pin::Pin,
16    task::{Context, Poll},
17};
18use tokio::sync::mpsc::UnboundedReceiver;
19
20/// Listens for new blocks and matching logs emitted in that block
21#[derive(Debug)]
22pub struct LogsSubscription {
23    pub blocks: NewBlockNotifications,
24    pub storage: StorageInfo,
25    pub filter: FilteredParams,
26    pub queued: VecDeque<Log>,
27    pub id: SubscriptionId,
28}
29
30impl LogsSubscription {
31    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
32        loop {
33            if let Some(log) = self.queued.pop_front() {
34                let params = EthSubscriptionParams {
35                    subscription: self.id.clone(),
36                    result: to_rpc_result(log),
37                };
38                return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
39            }
40
41            if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
42                let b = self.storage.block(block.hash);
43                let receipts = self.storage.receipts(block.hash);
44                if let (Some(receipts), Some(block)) = (receipts, b) {
45                    let logs = filter_logs(block, receipts, &self.filter);
46                    if logs.is_empty() {
47                        // this ensures we poll the receiver until it is pending, in which case the
48                        // underlying `UnboundedReceiver` will register the new waker, see
49                        // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
50                        continue;
51                    }
52                    self.queued.extend(logs)
53                }
54            } else {
55                return Poll::Ready(None);
56            }
57
58            if self.queued.is_empty() {
59                return Poll::Pending;
60            }
61        }
62    }
63}
64
65#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
66pub struct EthSubscriptionResponse {
67    jsonrpc: Version,
68    method: &'static str,
69    params: EthSubscriptionParams,
70}
71
72impl EthSubscriptionResponse {
73    pub fn new(params: EthSubscriptionParams) -> Self {
74        Self { jsonrpc: Version::V2, method: "eth_subscription", params }
75    }
76}
77
78/// Represents the `params` field of an `eth_subscription` event
79#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
80pub struct EthSubscriptionParams {
81    subscription: SubscriptionId,
82    #[serde(flatten)]
83    result: ResponseResult,
84}
85
86/// Represents an ethereum Websocket subscription
87#[derive(Debug)]
88pub enum EthSubscription {
89    Logs(Box<LogsSubscription>),
90    Header(NewBlockNotifications, StorageInfo, SubscriptionId),
91    PendingTransactions(Receiver<TxHash>, SubscriptionId),
92    FullPendingTransactions(UnboundedReceiver<AnyRpcTransaction>, SubscriptionId),
93}
94
95impl EthSubscription {
96    fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
97        match self {
98            Self::Logs(listener) => listener.poll(cx),
99            Self::Header(blocks, storage, id) => {
100                // this loop ensures we poll the receiver until it is pending, in which case the
101                // underlying `UnboundedReceiver` will register the new waker, see
102                // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
103                loop {
104                    if let Some(block) = ready!(blocks.poll_next_unpin(cx)) {
105                        if let Some(block) = storage.eth_block(block.hash) {
106                            let params = EthSubscriptionParams {
107                                subscription: id.clone(),
108                                result: to_rpc_result(block),
109                            };
110                            return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
111                        }
112                    } else {
113                        return Poll::Ready(None);
114                    }
115                }
116            }
117            Self::PendingTransactions(tx, id) => {
118                let res = ready!(tx.poll_next_unpin(cx))
119                    .map(SubscriptionResult::<Transaction>::TransactionHash)
120                    .map(to_rpc_result)
121                    .map(|result| {
122                        let params = EthSubscriptionParams { subscription: id.clone(), result };
123                        EthSubscriptionResponse::new(params)
124                    });
125                Poll::Ready(res)
126            }
127            Self::FullPendingTransactions(tx, id) => {
128                let res = ready!(tx.poll_recv(cx)).map(to_rpc_result).map(|result| {
129                    let params = EthSubscriptionParams { subscription: id.clone(), result };
130                    EthSubscriptionResponse::new(params)
131                });
132                Poll::Ready(res)
133            }
134        }
135    }
136}
137
138impl Stream for EthSubscription {
139    type Item = serde_json::Value;
140
141    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142        let pin = self.get_mut();
143        match ready!(pin.poll_response(cx)) {
144            None => Poll::Ready(None),
145            Some(res) => Poll::Ready(Some(serde_json::to_value(res).expect("can't fail;"))),
146        }
147    }
148}
149
150/// Returns all the logs that match the given filter
151pub fn filter_logs(
152    block: Block,
153    receipts: Vec<FoundryReceiptEnvelope>,
154    filter: &FilteredParams,
155) -> Vec<Log> {
156    /// Determines whether to add this log
157    fn add_log(
158        block_hash: B256,
159        l: &alloy_primitives::Log,
160        block: &Block,
161        params: &FilteredParams,
162    ) -> bool {
163        if params.filter.is_some() {
164            let block_number = block.header.number;
165            if !params.filter_block_range(block_number)
166                || !params.filter_block_hash(block_hash)
167                || !params.filter_address(&l.address)
168                || !params.filter_topics(l.topics())
169            {
170                return false;
171            }
172        }
173        true
174    }
175
176    let block_hash = block.header.hash_slow();
177    let mut logs = vec![];
178    let mut log_index: u32 = 0;
179    for (receipt_index, receipt) in receipts.into_iter().enumerate() {
180        let transaction_hash = block.body.transactions[receipt_index].hash();
181        for log in receipt.logs() {
182            if add_log(block_hash, log, &block, filter) {
183                logs.push(Log {
184                    inner: log.clone(),
185                    block_hash: Some(block_hash),
186                    block_number: Some(block.header.number),
187                    transaction_hash: Some(transaction_hash),
188                    transaction_index: Some(receipt_index as u64),
189                    log_index: Some(log_index as u64),
190                    removed: false,
191                    block_timestamp: Some(block.header.timestamp),
192                });
193            }
194            log_index += 1;
195        }
196    }
197    logs
198}