anvil/
filter.rs

1//! Support for polling based filters
2use crate::{
3    eth::{backend::notifications::NewBlockNotifications, error::ToRpcResponseResult},
4    pubsub::filter_logs,
5    StorageInfo,
6};
7use alloy_primitives::{map::HashMap, TxHash};
8use alloy_rpc_types::{Filter, FilteredParams, Log};
9use anvil_core::eth::subscription::SubscriptionId;
10use anvil_rpc::response::ResponseResult;
11use futures::{channel::mpsc::Receiver, Stream, StreamExt};
12use std::{
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16    time::{Duration, Instant},
17};
18use tokio::sync::Mutex;
19
20/// Type alias for filters identified by their id and their expiration timestamp
21type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
22
23/// timeout after which to remove an active filter if it wasn't polled since then
24pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
25
26/// Contains all registered filters
27#[derive(Clone, Debug)]
28pub struct Filters {
29    /// all currently active filters
30    active_filters: FilterMap,
31    /// How long we keep a live the filter after the last poll
32    keepalive: Duration,
33}
34
35impl Filters {
36    /// Adds a new `EthFilter` to the set
37    pub async fn add_filter(&self, filter: EthFilter) -> String {
38        let id = new_id();
39        trace!(target: "node::filter", "Adding new filter id {}", id);
40        let mut filters = self.active_filters.lock().await;
41        filters.insert(id.clone(), (filter, self.next_deadline()));
42        id
43    }
44
45    pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
46        {
47            let mut filters = self.active_filters.lock().await;
48            if let Some((filter, deadline)) = filters.get_mut(id) {
49                let resp = filter
50                    .next()
51                    .await
52                    .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
53                *deadline = self.next_deadline();
54                return resp
55            }
56        }
57        warn!(target: "node::filter", "No filter found for {}", id);
58        ResponseResult::success(Vec::<()>::new())
59    }
60
61    /// Returns the original `Filter` of an `eth_newFilter`
62    pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
63        let filters = self.active_filters.lock().await;
64        if let Some((EthFilter::Logs(ref log), _)) = filters.get(id) {
65            return log.filter.filter.clone()
66        }
67        None
68    }
69
70    /// Removes the filter identified with the `id`
71    pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
72        trace!(target: "node::filter", "Uninstalling filter id {}", id);
73        self.active_filters.lock().await.remove(id).map(|(f, _)| f)
74    }
75
76    /// The duration how long to keep alive stale filters
77    pub fn keep_alive(&self) -> Duration {
78        self.keepalive
79    }
80
81    /// Returns the timestamp after which a filter should expire
82    fn next_deadline(&self) -> Instant {
83        Instant::now() + self.keep_alive()
84    }
85
86    /// Evict all filters that weren't updated and reached there deadline
87    pub async fn evict(&self) {
88        trace!(target: "node::filter", "Evicting stale filters");
89        let now = Instant::now();
90        let mut active_filters = self.active_filters.lock().await;
91        active_filters.retain(|id, (_, deadline)| {
92            if now > *deadline {
93                trace!(target: "node::filter",?id, "Evicting stale filter");
94                return false
95            }
96            true
97        });
98    }
99}
100
101impl Default for Filters {
102    fn default() -> Self {
103        Self {
104            active_filters: Arc::new(Default::default()),
105            keepalive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
106        }
107    }
108}
109
110/// returns a new random hex id
111fn new_id() -> String {
112    SubscriptionId::random_hex().to_string()
113}
114
115/// Represents a poll based filter
116#[derive(Debug)]
117pub enum EthFilter {
118    Logs(Box<LogsFilter>),
119    Blocks(NewBlockNotifications),
120    PendingTransactions(Receiver<TxHash>),
121}
122
123impl Stream for EthFilter {
124    type Item = ResponseResult;
125
126    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        let pin = self.get_mut();
128        match pin {
129            Self::Logs(logs) => Poll::Ready(Some(Ok(logs.poll(cx)).to_rpc_result())),
130            Self::Blocks(blocks) => {
131                let mut new_blocks = Vec::new();
132                while let Poll::Ready(Some(block)) = blocks.poll_next_unpin(cx) {
133                    new_blocks.push(block.hash);
134                }
135                Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
136            }
137            Self::PendingTransactions(tx) => {
138                let mut new_txs = Vec::new();
139                while let Poll::Ready(Some(tx_hash)) = tx.poll_next_unpin(cx) {
140                    new_txs.push(tx_hash);
141                }
142                Poll::Ready(Some(Ok(new_txs).to_rpc_result()))
143            }
144        }
145    }
146}
147
148/// Listens for new blocks and matching logs emitted in that block
149#[derive(Debug)]
150pub struct LogsFilter {
151    /// listener for new blocks
152    pub blocks: NewBlockNotifications,
153    /// accessor for block storage
154    pub storage: StorageInfo,
155    /// matcher with all provided filter params
156    pub filter: FilteredParams,
157    /// existing logs that matched the filter when the listener was installed
158    ///
159    /// They'll be returned on the first pill
160    pub historic: Option<Vec<Log>>,
161}
162
163impl LogsFilter {
164    /// Returns all the logs since the last time this filter was polled
165    pub fn poll(&mut self, cx: &mut Context<'_>) -> Vec<Log> {
166        let mut logs = self.historic.take().unwrap_or_default();
167        while let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
168            let b = self.storage.block(block.hash);
169            let receipts = self.storage.receipts(block.hash);
170            if let (Some(receipts), Some(block)) = (receipts, b) {
171                logs.extend(filter_logs(block, receipts, &self.filter))
172            }
173        }
174        logs
175    }
176}