Skip to main content

anvil/eth/pool/
mod.rs

1//! # Transaction Pool implementation
2//!
3//! The transaction pool is responsible for managing a set of transactions that can be included in
4//! upcoming blocks.
5//!
6//! The main task of the pool is to prepare an ordered list of transactions that are ready to be
7//! included in a new block.
8//!
9//! Each imported block can affect the validity of transactions already in the pool.
10//! The miner expects the most up-to-date transactions when attempting to create a new block.
//! After being included in a block, a transaction should be removed from the pool; this process is
//! called _pruning_ and, due to separation of concerns, is triggered externally.
//! The pool essentially performs the following services:
14//!   * import transactions
15//!   * order transactions
16//!   * provide ordered set of transactions that are ready for inclusion
17//!   * prune transactions
18//!
19//! Each transaction in the pool contains markers that it _provides_ or _requires_. This property is
20//! used to determine whether it can be included in a block (transaction is ready) or whether it
21//! still _requires_ other transactions to be mined first (transaction is pending).
//! A transaction is associated with the nonce of the account it's sent from. A unique identifying
//! marker for a transaction is therefore the pair `(nonce, account)`. An incoming transaction with
//! a `nonce > nonce on chain` will _require_ `(nonce - 1, account)` first, before it is ready to be
//! included in a block.
26//!
27//! This implementation is adapted from <https://github.com/paritytech/substrate/tree/master/client/transaction-pool>
28
29use crate::{
30    eth::{
31        error::PoolError,
32        pool::transactions::{
33            PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34            TransactionsIterator, TxMarker,
35        },
36    },
37    mem::storage::MinedBlockOutcome,
38};
39use alloy_consensus::Transaction;
40use alloy_primitives::{Address, TxHash};
41use alloy_rpc_types::txpool::TxpoolStatus;
42use anvil_core::eth::transaction::PendingTransaction;
43use futures::channel::mpsc::{Receiver, Sender, channel};
44use parking_lot::{Mutex, RwLock};
45use std::{collections::VecDeque, fmt, sync::Arc};
46
47pub mod transactions;
48
49/// Transaction pool that performs validation.
50pub struct Pool<T> {
51    /// processes all pending transactions
52    inner: RwLock<PoolInner<T>>,
53    /// listeners for new ready transactions
54    transaction_listener: Mutex<Vec<Sender<TxHash>>>,
55}
56
57impl<T> Default for Pool<T> {
58    fn default() -> Self {
59        Self { inner: RwLock::new(PoolInner::default()), transaction_listener: Default::default() }
60    }
61}
62
63// == impl Pool ==
64
65impl<T> Pool<T> {
66    /// Returns an iterator that yields all transactions that are currently ready
67    pub fn ready_transactions(&self) -> TransactionsIterator<T> {
68        self.inner.read().ready_transactions()
69    }
70
71    /// Returns all transactions that are not ready to be included in a block yet
72    pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction<T>>> {
73        self.inner.read().pending_transactions.transactions().collect()
74    }
75
76    /// Returns the number of tx that are ready and queued for further execution
77    pub fn txpool_status(&self) -> TxpoolStatus {
78        // Note: naming differs here compared to geth's `TxpoolStatus`
79        let pending: u64 = self.inner.read().ready_transactions.len().try_into().unwrap_or(0);
80        let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
81        TxpoolStatus { pending, queued }
82    }
83
84    /// Adds a new transaction listener to the pool that gets notified about every new ready
85    /// transaction
86    pub fn add_ready_listener(&self) -> Receiver<TxHash> {
87        const TX_LISTENER_BUFFER_SIZE: usize = 2048;
88        let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
89        self.transaction_listener.lock().push(tx);
90        rx
91    }
92
93    /// Returns true if this pool already contains the transaction
94    pub fn contains(&self, tx_hash: &TxHash) -> bool {
95        self.inner.read().contains(tx_hash)
96    }
97
98    /// Removes all transactions from the pool
99    pub fn clear(&self) {
100        let mut pool = self.inner.write();
101        pool.clear();
102    }
103
104    /// Remove the given transactions from the pool
105    pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
106        self.inner.write().remove_invalid(tx_hashes)
107    }
108
109    /// Remove transactions by sender
110    pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
111        self.inner.write().remove_transactions_by_address(sender)
112    }
113
114    /// Removes a single transaction from the pool
115    ///
116    /// This is similar to `[Pool::remove_invalid()]` but for a single transaction.
117    ///
118    /// **Note**: this will also drop any transaction that depend on the `tx`
119    pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction<T>>> {
120        trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
121        let removed = {
122            let mut pool = self.inner.write();
123            pool.ready_transactions.remove_with_markers(vec![tx], None)
124        };
125        trace!(target: "txpool", "Dropped transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
126
127        let mut dropped = None;
128        if !removed.is_empty() {
129            dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
130        }
131        dropped
132    }
133
134    /// Notifies listeners if the transaction was added to the ready queue.
135    fn notify_ready(&self, tx: &AddedTransaction<T>) {
136        if let AddedTransaction::Ready(ready) = tx {
137            self.notify_listener(ready.hash);
138            for promoted in ready.promoted.iter().copied() {
139                self.notify_listener(promoted);
140            }
141        }
142    }
143
144    /// notifies all listeners about the transaction
145    fn notify_listener(&self, hash: TxHash) {
146        let mut listener = self.transaction_listener.lock();
147        // this is basically a retain but with mut reference
148        for n in (0..listener.len()).rev() {
149            let mut listener_tx = listener.swap_remove(n);
150            let retain = match listener_tx.try_send(hash) {
151                Ok(()) => true,
152                Err(e) => {
153                    if e.is_full() {
154                        warn!(
155                            target: "txpool",
156                            "[{:?}] Failed to send tx notification because channel is full",
157                            hash,
158                        );
159                        true
160                    } else {
161                        false
162                    }
163                }
164            };
165            if retain {
166                listener.push(listener_tx)
167            }
168        }
169    }
170}
171
172impl<T: Clone> Pool<T> {
173    /// Returns the _pending_ transaction for that `hash` if it exists in the mempool
174    pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
175        self.inner.read().get_transaction(hash)
176    }
177}
178
179impl<T: Transaction> Pool<T> {
180    /// Invoked when a set of transactions ([Self::ready_transactions()]) was executed.
181    ///
182    /// This will remove the transactions from the pool.
183    pub fn on_mined_block(self: &Arc<Self>, outcome: MinedBlockOutcome<T>) -> PruneResult<T> {
184        let MinedBlockOutcome { block_number, included, invalid, not_yet_valid } = outcome;
185
186        // remove invalid transactions from the pool
187        self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
188
189        // prune all the markers the mined transactions provide
190        let res = self
191            .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
192        trace!(target: "txpool", "pruned transaction markers {:?}", res);
193
194        // Re-notify the miner about not-yet-valid transactions so they'll be retried.
195        // Delay by 1 second to let time advance before the next mining attempt.
196        if !not_yet_valid.is_empty() {
197            let tx_hashes: Vec<_> = not_yet_valid.iter().map(|tx| tx.hash()).collect();
198            let pool = Arc::clone(self);
199            tokio::spawn(async move {
200                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
201                for hash in tx_hashes {
202                    trace!(target: "txpool", "re-notifying for not-yet-valid tx: {:?}", hash);
203                    pool.notify_listener(hash);
204                }
205            });
206        }
207
208        res
209    }
210
211    /// Removes ready transactions for the given iterator of identifying markers.
212    ///
213    /// For each marker we can remove transactions in the pool that either provide the marker
214    /// directly or are a dependency of the transaction associated with that marker.
215    pub fn prune_markers(
216        &self,
217        block_number: u64,
218        markers: impl IntoIterator<Item = TxMarker>,
219    ) -> PruneResult<T> {
220        debug!(target: "txpool", ?block_number, "pruning transactions");
221        let res = self.inner.write().prune_markers(markers);
222        for tx in &res.promoted {
223            self.notify_ready(tx);
224        }
225        res
226    }
227
228    /// Adds a new transaction to the pool
229    pub fn add_transaction(
230        &self,
231        tx: PoolTransaction<T>,
232    ) -> Result<AddedTransaction<T>, PoolError> {
233        let added = self.inner.write().add_transaction(tx)?;
234        self.notify_ready(&added);
235        Ok(added)
236    }
237}
238
239/// A Transaction Pool
240///
241/// Contains all transactions that are ready to be executed
242#[derive(Debug)]
243struct PoolInner<T> {
244    ready_transactions: ReadyTransactions<T>,
245    pending_transactions: PendingTransactions<T>,
246}
247
248impl<T> Default for PoolInner<T> {
249    fn default() -> Self {
250        Self { ready_transactions: Default::default(), pending_transactions: Default::default() }
251    }
252}
253
254// == impl PoolInner ==
255
256impl<T> PoolInner<T> {
257    /// Returns an iterator over transactions that are ready.
258    fn ready_transactions(&self) -> TransactionsIterator<T> {
259        self.ready_transactions.get_transactions()
260    }
261
262    /// Clears
263    fn clear(&mut self) {
264        self.ready_transactions.clear();
265        self.pending_transactions.clear();
266    }
267
268    /// Returns an iterator over all transactions in the pool filtered by the sender
269    pub fn transactions_by_sender(
270        &self,
271        sender: Address,
272    ) -> impl Iterator<Item = Arc<PoolTransaction<T>>> + '_ {
273        let pending_txs = self
274            .pending_transactions
275            .transactions()
276            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
277
278        let ready_txs = self
279            .ready_transactions
280            .get_transactions()
281            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
282
283        pending_txs.chain(ready_txs)
284    }
285
286    /// Returns true if this pool already contains the transaction
287    fn contains(&self, tx_hash: &TxHash) -> bool {
288        self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
289    }
290
291    /// Remove the given transactions from the pool
292    fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
293        // early exit in case there is no invalid transactions.
294        if tx_hashes.is_empty() {
295            return vec![];
296        }
297        trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
298
299        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
300        removed.extend(self.pending_transactions.remove(tx_hashes));
301
302        trace!(target: "txpool", "Removed invalid transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
303
304        removed
305    }
306
307    /// Remove transactions by sender address
308    fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
309        let tx_hashes =
310            self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
311
312        if tx_hashes.is_empty() {
313            return vec![];
314        }
315
316        trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
317
318        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
319        removed.extend(self.pending_transactions.remove(tx_hashes));
320
321        trace!(target: "txpool", "Removed transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
322
323        removed
324    }
325}
326
327impl<T: Clone> PoolInner<T> {
328    /// checks both pools for the matching transaction
329    ///
330    /// Returns `None` if the transaction does not exist in the pool
331    fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
332        if let Some(pending) = self.pending_transactions.get(&hash) {
333            return Some(pending.transaction.pending_transaction.clone());
334        }
335        Some(
336            self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
337        )
338    }
339}
340
341impl<T: Transaction> PoolInner<T> {
342    fn add_transaction(
343        &mut self,
344        tx: PoolTransaction<T>,
345    ) -> Result<AddedTransaction<T>, PoolError> {
346        if self.contains(&tx.hash()) {
347            debug!(target: "txpool", "[{:?}] Already imported", tx.hash());
348            return Err(PoolError::AlreadyImported(tx.hash()));
349        }
350
351        let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
352        trace!(target: "txpool", "[{:?}] ready={}", tx.transaction.hash(), tx.is_ready());
353
354        // If all markers are not satisfied import to future
355        if !tx.is_ready() {
356            let hash = tx.transaction.hash();
357            self.pending_transactions.add_transaction(tx)?;
358            return Ok(AddedTransaction::Pending { hash });
359        }
360        self.add_ready_transaction(tx)
361    }
362
363    /// Adds the transaction to the ready queue
364    fn add_ready_transaction(
365        &mut self,
366        tx: PendingPoolTransaction<T>,
367    ) -> Result<AddedTransaction<T>, PoolError> {
368        let hash = tx.transaction.hash();
369        trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
370        let mut ready = ReadyTransaction::new(hash);
371
372        let mut tx_queue = VecDeque::from([tx]);
373        // tracks whether we're processing the given `tx`
374        let mut is_new_tx = true;
375
376        // take first transaction from the list
377        while let Some(current_tx) = tx_queue.pop_front() {
378            // also add the transaction that the current transaction unlocks
379            tx_queue.extend(
380                self.pending_transactions.mark_and_unlock(&current_tx.transaction.provides),
381            );
382
383            let current_hash = current_tx.transaction.hash();
384            // try to add the transaction to the ready pool
385            match self.ready_transactions.add_transaction(current_tx) {
386                Ok(replaced_transactions) => {
387                    if !is_new_tx {
388                        ready.promoted.push(current_hash);
389                    }
390                    // tx removed from ready pool
391                    ready.removed.extend(replaced_transactions);
392                }
393                Err(err) => {
394                    // failed to add transaction
395                    if is_new_tx {
396                        debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
397        err);
398                        return Err(err);
399                    }
400                    ready.discarded.push(current_hash);
401                }
402            }
403            is_new_tx = false;
404        }
405
406        // check for a cycle where importing a transaction resulted in pending transactions to be
407        // added while removing current transaction. in which case we move this transaction back to
408        // the pending queue
409        if ready.removed.iter().any(|tx| *tx.hash() == hash) {
410            self.ready_transactions.clear_transactions(&ready.promoted);
411            return Err(PoolError::CyclicTransaction);
412        }
413
414        Ok(AddedTransaction::Ready(ready))
415    }
416
417    /// Prunes the transactions that provide the given markers
418    ///
419    /// This will effectively remove those transactions that satisfy the markers and transactions
420    /// from the pending queue might get promoted to if the markers unlock them.
421    pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult<T> {
422        let mut imports = vec![];
423        let mut pruned = vec![];
424
425        for marker in markers {
426            // mark as satisfied and store the transactions that got unlocked
427            imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
428            // prune transactions
429            pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
430        }
431
432        let mut promoted = vec![];
433        let mut failed = vec![];
434        for tx in imports {
435            let hash = tx.transaction.hash();
436            match self.add_ready_transaction(tx) {
437                Ok(res) => promoted.push(res),
438                Err(e) => {
439                    warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
440                    failed.push(hash)
441                }
442            }
443        }
444
445        PruneResult { pruned, failed, promoted }
446    }
447}
448
449/// Represents the outcome of a prune
450pub struct PruneResult<T> {
451    /// a list of added transactions that a pruned marker satisfied
452    pub promoted: Vec<AddedTransaction<T>>,
453    /// all transactions that  failed to be promoted and now are discarded
454    pub failed: Vec<TxHash>,
455    /// all transactions that were pruned from the ready pool
456    pub pruned: Vec<Arc<PoolTransaction<T>>>,
457}
458
459impl<T> fmt::Debug for PruneResult<T> {
460    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
461        write!(fmt, "PruneResult {{ ")?;
462        write!(
463            fmt,
464            "promoted: {:?}, ",
465            self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
466        )?;
467        write!(fmt, "failed: {:?}, ", self.failed)?;
468        write!(
469            fmt,
470            "pruned: {:?}, ",
471            self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
472        )?;
473        write!(fmt, "}}")?;
474        Ok(())
475    }
476}
477
478#[derive(Clone, Debug)]
479pub struct ReadyTransaction<T> {
480    /// the hash of the submitted transaction
481    hash: TxHash,
482    /// transactions promoted to the ready queue
483    promoted: Vec<TxHash>,
484    /// transaction that failed and became discarded
485    discarded: Vec<TxHash>,
486    /// Transactions removed from the Ready pool
487    removed: Vec<Arc<PoolTransaction<T>>>,
488}
489
490impl<T> ReadyTransaction<T> {
491    pub fn new(hash: TxHash) -> Self {
492        Self {
493            hash,
494            promoted: Default::default(),
495            discarded: Default::default(),
496            removed: Default::default(),
497        }
498    }
499}
500
501#[derive(Clone, Debug)]
502pub enum AddedTransaction<T> {
503    /// transaction was successfully added and being processed
504    Ready(ReadyTransaction<T>),
505    /// Transaction was successfully added but not yet queued for processing
506    Pending {
507        /// the hash of the submitted transaction
508        hash: TxHash,
509    },
510}
511
512impl<T> AddedTransaction<T> {
513    pub fn hash(&self) -> &TxHash {
514        match self {
515            Self::Ready(tx) => &tx.hash,
516            Self::Pending { hash } => hash,
517        }
518    }
519}