Skip to main content

anvil/
filter.rs

1//! Support for polling based filters
2use crate::{
3    StorageInfo,
4    eth::{backend::notifications::NewBlockNotifications, error::ToRpcResponseResult},
5    pubsub::filter_logs,
6};
7use alloy_consensus::TxReceipt;
8use alloy_network::Network;
9use alloy_primitives::{TxHash, map::HashMap};
10use alloy_rpc_types::{Filter, FilteredParams, Log};
11use anvil_core::eth::subscription::SubscriptionId;
12use anvil_rpc::{
13    error::{ErrorCode, RpcError},
14    response::ResponseResult,
15};
16use futures::{Stream, StreamExt, channel::mpsc::Receiver};
17use std::{
18    pin::Pin,
19    sync::Arc,
20    task::{Context, Poll},
21    time::{Duration, Instant},
22};
23use tokio::sync::Mutex;
24
/// Type alias for the shared, mutex-guarded map of installed filters.
///
/// Each entry is keyed by the filter id and stores the [`EthFilter`] together with the
/// [`Instant`] at which that filter expires.
type FilterMap<N> = Arc<Mutex<HashMap<String, (EthFilter<N>, Instant)>>>;
27
/// Number of seconds (five minutes) after which an active filter is removed if it has not been
/// polled in the meantime.
pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 5 * 60;
30
31/// Contains all registered filters
32pub struct Filters<N: Network> {
33    /// all currently active filters
34    active_filters: FilterMap<N>,
35    /// How long we keep a live the filter after the last poll
36    keepalive: Duration,
37}
38
39impl<N: Network> Clone for Filters<N> {
40    fn clone(&self) -> Self {
41        Self { active_filters: self.active_filters.clone(), keepalive: self.keepalive }
42    }
43}
44
45impl<N: Network> std::fmt::Debug for Filters<N> {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("Filters").field("keepalive", &self.keepalive).finish_non_exhaustive()
48    }
49}
50
51impl<N: Network> Filters<N> {
52    /// Adds a new `EthFilter` to the set
53    pub async fn add_filter(&self, filter: EthFilter<N>) -> String {
54        let id = new_id();
55        trace!(target: "node::filter", "Adding new filter id {}", id);
56        let mut filters = self.active_filters.lock().await;
57        filters.insert(id.clone(), (filter, self.next_deadline()));
58        id
59    }
60
61    /// Returns the original `Filter` of an `eth_newFilter`
62    pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
63        let filters = self.active_filters.lock().await;
64        if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
65            return log.filter.filter.clone();
66        }
67        None
68    }
69
70    /// Removes the filter identified with the `id`
71    pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter<N>> {
72        trace!(target: "node::filter", "Uninstalling filter id {}", id);
73        self.active_filters.lock().await.remove(id).map(|(f, _)| f)
74    }
75
76    /// The duration how long to keep alive stale filters
77    pub fn keep_alive(&self) -> Duration {
78        self.keepalive
79    }
80
81    /// Returns the timestamp after which a filter should expire
82    fn next_deadline(&self) -> Instant {
83        Instant::now() + self.keep_alive()
84    }
85
86    /// Evict all filters that weren't updated and reached there deadline
87    pub async fn evict(&self) {
88        trace!(target: "node::filter", "Evicting stale filters");
89        let now = Instant::now();
90        let mut active_filters = self.active_filters.lock().await;
91        active_filters.retain(|id, (_, deadline)| {
92            if now > *deadline {
93                trace!(target: "node::filter",?id, "Evicting stale filter");
94                return false;
95            }
96            true
97        });
98    }
99}
100
101impl<N: Network> Filters<N>
102where
103    N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log> + Clone,
104{
105    pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
106        {
107            let mut filters = self.active_filters.lock().await;
108            if let Some((filter, deadline)) = filters.get_mut(id) {
109                let resp = filter
110                    .next()
111                    .await
112                    .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
113                *deadline = self.next_deadline();
114                return resp;
115            }
116        }
117        warn!(target: "node::filter", "No filter found for {}", id);
118        ResponseResult::error(RpcError {
119            code: ErrorCode::ServerError(-32000),
120            message: "filter not found".into(),
121            data: None,
122        })
123    }
124}
125
126impl<N: Network> Default for Filters<N> {
127    fn default() -> Self {
128        Self {
129            active_filters: Arc::new(Default::default()),
130            keepalive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
131        }
132    }
133}
134
135/// returns a new random hex id
136fn new_id() -> String {
137    SubscriptionId::random_hex().to_string()
138}
139
140/// Represents a poll based filter
141pub enum EthFilter<N: Network> {
142    Logs(Box<LogsFilter<N>>),
143    Blocks(NewBlockNotifications),
144    PendingTransactions(Receiver<TxHash>),
145}
146
147impl<N: Network> std::fmt::Debug for EthFilter<N> {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        match self {
150            Self::Logs(_) => f.debug_tuple("Logs").finish(),
151            Self::Blocks(_) => f.debug_tuple("Blocks").finish(),
152            Self::PendingTransactions(_) => f.debug_tuple("PendingTransactions").finish(),
153        }
154    }
155}
156
157impl<N: Network> Stream for EthFilter<N>
158where
159    N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log> + Clone,
160{
161    type Item = ResponseResult;
162
163    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
164        let pin = self.get_mut();
165        match pin {
166            Self::Logs(logs) => Poll::Ready(Some(Ok(logs.poll(cx)).to_rpc_result())),
167            Self::Blocks(blocks) => {
168                let mut new_blocks = Vec::new();
169                while let Poll::Ready(Some(block)) = blocks.poll_next_unpin(cx) {
170                    new_blocks.push(block.hash);
171                }
172                Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
173            }
174            Self::PendingTransactions(tx) => {
175                let mut new_txs = Vec::new();
176                while let Poll::Ready(Some(tx_hash)) = tx.poll_next_unpin(cx) {
177                    new_txs.push(tx_hash);
178                }
179                Poll::Ready(Some(Ok(new_txs).to_rpc_result()))
180            }
181        }
182    }
183}
184
185/// Listens for new blocks and matching logs emitted in that block
186pub struct LogsFilter<N: Network> {
187    /// listener for new blocks
188    pub blocks: NewBlockNotifications,
189    /// accessor for block storage
190    pub storage: StorageInfo<N>,
191    /// matcher with all provided filter params
192    pub filter: FilteredParams,
193    /// existing logs that matched the filter when the listener was installed
194    ///
195    /// They'll be returned on the first poll
196    pub historic: Option<Vec<Log>>,
197}
198
199impl<N: Network> std::fmt::Debug for LogsFilter<N> {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        f.debug_struct("LogsFilter").field("filter", &self.filter).finish_non_exhaustive()
202    }
203}
204
205impl<N: Network> LogsFilter<N>
206where
207    N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log> + Clone,
208{
209    /// Returns all the logs since the last time this filter was polled
210    pub fn poll(&mut self, cx: &mut Context<'_>) -> Vec<Log> {
211        let mut logs = self.historic.take().unwrap_or_default();
212        while let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
213            let b = self.storage.block(block.hash);
214            let receipts = self.storage.receipts(block.hash);
215            if let (Some(receipts), Some(block)) = (receipts, b) {
216                logs.extend(filter_logs(block, receipts, &self.filter))
217            }
218        }
219        logs
220    }
221}