1use crate::{
3 StorageInfo,
4 eth::{backend::notifications::NewBlockNotifications, error::ToRpcResponseResult},
5 pubsub::filter_logs,
6};
7use alloy_primitives::{TxHash, map::HashMap};
8use alloy_rpc_types::{Filter, FilteredParams, Log};
9use anvil_core::eth::subscription::SubscriptionId;
10use anvil_rpc::{
11 error::{ErrorCode, RpcError},
12 response::ResponseResult,
13};
14use futures::{Stream, StreamExt, channel::mpsc::Receiver};
15use std::{
16 pin::Pin,
17 sync::Arc,
18 task::{Context, Poll},
19 time::{Duration, Instant},
20};
21use tokio::sync::Mutex;
22
23type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
25
26pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
28
29#[derive(Clone, Debug)]
31pub struct Filters {
32 active_filters: FilterMap,
34 keepalive: Duration,
36}
37
38impl Filters {
39 pub async fn add_filter(&self, filter: EthFilter) -> String {
41 let id = new_id();
42 trace!(target: "node::filter", "Adding new filter id {}", id);
43 let mut filters = self.active_filters.lock().await;
44 filters.insert(id.clone(), (filter, self.next_deadline()));
45 id
46 }
47
48 pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
49 {
50 let mut filters = self.active_filters.lock().await;
51 if let Some((filter, deadline)) = filters.get_mut(id) {
52 let resp = filter
53 .next()
54 .await
55 .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
56 *deadline = self.next_deadline();
57 return resp;
58 }
59 }
60 warn!(target: "node::filter", "No filter found for {}", id);
61 ResponseResult::error(RpcError {
62 code: ErrorCode::ServerError(-32000),
63 message: "filter not found".into(),
64 data: None,
65 })
66 }
67
68 pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
70 let filters = self.active_filters.lock().await;
71 if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
72 return log.filter.filter.clone();
73 }
74 None
75 }
76
77 pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
79 trace!(target: "node::filter", "Uninstalling filter id {}", id);
80 self.active_filters.lock().await.remove(id).map(|(f, _)| f)
81 }
82
83 pub fn keep_alive(&self) -> Duration {
85 self.keepalive
86 }
87
88 fn next_deadline(&self) -> Instant {
90 Instant::now() + self.keep_alive()
91 }
92
93 pub async fn evict(&self) {
95 trace!(target: "node::filter", "Evicting stale filters");
96 let now = Instant::now();
97 let mut active_filters = self.active_filters.lock().await;
98 active_filters.retain(|id, (_, deadline)| {
99 if now > *deadline {
100 trace!(target: "node::filter",?id, "Evicting stale filter");
101 return false;
102 }
103 true
104 });
105 }
106}
107
108impl Default for Filters {
109 fn default() -> Self {
110 Self {
111 active_filters: Arc::new(Default::default()),
112 keepalive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
113 }
114 }
115}
116
117fn new_id() -> String {
119 SubscriptionId::random_hex().to_string()
120}
121
122#[derive(Debug)]
124pub enum EthFilter {
125 Logs(Box<LogsFilter>),
126 Blocks(NewBlockNotifications),
127 PendingTransactions(Receiver<TxHash>),
128}
129
130impl Stream for EthFilter {
131 type Item = ResponseResult;
132
133 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134 let pin = self.get_mut();
135 match pin {
136 Self::Logs(logs) => Poll::Ready(Some(Ok(logs.poll(cx)).to_rpc_result())),
137 Self::Blocks(blocks) => {
138 let mut new_blocks = Vec::new();
139 while let Poll::Ready(Some(block)) = blocks.poll_next_unpin(cx) {
140 new_blocks.push(block.hash);
141 }
142 Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
143 }
144 Self::PendingTransactions(tx) => {
145 let mut new_txs = Vec::new();
146 while let Poll::Ready(Some(tx_hash)) = tx.poll_next_unpin(cx) {
147 new_txs.push(tx_hash);
148 }
149 Poll::Ready(Some(Ok(new_txs).to_rpc_result()))
150 }
151 }
152 }
153}
154
155#[derive(Debug)]
157pub struct LogsFilter {
158 pub blocks: NewBlockNotifications,
160 pub storage: StorageInfo,
162 pub filter: FilteredParams,
164 pub historic: Option<Vec<Log>>,
168}
169
170impl LogsFilter {
171 pub fn poll(&mut self, cx: &mut Context<'_>) -> Vec<Log> {
173 let mut logs = self.historic.take().unwrap_or_default();
174 while let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
175 let b = self.storage.block(block.hash);
176 let receipts = self.storage.receipts(block.hash);
177 if let (Some(receipts), Some(block)) = (receipts, b) {
178 logs.extend(filter_logs(block, receipts, &self.filter))
179 }
180 }
181 logs
182 }
183}