1use crate::{
30 eth::{
31 error::PoolError,
32 pool::transactions::{
33 PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34 TransactionsIterator, TxMarker,
35 },
36 },
37 mem::storage::MinedBlockOutcome,
38};
39use alloy_consensus::Transaction;
40use alloy_primitives::{Address, TxHash};
41use alloy_rpc_types::txpool::TxpoolStatus;
42use anvil_core::eth::transaction::PendingTransaction;
43use futures::channel::mpsc::{Receiver, Sender, channel};
44use parking_lot::{Mutex, RwLock};
45use std::{collections::VecDeque, fmt, sync::Arc};
46
47pub mod transactions;
48
/// Transaction pool that keeps track of ready and pending transactions and
/// notifies registered listeners about transactions that became ready.
pub struct Pool<T> {
    /// The ready and pending transaction queues, guarded by a read-write lock.
    inner: RwLock<PoolInner<T>>,
    /// Senders that receive the hash of every transaction that becomes ready.
    transaction_listener: Mutex<Vec<Sender<TxHash>>>,
}
56
57impl<T> Default for Pool<T> {
58 fn default() -> Self {
59 Self { inner: RwLock::new(PoolInner::default()), transaction_listener: Default::default() }
60 }
61}
62
63impl<T> Pool<T> {
66 pub fn ready_transactions(&self) -> TransactionsIterator<T> {
68 self.inner.read().ready_transactions()
69 }
70
71 pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction<T>>> {
73 self.inner.read().pending_transactions.transactions().collect()
74 }
75
76 pub fn txpool_status(&self) -> TxpoolStatus {
78 let pending: u64 = self.inner.read().ready_transactions.len().try_into().unwrap_or(0);
80 let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
81 TxpoolStatus { pending, queued }
82 }
83
84 pub fn add_ready_listener(&self) -> Receiver<TxHash> {
87 const TX_LISTENER_BUFFER_SIZE: usize = 2048;
88 let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
89 self.transaction_listener.lock().push(tx);
90 rx
91 }
92
93 pub fn contains(&self, tx_hash: &TxHash) -> bool {
95 self.inner.read().contains(tx_hash)
96 }
97
98 pub fn clear(&self) {
100 let mut pool = self.inner.write();
101 pool.clear();
102 }
103
104 pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
106 self.inner.write().remove_invalid(tx_hashes)
107 }
108
109 pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
111 self.inner.write().remove_transactions_by_address(sender)
112 }
113
114 pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction<T>>> {
120 trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
121 let removed = {
122 let mut pool = self.inner.write();
123 pool.ready_transactions.remove_with_markers(vec![tx], None)
124 };
125 trace!(target: "txpool", "Dropped transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
126
127 let mut dropped = None;
128 if !removed.is_empty() {
129 dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
130 }
131 dropped
132 }
133
134 fn notify_ready(&self, tx: &AddedTransaction<T>) {
136 if let AddedTransaction::Ready(ready) = tx {
137 self.notify_listener(ready.hash);
138 for promoted in ready.promoted.iter().copied() {
139 self.notify_listener(promoted);
140 }
141 }
142 }
143
144 fn notify_listener(&self, hash: TxHash) {
146 let mut listener = self.transaction_listener.lock();
147 for n in (0..listener.len()).rev() {
149 let mut listener_tx = listener.swap_remove(n);
150 let retain = match listener_tx.try_send(hash) {
151 Ok(()) => true,
152 Err(e) => {
153 if e.is_full() {
154 warn!(
155 target: "txpool",
156 "[{:?}] Failed to send tx notification because channel is full",
157 hash,
158 );
159 true
160 } else {
161 false
162 }
163 }
164 };
165 if retain {
166 listener.push(listener_tx)
167 }
168 }
169 }
170}
171
172impl<T: Clone> Pool<T> {
173 pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
175 self.inner.read().get_transaction(hash)
176 }
177}
178
179impl<T: Transaction> Pool<T> {
180 pub fn on_mined_block(&self, outcome: MinedBlockOutcome<T>) -> PruneResult<T> {
184 let MinedBlockOutcome { block_number, included, invalid } = outcome;
185
186 self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
188
189 let res = self
191 .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
192 trace!(target: "txpool", "pruned transaction markers {:?}", res);
193 res
194 }
195
196 pub fn prune_markers(
201 &self,
202 block_number: u64,
203 markers: impl IntoIterator<Item = TxMarker>,
204 ) -> PruneResult<T> {
205 debug!(target: "txpool", ?block_number, "pruning transactions");
206 let res = self.inner.write().prune_markers(markers);
207 for tx in &res.promoted {
208 self.notify_ready(tx);
209 }
210 res
211 }
212
213 pub fn add_transaction(
215 &self,
216 tx: PoolTransaction<T>,
217 ) -> Result<AddedTransaction<T>, PoolError> {
218 let added = self.inner.write().add_transaction(tx)?;
219 self.notify_ready(&added);
220 Ok(added)
221 }
222}
223
/// The lock-protected state of the [`Pool`]: its two transaction queues.
#[derive(Debug)]
struct PoolInner<T> {
    /// Transactions that are ready.
    ready_transactions: ReadyTransactions<T>,
    /// Transactions that are not ready yet.
    pending_transactions: PendingTransactions<T>,
}
232
233impl<T> Default for PoolInner<T> {
234 fn default() -> Self {
235 Self { ready_transactions: Default::default(), pending_transactions: Default::default() }
236 }
237}
238
239impl<T> PoolInner<T> {
242 fn ready_transactions(&self) -> TransactionsIterator<T> {
244 self.ready_transactions.get_transactions()
245 }
246
247 fn clear(&mut self) {
249 self.ready_transactions.clear();
250 self.pending_transactions.clear();
251 }
252
253 pub fn transactions_by_sender(
255 &self,
256 sender: Address,
257 ) -> impl Iterator<Item = Arc<PoolTransaction<T>>> + '_ {
258 let pending_txs = self
259 .pending_transactions
260 .transactions()
261 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
262
263 let ready_txs = self
264 .ready_transactions
265 .get_transactions()
266 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
267
268 pending_txs.chain(ready_txs)
269 }
270
271 fn contains(&self, tx_hash: &TxHash) -> bool {
273 self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
274 }
275
276 fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
278 if tx_hashes.is_empty() {
280 return vec![];
281 }
282 trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
283
284 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
285 removed.extend(self.pending_transactions.remove(tx_hashes));
286
287 trace!(target: "txpool", "Removed invalid transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
288
289 removed
290 }
291
292 fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
294 let tx_hashes =
295 self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
296
297 if tx_hashes.is_empty() {
298 return vec![];
299 }
300
301 trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
302
303 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
304 removed.extend(self.pending_transactions.remove(tx_hashes));
305
306 trace!(target: "txpool", "Removed transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
307
308 removed
309 }
310}
311
312impl<T: Clone> PoolInner<T> {
313 fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
317 if let Some(pending) = self.pending_transactions.get(&hash) {
318 return Some(pending.transaction.pending_transaction.clone());
319 }
320 Some(
321 self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
322 )
323 }
324}
325
326impl<T: Transaction> PoolInner<T> {
327 fn add_transaction(
328 &mut self,
329 tx: PoolTransaction<T>,
330 ) -> Result<AddedTransaction<T>, PoolError> {
331 if self.contains(&tx.hash()) {
332 warn!(target: "txpool", "[{:?}] Already imported", tx.hash());
333 return Err(PoolError::AlreadyImported(tx.hash()));
334 }
335
336 let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
337 trace!(target: "txpool", "[{:?}] ready={}", tx.transaction.hash(), tx.is_ready());
338
339 if !tx.is_ready() {
341 let hash = tx.transaction.hash();
342 self.pending_transactions.add_transaction(tx)?;
343 return Ok(AddedTransaction::Pending { hash });
344 }
345 self.add_ready_transaction(tx)
346 }
347
348 fn add_ready_transaction(
350 &mut self,
351 tx: PendingPoolTransaction<T>,
352 ) -> Result<AddedTransaction<T>, PoolError> {
353 let hash = tx.transaction.hash();
354 trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
355 let mut ready = ReadyTransaction::new(hash);
356
357 let mut tx_queue = VecDeque::from([tx]);
358 let mut is_new_tx = true;
360
361 while let Some(current_tx) = tx_queue.pop_front() {
363 tx_queue.extend(
365 self.pending_transactions.mark_and_unlock(¤t_tx.transaction.provides),
366 );
367
368 let current_hash = current_tx.transaction.hash();
369 match self.ready_transactions.add_transaction(current_tx) {
371 Ok(replaced_transactions) => {
372 if !is_new_tx {
373 ready.promoted.push(current_hash);
374 }
375 ready.removed.extend(replaced_transactions);
377 }
378 Err(err) => {
379 if is_new_tx {
381 debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
382 err);
383 return Err(err);
384 } else {
385 ready.discarded.push(current_hash);
386 }
387 }
388 }
389 is_new_tx = false;
390 }
391
392 if ready.removed.iter().any(|tx| *tx.hash() == hash) {
396 self.ready_transactions.clear_transactions(&ready.promoted);
397 return Err(PoolError::CyclicTransaction);
398 }
399
400 Ok(AddedTransaction::Ready(ready))
401 }
402
403 pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult<T> {
408 let mut imports = vec![];
409 let mut pruned = vec![];
410
411 for marker in markers {
412 imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
414 pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
416 }
417
418 let mut promoted = vec![];
419 let mut failed = vec![];
420 for tx in imports {
421 let hash = tx.transaction.hash();
422 match self.add_ready_transaction(tx) {
423 Ok(res) => promoted.push(res),
424 Err(e) => {
425 warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
426 failed.push(hash)
427 }
428 }
429 }
430
431 PruneResult { pruned, failed, promoted }
432 }
433}
434
/// Outcome of pruning markers from the pool.
pub struct PruneResult<T> {
    /// Results of the unlocked transactions that were successfully added to the ready queue.
    pub promoted: Vec<AddedTransaction<T>>,
    /// Hashes of the unlocked transactions that could not be added to the ready queue.
    pub failed: Vec<TxHash>,
    /// Transactions that were pruned from the ready queue.
    pub pruned: Vec<Arc<PoolTransaction<T>>>,
}
444
445impl<T> fmt::Debug for PruneResult<T> {
446 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
447 write!(fmt, "PruneResult {{ ")?;
448 write!(
449 fmt,
450 "promoted: {:?}, ",
451 self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
452 )?;
453 write!(fmt, "failed: {:?}, ", self.failed)?;
454 write!(
455 fmt,
456 "pruned: {:?}, ",
457 self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
458 )?;
459 write!(fmt, "}}")?;
460 Ok(())
461 }
462}
463
/// Outcome of adding a transaction to the ready queue.
#[derive(Clone, Debug)]
pub struct ReadyTransaction<T> {
    /// Hash of the transaction that was added.
    hash: TxHash,
    /// Hashes of the previously pending transactions that were added to the ready queue as a result.
    promoted: Vec<TxHash>,
    /// Hashes of the unlocked transactions that failed to be added to the ready queue.
    discarded: Vec<TxHash>,
    /// Transactions that were replaced in the ready queue.
    removed: Vec<Arc<PoolTransaction<T>>>,
}
475
476impl<T> ReadyTransaction<T> {
477 pub fn new(hash: TxHash) -> Self {
478 Self {
479 hash,
480 promoted: Default::default(),
481 discarded: Default::default(),
482 removed: Default::default(),
483 }
484 }
485}
486
/// Describes how a transaction was added to the pool.
#[derive(Clone, Debug)]
pub enum AddedTransaction<T> {
    /// The transaction was added to the ready queue.
    Ready(ReadyTransaction<T>),
    /// The transaction was added to the pending queue because it is not ready yet.
    Pending {
        /// Hash of the transaction.
        hash: TxHash,
    },
}
497
498impl<T> AddedTransaction<T> {
499 pub fn hash(&self) -> &TxHash {
500 match self {
501 Self::Ready(tx) => &tx.hash,
502 Self::Pending { hash } => hash,
503 }
504 }
505}