1use crate::{
3 eth::{backend::notifications::NewBlockNotifications, error::ToRpcResponseResult},
4 pubsub::filter_logs,
5 StorageInfo,
6};
7use alloy_primitives::{map::HashMap, TxHash};
8use alloy_rpc_types::{Filter, FilteredParams, Log};
9use anvil_core::eth::subscription::SubscriptionId;
10use anvil_rpc::response::ResponseResult;
11use futures::{channel::mpsc::Receiver, Stream, StreamExt};
12use std::{
13 pin::Pin,
14 sync::Arc,
15 task::{Context, Poll},
16 time::{Duration, Instant},
17};
18use tokio::sync::Mutex;
19
20type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
22
23pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
25
26#[derive(Clone, Debug)]
28pub struct Filters {
29 active_filters: FilterMap,
31 keepalive: Duration,
33}
34
35impl Filters {
36 pub async fn add_filter(&self, filter: EthFilter) -> String {
38 let id = new_id();
39 trace!(target: "node::filter", "Adding new filter id {}", id);
40 let mut filters = self.active_filters.lock().await;
41 filters.insert(id.clone(), (filter, self.next_deadline()));
42 id
43 }
44
45 pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
46 {
47 let mut filters = self.active_filters.lock().await;
48 if let Some((filter, deadline)) = filters.get_mut(id) {
49 let resp = filter
50 .next()
51 .await
52 .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
53 *deadline = self.next_deadline();
54 return resp
55 }
56 }
57 warn!(target: "node::filter", "No filter found for {}", id);
58 ResponseResult::success(Vec::<()>::new())
59 }
60
61 pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
63 let filters = self.active_filters.lock().await;
64 if let Some((EthFilter::Logs(ref log), _)) = filters.get(id) {
65 return log.filter.filter.clone()
66 }
67 None
68 }
69
70 pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
72 trace!(target: "node::filter", "Uninstalling filter id {}", id);
73 self.active_filters.lock().await.remove(id).map(|(f, _)| f)
74 }
75
76 pub fn keep_alive(&self) -> Duration {
78 self.keepalive
79 }
80
81 fn next_deadline(&self) -> Instant {
83 Instant::now() + self.keep_alive()
84 }
85
86 pub async fn evict(&self) {
88 trace!(target: "node::filter", "Evicting stale filters");
89 let now = Instant::now();
90 let mut active_filters = self.active_filters.lock().await;
91 active_filters.retain(|id, (_, deadline)| {
92 if now > *deadline {
93 trace!(target: "node::filter",?id, "Evicting stale filter");
94 return false
95 }
96 true
97 });
98 }
99}
100
101impl Default for Filters {
102 fn default() -> Self {
103 Self {
104 active_filters: Arc::new(Default::default()),
105 keepalive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
106 }
107 }
108}
109
110fn new_id() -> String {
112 SubscriptionId::random_hex().to_string()
113}
114
115#[derive(Debug)]
117pub enum EthFilter {
118 Logs(Box<LogsFilter>),
119 Blocks(NewBlockNotifications),
120 PendingTransactions(Receiver<TxHash>),
121}
122
123impl Stream for EthFilter {
124 type Item = ResponseResult;
125
126 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 let pin = self.get_mut();
128 match pin {
129 Self::Logs(logs) => Poll::Ready(Some(Ok(logs.poll(cx)).to_rpc_result())),
130 Self::Blocks(blocks) => {
131 let mut new_blocks = Vec::new();
132 while let Poll::Ready(Some(block)) = blocks.poll_next_unpin(cx) {
133 new_blocks.push(block.hash);
134 }
135 Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
136 }
137 Self::PendingTransactions(tx) => {
138 let mut new_txs = Vec::new();
139 while let Poll::Ready(Some(tx_hash)) = tx.poll_next_unpin(cx) {
140 new_txs.push(tx_hash);
141 }
142 Poll::Ready(Some(Ok(new_txs).to_rpc_result()))
143 }
144 }
145 }
146}
147
148#[derive(Debug)]
150pub struct LogsFilter {
151 pub blocks: NewBlockNotifications,
153 pub storage: StorageInfo,
155 pub filter: FilteredParams,
157 pub historic: Option<Vec<Log>>,
161}
162
163impl LogsFilter {
164 pub fn poll(&mut self, cx: &mut Context<'_>) -> Vec<Log> {
166 let mut logs = self.historic.take().unwrap_or_default();
167 while let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
168 let b = self.storage.block(block.hash);
169 let receipts = self.storage.receipts(block.hash);
170 if let (Some(receipts), Some(block)) = (receipts, b) {
171 logs.extend(filter_logs(block, receipts, &self.filter))
172 }
173 }
174 logs
175 }
176}