1use crate::{
3 StorageInfo,
4 eth::{backend::notifications::NewBlockNotifications, error::ToRpcResponseResult},
5 pubsub::filter_logs,
6};
7use alloy_consensus::TxReceipt;
8use alloy_network::Network;
9use alloy_primitives::{TxHash, map::HashMap};
10use alloy_rpc_types::{Filter, FilteredParams, Log};
11use anvil_core::eth::subscription::SubscriptionId;
12use anvil_rpc::{
13 error::{ErrorCode, RpcError},
14 response::ResponseResult,
15};
16use futures::{Stream, StreamExt, channel::mpsc::Receiver};
17use std::{
18 pin::Pin,
19 sync::Arc,
20 task::{Context, Poll},
21 time::{Duration, Instant},
22};
23use tokio::sync::Mutex;
24
25type FilterMap<N> = Arc<Mutex<HashMap<String, (EthFilter<N>, Instant)>>>;
27
28pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
30
31pub struct Filters<N: Network> {
33 active_filters: FilterMap<N>,
35 keepalive: Duration,
37}
38
39impl<N: Network> Clone for Filters<N> {
40 fn clone(&self) -> Self {
41 Self { active_filters: self.active_filters.clone(), keepalive: self.keepalive }
42 }
43}
44
45impl<N: Network> std::fmt::Debug for Filters<N> {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("Filters").field("keepalive", &self.keepalive).finish_non_exhaustive()
48 }
49}
50
51impl<N: Network> Filters<N> {
52 pub async fn add_filter(&self, filter: EthFilter<N>) -> String {
54 let id = new_id();
55 trace!(target: "node::filter", "Adding new filter id {}", id);
56 let mut filters = self.active_filters.lock().await;
57 filters.insert(id.clone(), (filter, self.next_deadline()));
58 id
59 }
60
61 pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
63 let filters = self.active_filters.lock().await;
64 if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
65 return log.filter.filter.clone();
66 }
67 None
68 }
69
70 pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter<N>> {
72 trace!(target: "node::filter", "Uninstalling filter id {}", id);
73 self.active_filters.lock().await.remove(id).map(|(f, _)| f)
74 }
75
76 pub fn keep_alive(&self) -> Duration {
78 self.keepalive
79 }
80
81 fn next_deadline(&self) -> Instant {
83 Instant::now() + self.keep_alive()
84 }
85
86 pub async fn evict(&self) {
88 trace!(target: "node::filter", "Evicting stale filters");
89 let now = Instant::now();
90 let mut active_filters = self.active_filters.lock().await;
91 active_filters.retain(|id, (_, deadline)| {
92 if now > *deadline {
93 trace!(target: "node::filter",?id, "Evicting stale filter");
94 return false;
95 }
96 true
97 });
98 }
99}
100
101impl<N: Network> Filters<N>
102where
103 N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log> + Clone,
104{
105 pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
106 {
107 let mut filters = self.active_filters.lock().await;
108 if let Some((filter, deadline)) = filters.get_mut(id) {
109 let resp = filter
110 .next()
111 .await
112 .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
113 *deadline = self.next_deadline();
114 return resp;
115 }
116 }
117 warn!(target: "node::filter", "No filter found for {}", id);
118 ResponseResult::error(RpcError {
119 code: ErrorCode::ServerError(-32000),
120 message: "filter not found".into(),
121 data: None,
122 })
123 }
124}
125
126impl<N: Network> Default for Filters<N> {
127 fn default() -> Self {
128 Self {
129 active_filters: Arc::new(Default::default()),
130 keepalive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
131 }
132 }
133}
134
135fn new_id() -> String {
137 SubscriptionId::random_hex().to_string()
138}
139
140pub enum EthFilter<N: Network> {
142 Logs(Box<LogsFilter<N>>),
143 Blocks(NewBlockNotifications),
144 PendingTransactions(Receiver<TxHash>),
145}
146
147impl<N: Network> std::fmt::Debug for EthFilter<N> {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 match self {
150 Self::Logs(_) => f.debug_tuple("Logs").finish(),
151 Self::Blocks(_) => f.debug_tuple("Blocks").finish(),
152 Self::PendingTransactions(_) => f.debug_tuple("PendingTransactions").finish(),
153 }
154 }
155}
156
157impl<N: Network> Stream for EthFilter<N>
158where
159 N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log> + Clone,
160{
161 type Item = ResponseResult;
162
163 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
164 let pin = self.get_mut();
165 match pin {
166 Self::Logs(logs) => Poll::Ready(Some(Ok(logs.poll(cx)).to_rpc_result())),
167 Self::Blocks(blocks) => {
168 let mut new_blocks = Vec::new();
169 while let Poll::Ready(Some(block)) = blocks.poll_next_unpin(cx) {
170 new_blocks.push(block.hash);
171 }
172 Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
173 }
174 Self::PendingTransactions(tx) => {
175 let mut new_txs = Vec::new();
176 while let Poll::Ready(Some(tx_hash)) = tx.poll_next_unpin(cx) {
177 new_txs.push(tx_hash);
178 }
179 Poll::Ready(Some(Ok(new_txs).to_rpc_result()))
180 }
181 }
182 }
183}
184
185pub struct LogsFilter<N: Network> {
187 pub blocks: NewBlockNotifications,
189 pub storage: StorageInfo<N>,
191 pub filter: FilteredParams,
193 pub historic: Option<Vec<Log>>,
197}
198
199impl<N: Network> std::fmt::Debug for LogsFilter<N> {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 f.debug_struct("LogsFilter").field("filter", &self.filter).finish_non_exhaustive()
202 }
203}
204
205impl<N: Network> LogsFilter<N>
206where
207 N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log> + Clone,
208{
209 pub fn poll(&mut self, cx: &mut Context<'_>) -> Vec<Log> {
211 let mut logs = self.historic.take().unwrap_or_default();
212 while let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
213 let b = self.storage.block(block.hash);
214 let receipts = self.storage.receipts(block.hash);
215 if let (Some(receipts), Some(block)) = (receipts, b) {
216 logs.extend(filter_logs(block, receipts, &self.filter))
217 }
218 }
219 logs
220 }
221}