Skip to main content

anvil/
filter.rs

1//! Support for polling based filters
2use crate::{
3    StorageInfo,
4    eth::{backend::notifications::NewBlockNotifications, error::ToRpcResponseResult},
5    pubsub::filter_logs,
6};
7use alloy_primitives::{TxHash, map::HashMap};
8use alloy_rpc_types::{Filter, FilteredParams, Log};
9use anvil_core::eth::subscription::SubscriptionId;
10use anvil_rpc::{
11    error::{ErrorCode, RpcError},
12    response::ResponseResult,
13};
14use futures::{Stream, StreamExt, channel::mpsc::Receiver};
15use std::{
16    pin::Pin,
17    sync::Arc,
18    task::{Context, Poll},
19    time::{Duration, Instant},
20};
21use tokio::sync::Mutex;
22
23/// Type alias for filters identified by their id and their expiration timestamp
24type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
25
26/// timeout after which to remove an active filter if it wasn't polled since then
27pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
28
29/// Contains all registered filters
30#[derive(Clone, Debug)]
31pub struct Filters {
32    /// all currently active filters
33    active_filters: FilterMap,
34    /// How long we keep a live the filter after the last poll
35    keepalive: Duration,
36}
37
38impl Filters {
39    /// Adds a new `EthFilter` to the set
40    pub async fn add_filter(&self, filter: EthFilter) -> String {
41        let id = new_id();
42        trace!(target: "node::filter", "Adding new filter id {}", id);
43        let mut filters = self.active_filters.lock().await;
44        filters.insert(id.clone(), (filter, self.next_deadline()));
45        id
46    }
47
48    pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
49        {
50            let mut filters = self.active_filters.lock().await;
51            if let Some((filter, deadline)) = filters.get_mut(id) {
52                let resp = filter
53                    .next()
54                    .await
55                    .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
56                *deadline = self.next_deadline();
57                return resp;
58            }
59        }
60        warn!(target: "node::filter", "No filter found for {}", id);
61        ResponseResult::error(RpcError {
62            code: ErrorCode::ServerError(-32000),
63            message: "filter not found".into(),
64            data: None,
65        })
66    }
67
68    /// Returns the original `Filter` of an `eth_newFilter`
69    pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
70        let filters = self.active_filters.lock().await;
71        if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
72            return log.filter.filter.clone();
73        }
74        None
75    }
76
77    /// Removes the filter identified with the `id`
78    pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
79        trace!(target: "node::filter", "Uninstalling filter id {}", id);
80        self.active_filters.lock().await.remove(id).map(|(f, _)| f)
81    }
82
83    /// The duration how long to keep alive stale filters
84    pub fn keep_alive(&self) -> Duration {
85        self.keepalive
86    }
87
88    /// Returns the timestamp after which a filter should expire
89    fn next_deadline(&self) -> Instant {
90        Instant::now() + self.keep_alive()
91    }
92
93    /// Evict all filters that weren't updated and reached there deadline
94    pub async fn evict(&self) {
95        trace!(target: "node::filter", "Evicting stale filters");
96        let now = Instant::now();
97        let mut active_filters = self.active_filters.lock().await;
98        active_filters.retain(|id, (_, deadline)| {
99            if now > *deadline {
100                trace!(target: "node::filter",?id, "Evicting stale filter");
101                return false;
102            }
103            true
104        });
105    }
106}
107
108impl Default for Filters {
109    fn default() -> Self {
110        Self {
111            active_filters: Arc::new(Default::default()),
112            keepalive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
113        }
114    }
115}
116
117/// returns a new random hex id
118fn new_id() -> String {
119    SubscriptionId::random_hex().to_string()
120}
121
122/// Represents a poll based filter
123#[derive(Debug)]
124pub enum EthFilter {
125    Logs(Box<LogsFilter>),
126    Blocks(NewBlockNotifications),
127    PendingTransactions(Receiver<TxHash>),
128}
129
130impl Stream for EthFilter {
131    type Item = ResponseResult;
132
133    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134        let pin = self.get_mut();
135        match pin {
136            Self::Logs(logs) => Poll::Ready(Some(Ok(logs.poll(cx)).to_rpc_result())),
137            Self::Blocks(blocks) => {
138                let mut new_blocks = Vec::new();
139                while let Poll::Ready(Some(block)) = blocks.poll_next_unpin(cx) {
140                    new_blocks.push(block.hash);
141                }
142                Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
143            }
144            Self::PendingTransactions(tx) => {
145                let mut new_txs = Vec::new();
146                while let Poll::Ready(Some(tx_hash)) = tx.poll_next_unpin(cx) {
147                    new_txs.push(tx_hash);
148                }
149                Poll::Ready(Some(Ok(new_txs).to_rpc_result()))
150            }
151        }
152    }
153}
154
155/// Listens for new blocks and matching logs emitted in that block
156#[derive(Debug)]
157pub struct LogsFilter {
158    /// listener for new blocks
159    pub blocks: NewBlockNotifications,
160    /// accessor for block storage
161    pub storage: StorageInfo,
162    /// matcher with all provided filter params
163    pub filter: FilteredParams,
164    /// existing logs that matched the filter when the listener was installed
165    ///
166    /// They'll be returned on the first poll
167    pub historic: Option<Vec<Log>>,
168}
169
170impl LogsFilter {
171    /// Returns all the logs since the last time this filter was polled
172    pub fn poll(&mut self, cx: &mut Context<'_>) -> Vec<Log> {
173        let mut logs = self.historic.take().unwrap_or_default();
174        while let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
175            let b = self.storage.block(block.hash);
176            let receipts = self.storage.receipts(block.hash);
177            if let (Some(receipts), Some(block)) = (receipts, b) {
178                logs.extend(filter_logs(block, receipts, &self.filter))
179            }
180        }
181        logs
182    }
183}