anvil/
pubsub.rs

1use crate::{
2    eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
3    StorageInfo,
4};
5use alloy_primitives::{TxHash, B256};
6use alloy_rpc_types::{pubsub::SubscriptionResult, FilteredParams, Log, Transaction};
7use anvil_core::eth::{block::Block, subscription::SubscriptionId, transaction::TypedReceipt};
8use anvil_rpc::{request::Version, response::ResponseResult};
9use futures::{channel::mpsc::Receiver, ready, Stream, StreamExt};
10use serde::Serialize;
11use std::{
12    collections::VecDeque,
13    pin::Pin,
14    task::{Context, Poll},
15};
16
17/// Listens for new blocks and matching logs emitted in that block
18#[derive(Debug)]
19pub struct LogsSubscription {
20    pub blocks: NewBlockNotifications,
21    pub storage: StorageInfo,
22    pub filter: FilteredParams,
23    pub queued: VecDeque<Log>,
24    pub id: SubscriptionId,
25}
26
27impl LogsSubscription {
28    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
29        loop {
30            if let Some(log) = self.queued.pop_front() {
31                let params = EthSubscriptionParams {
32                    subscription: self.id.clone(),
33                    result: to_rpc_result(log),
34                };
35                return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
36            }
37
38            if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
39                let b = self.storage.block(block.hash);
40                let receipts = self.storage.receipts(block.hash);
41                if let (Some(receipts), Some(block)) = (receipts, b) {
42                    let logs = filter_logs(block, receipts, &self.filter);
43                    if logs.is_empty() {
44                        // this ensures we poll the receiver until it is pending, in which case the
45                        // underlying `UnboundedReceiver` will register the new waker, see
46                        // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
47                        continue;
48                    }
49                    self.queued.extend(logs)
50                }
51            } else {
52                return Poll::Ready(None);
53            }
54
55            if self.queued.is_empty() {
56                return Poll::Pending;
57            }
58        }
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
63pub struct EthSubscriptionResponse {
64    jsonrpc: Version,
65    method: &'static str,
66    params: EthSubscriptionParams,
67}
68
69impl EthSubscriptionResponse {
70    pub fn new(params: EthSubscriptionParams) -> Self {
71        Self { jsonrpc: Version::V2, method: "eth_subscription", params }
72    }
73}
74
75/// Represents the `params` field of an `eth_subscription` event
76#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
77pub struct EthSubscriptionParams {
78    subscription: SubscriptionId,
79    #[serde(flatten)]
80    result: ResponseResult,
81}
82
83/// Represents an ethereum Websocket subscription
84#[derive(Debug)]
85pub enum EthSubscription {
86    Logs(Box<LogsSubscription>),
87    Header(NewBlockNotifications, StorageInfo, SubscriptionId),
88    PendingTransactions(Receiver<TxHash>, SubscriptionId),
89}
90
91impl EthSubscription {
92    fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Option<EthSubscriptionResponse>> {
93        match self {
94            Self::Logs(listener) => listener.poll(cx),
95            Self::Header(blocks, storage, id) => {
96                // this loop ensures we poll the receiver until it is pending, in which case the
97                // underlying `UnboundedReceiver` will register the new waker, see
98                // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
99                loop {
100                    if let Some(block) = ready!(blocks.poll_next_unpin(cx)) {
101                        if let Some(block) = storage.eth_block(block.hash) {
102                            let params = EthSubscriptionParams {
103                                subscription: id.clone(),
104                                result: to_rpc_result(block),
105                            };
106                            return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
107                        }
108                    } else {
109                        return Poll::Ready(None);
110                    }
111                }
112            }
113            Self::PendingTransactions(tx, id) => {
114                let res = ready!(tx.poll_next_unpin(cx))
115                    .map(SubscriptionResult::<Transaction>::TransactionHash)
116                    .map(to_rpc_result)
117                    .map(|result| {
118                        let params = EthSubscriptionParams { subscription: id.clone(), result };
119                        EthSubscriptionResponse::new(params)
120                    });
121                Poll::Ready(res)
122            }
123        }
124    }
125}
126
127impl Stream for EthSubscription {
128    type Item = serde_json::Value;
129
130    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
131        let pin = self.get_mut();
132        match ready!(pin.poll_response(cx)) {
133            None => Poll::Ready(None),
134            Some(res) => Poll::Ready(Some(serde_json::to_value(res).expect("can't fail;"))),
135        }
136    }
137}
138
139/// Returns all the logs that match the given filter
140pub fn filter_logs(block: Block, receipts: Vec<TypedReceipt>, filter: &FilteredParams) -> Vec<Log> {
141    /// Determines whether to add this log
142    fn add_log(
143        block_hash: B256,
144        l: &alloy_primitives::Log,
145        block: &Block,
146        params: &FilteredParams,
147    ) -> bool {
148        if params.filter.is_some() {
149            let block_number = block.header.number;
150            if !params.filter_block_range(block_number) ||
151                !params.filter_block_hash(block_hash) ||
152                !params.filter_address(&l.address) ||
153                !params.filter_topics(l.topics())
154            {
155                return false;
156            }
157        }
158        true
159    }
160
161    let block_hash = block.header.hash_slow();
162    let mut logs = vec![];
163    let mut log_index: u32 = 0;
164    for (receipt_index, receipt) in receipts.into_iter().enumerate() {
165        let transaction_hash = block.transactions[receipt_index].hash();
166        for log in receipt.logs() {
167            if add_log(block_hash, log, &block, filter) {
168                logs.push(Log {
169                    inner: log.clone(),
170                    block_hash: Some(block_hash),
171                    block_number: Some(block.header.number),
172                    transaction_hash: Some(transaction_hash),
173                    transaction_index: Some(receipt_index as u64),
174                    log_index: Some(log_index as u64),
175                    removed: false,
176                    block_timestamp: Some(block.header.timestamp),
177                });
178            }
179            log_index += 1;
180        }
181    }
182    logs
183}