1use crate::{
30 eth::{
31 error::PoolError,
32 pool::transactions::{
33 PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34 TransactionsIterator, TxMarker,
35 },
36 },
37 mem::storage::MinedBlockOutcome,
38};
39use alloy_consensus::Transaction;
40use alloy_primitives::{Address, TxHash};
41use alloy_rpc_types::txpool::TxpoolStatus;
42use anvil_core::eth::transaction::PendingTransaction;
43use futures::channel::mpsc::{Receiver, Sender, channel};
44use parking_lot::{Mutex, RwLock};
45use std::{collections::VecDeque, fmt, sync::Arc};
46
47pub mod transactions;
48
49pub struct Pool<T> {
51 inner: RwLock<PoolInner<T>>,
53 transaction_listener: Mutex<Vec<Sender<TxHash>>>,
55}
56
57impl<T> Default for Pool<T> {
58 fn default() -> Self {
59 Self { inner: RwLock::new(PoolInner::default()), transaction_listener: Default::default() }
60 }
61}
62
63impl<T> Pool<T> {
66 pub fn ready_transactions(&self) -> TransactionsIterator<T> {
68 self.inner.read().ready_transactions()
69 }
70
71 pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction<T>>> {
73 self.inner.read().pending_transactions.transactions().collect()
74 }
75
76 pub fn txpool_status(&self) -> TxpoolStatus {
78 let pending: u64 = self.inner.read().ready_transactions.len().try_into().unwrap_or(0);
80 let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
81 TxpoolStatus { pending, queued }
82 }
83
84 pub fn add_ready_listener(&self) -> Receiver<TxHash> {
87 const TX_LISTENER_BUFFER_SIZE: usize = 2048;
88 let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
89 self.transaction_listener.lock().push(tx);
90 rx
91 }
92
93 pub fn contains(&self, tx_hash: &TxHash) -> bool {
95 self.inner.read().contains(tx_hash)
96 }
97
98 pub fn clear(&self) {
100 let mut pool = self.inner.write();
101 pool.clear();
102 }
103
104 pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
106 self.inner.write().remove_invalid(tx_hashes)
107 }
108
109 pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
111 self.inner.write().remove_transactions_by_address(sender)
112 }
113
114 pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction<T>>> {
120 trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
121 let removed = {
122 let mut pool = self.inner.write();
123 pool.ready_transactions.remove_with_markers(vec![tx], None)
124 };
125 trace!(target: "txpool", "Dropped transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
126
127 let mut dropped = None;
128 if !removed.is_empty() {
129 dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
130 }
131 dropped
132 }
133
134 fn notify_ready(&self, tx: &AddedTransaction<T>) {
136 if let AddedTransaction::Ready(ready) = tx {
137 self.notify_listener(ready.hash);
138 for promoted in ready.promoted.iter().copied() {
139 self.notify_listener(promoted);
140 }
141 }
142 }
143
144 fn notify_listener(&self, hash: TxHash) {
146 let mut listener = self.transaction_listener.lock();
147 for n in (0..listener.len()).rev() {
149 let mut listener_tx = listener.swap_remove(n);
150 let retain = match listener_tx.try_send(hash) {
151 Ok(()) => true,
152 Err(e) => {
153 if e.is_full() {
154 warn!(
155 target: "txpool",
156 "[{:?}] Failed to send tx notification because channel is full",
157 hash,
158 );
159 true
160 } else {
161 false
162 }
163 }
164 };
165 if retain {
166 listener.push(listener_tx)
167 }
168 }
169 }
170}
171
172impl<T: Clone> Pool<T> {
173 pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
175 self.inner.read().get_transaction(hash)
176 }
177}
178
179impl<T: Transaction> Pool<T> {
180 pub fn on_mined_block(self: &Arc<Self>, outcome: MinedBlockOutcome<T>) -> PruneResult<T> {
184 let MinedBlockOutcome { block_number, included, invalid, not_yet_valid } = outcome;
185
186 self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
188
189 let res = self
191 .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
192 trace!(target: "txpool", "pruned transaction markers {:?}", res);
193
194 if !not_yet_valid.is_empty() {
197 let tx_hashes: Vec<_> = not_yet_valid.iter().map(|tx| tx.hash()).collect();
198 let pool = Arc::clone(self);
199 tokio::spawn(async move {
200 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
201 for hash in tx_hashes {
202 trace!(target: "txpool", "re-notifying for not-yet-valid tx: {:?}", hash);
203 pool.notify_listener(hash);
204 }
205 });
206 }
207
208 res
209 }
210
211 pub fn prune_markers(
216 &self,
217 block_number: u64,
218 markers: impl IntoIterator<Item = TxMarker>,
219 ) -> PruneResult<T> {
220 debug!(target: "txpool", ?block_number, "pruning transactions");
221 let res = self.inner.write().prune_markers(markers);
222 for tx in &res.promoted {
223 self.notify_ready(tx);
224 }
225 res
226 }
227
228 pub fn add_transaction(
230 &self,
231 tx: PoolTransaction<T>,
232 ) -> Result<AddedTransaction<T>, PoolError> {
233 let added = self.inner.write().add_transaction(tx)?;
234 self.notify_ready(&added);
235 Ok(added)
236 }
237}
238
239#[derive(Debug)]
243struct PoolInner<T> {
244 ready_transactions: ReadyTransactions<T>,
245 pending_transactions: PendingTransactions<T>,
246}
247
248impl<T> Default for PoolInner<T> {
249 fn default() -> Self {
250 Self { ready_transactions: Default::default(), pending_transactions: Default::default() }
251 }
252}
253
254impl<T> PoolInner<T> {
257 fn ready_transactions(&self) -> TransactionsIterator<T> {
259 self.ready_transactions.get_transactions()
260 }
261
262 fn clear(&mut self) {
264 self.ready_transactions.clear();
265 self.pending_transactions.clear();
266 }
267
268 pub fn transactions_by_sender(
270 &self,
271 sender: Address,
272 ) -> impl Iterator<Item = Arc<PoolTransaction<T>>> + '_ {
273 let pending_txs = self
274 .pending_transactions
275 .transactions()
276 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
277
278 let ready_txs = self
279 .ready_transactions
280 .get_transactions()
281 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
282
283 pending_txs.chain(ready_txs)
284 }
285
286 fn contains(&self, tx_hash: &TxHash) -> bool {
288 self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
289 }
290
291 fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
293 if tx_hashes.is_empty() {
295 return vec![];
296 }
297 trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
298
299 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
300 removed.extend(self.pending_transactions.remove(tx_hashes));
301
302 trace!(target: "txpool", "Removed invalid transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
303
304 removed
305 }
306
307 fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
309 let tx_hashes =
310 self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
311
312 if tx_hashes.is_empty() {
313 return vec![];
314 }
315
316 trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
317
318 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
319 removed.extend(self.pending_transactions.remove(tx_hashes));
320
321 trace!(target: "txpool", "Removed transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
322
323 removed
324 }
325}
326
327impl<T: Clone> PoolInner<T> {
328 fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
332 if let Some(pending) = self.pending_transactions.get(&hash) {
333 return Some(pending.transaction.pending_transaction.clone());
334 }
335 Some(
336 self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
337 )
338 }
339}
340
341impl<T: Transaction> PoolInner<T> {
342 fn add_transaction(
343 &mut self,
344 tx: PoolTransaction<T>,
345 ) -> Result<AddedTransaction<T>, PoolError> {
346 if self.contains(&tx.hash()) {
347 debug!(target: "txpool", "[{:?}] Already imported", tx.hash());
348 return Err(PoolError::AlreadyImported(tx.hash()));
349 }
350
351 let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
352 trace!(target: "txpool", "[{:?}] ready={}", tx.transaction.hash(), tx.is_ready());
353
354 if !tx.is_ready() {
356 let hash = tx.transaction.hash();
357 self.pending_transactions.add_transaction(tx)?;
358 return Ok(AddedTransaction::Pending { hash });
359 }
360 self.add_ready_transaction(tx)
361 }
362
363 fn add_ready_transaction(
365 &mut self,
366 tx: PendingPoolTransaction<T>,
367 ) -> Result<AddedTransaction<T>, PoolError> {
368 let hash = tx.transaction.hash();
369 trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
370 let mut ready = ReadyTransaction::new(hash);
371
372 let mut tx_queue = VecDeque::from([tx]);
373 let mut is_new_tx = true;
375
376 while let Some(current_tx) = tx_queue.pop_front() {
378 tx_queue.extend(
380 self.pending_transactions.mark_and_unlock(¤t_tx.transaction.provides),
381 );
382
383 let current_hash = current_tx.transaction.hash();
384 match self.ready_transactions.add_transaction(current_tx) {
386 Ok(replaced_transactions) => {
387 if !is_new_tx {
388 ready.promoted.push(current_hash);
389 }
390 ready.removed.extend(replaced_transactions);
392 }
393 Err(err) => {
394 if is_new_tx {
396 debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
397 err);
398 return Err(err);
399 }
400 ready.discarded.push(current_hash);
401 }
402 }
403 is_new_tx = false;
404 }
405
406 if ready.removed.iter().any(|tx| *tx.hash() == hash) {
410 self.ready_transactions.clear_transactions(&ready.promoted);
411 return Err(PoolError::CyclicTransaction);
412 }
413
414 Ok(AddedTransaction::Ready(ready))
415 }
416
417 pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult<T> {
422 let mut imports = vec![];
423 let mut pruned = vec![];
424
425 for marker in markers {
426 imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
428 pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
430 }
431
432 let mut promoted = vec![];
433 let mut failed = vec![];
434 for tx in imports {
435 let hash = tx.transaction.hash();
436 match self.add_ready_transaction(tx) {
437 Ok(res) => promoted.push(res),
438 Err(e) => {
439 warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
440 failed.push(hash)
441 }
442 }
443 }
444
445 PruneResult { pruned, failed, promoted }
446 }
447}
448
449pub struct PruneResult<T> {
451 pub promoted: Vec<AddedTransaction<T>>,
453 pub failed: Vec<TxHash>,
455 pub pruned: Vec<Arc<PoolTransaction<T>>>,
457}
458
459impl<T> fmt::Debug for PruneResult<T> {
460 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
461 write!(fmt, "PruneResult {{ ")?;
462 write!(
463 fmt,
464 "promoted: {:?}, ",
465 self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
466 )?;
467 write!(fmt, "failed: {:?}, ", self.failed)?;
468 write!(
469 fmt,
470 "pruned: {:?}, ",
471 self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
472 )?;
473 write!(fmt, "}}")?;
474 Ok(())
475 }
476}
477
478#[derive(Clone, Debug)]
479pub struct ReadyTransaction<T> {
480 hash: TxHash,
482 promoted: Vec<TxHash>,
484 discarded: Vec<TxHash>,
486 removed: Vec<Arc<PoolTransaction<T>>>,
488}
489
490impl<T> ReadyTransaction<T> {
491 pub fn new(hash: TxHash) -> Self {
492 Self {
493 hash,
494 promoted: Default::default(),
495 discarded: Default::default(),
496 removed: Default::default(),
497 }
498 }
499}
500
501#[derive(Clone, Debug)]
502pub enum AddedTransaction<T> {
503 Ready(ReadyTransaction<T>),
505 Pending {
507 hash: TxHash,
509 },
510}
511
512impl<T> AddedTransaction<T> {
513 pub fn hash(&self) -> &TxHash {
514 match self {
515 Self::Ready(tx) => &tx.hash,
516 Self::Pending { hash } => hash,
517 }
518 }
519}