Skip to main content

anvil/eth/pool/
mod.rs

1//! # Transaction Pool implementation
2//!
3//! The transaction pool is responsible for managing a set of transactions that can be included in
4//! upcoming blocks.
5//!
6//! The main task of the pool is to prepare an ordered list of transactions that are ready to be
7//! included in a new block.
8//!
9//! Each imported block can affect the validity of transactions already in the pool.
10//! The miner expects the most up-to-date transactions when attempting to create a new block.
11//! After being included in a block, a transaction should be removed from the pool, this process is
12//! called _pruning_ and due to separation of concerns is triggered externally.
13//! The pool essentially performs following services:
14//!   * import transactions
15//!   * order transactions
16//!   * provide ordered set of transactions that are ready for inclusion
17//!   * prune transactions
18//!
19//! Each transaction in the pool contains markers that it _provides_ or _requires_. This property is
20//! used to determine whether it can be included in a block (transaction is ready) or whether it
21//! still _requires_ other transactions to be mined first (transaction is pending).
22//! A transaction is associated with the nonce of the account it's sent from. A unique identifying
23//! marker for a transaction is therefore the pair `(nonce + account)`. An incoming transaction with
24//! a `nonce > nonce on chain` will _require_ `(nonce -1, account)` first, before it is ready to be
25//! included in a block.
26//!
27//! This implementation is adapted from <https://github.com/paritytech/substrate/tree/master/client/transaction-pool>
28
29use crate::{
30    eth::{
31        error::PoolError,
32        pool::transactions::{
33            PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34            TransactionsIterator, TxMarker,
35        },
36    },
37    mem::storage::MinedBlockOutcome,
38};
39use alloy_consensus::Transaction;
40use alloy_primitives::{Address, TxHash};
41use alloy_rpc_types::txpool::TxpoolStatus;
42use anvil_core::eth::transaction::PendingTransaction;
43use futures::channel::mpsc::{Receiver, Sender, channel};
44use parking_lot::{Mutex, RwLock};
45use std::{collections::VecDeque, fmt, sync::Arc};
46
47pub mod transactions;
48
49/// Transaction pool that performs validation.
50pub struct Pool<T> {
51    /// processes all pending transactions
52    inner: RwLock<PoolInner<T>>,
53    /// listeners for new ready transactions
54    transaction_listener: Mutex<Vec<Sender<TxHash>>>,
55}
56
57impl<T> Default for Pool<T> {
58    fn default() -> Self {
59        Self { inner: RwLock::new(PoolInner::default()), transaction_listener: Default::default() }
60    }
61}
62
63// == impl Pool ==
64
65impl<T> Pool<T> {
66    /// Returns an iterator that yields all transactions that are currently ready
67    pub fn ready_transactions(&self) -> TransactionsIterator<T> {
68        self.inner.read().ready_transactions()
69    }
70
71    /// Returns all transactions that are not ready to be included in a block yet
72    pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction<T>>> {
73        self.inner.read().pending_transactions.transactions().collect()
74    }
75
76    /// Returns the number of tx that are ready and queued for further execution
77    pub fn txpool_status(&self) -> TxpoolStatus {
78        // Note: naming differs here compared to geth's `TxpoolStatus`
79        let pending: u64 = self.inner.read().ready_transactions.len().try_into().unwrap_or(0);
80        let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
81        TxpoolStatus { pending, queued }
82    }
83
84    /// Adds a new transaction listener to the pool that gets notified about every new ready
85    /// transaction
86    pub fn add_ready_listener(&self) -> Receiver<TxHash> {
87        const TX_LISTENER_BUFFER_SIZE: usize = 2048;
88        let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
89        self.transaction_listener.lock().push(tx);
90        rx
91    }
92
93    /// Returns true if this pool already contains the transaction
94    pub fn contains(&self, tx_hash: &TxHash) -> bool {
95        self.inner.read().contains(tx_hash)
96    }
97
98    /// Removes all transactions from the pool
99    pub fn clear(&self) {
100        let mut pool = self.inner.write();
101        pool.clear();
102    }
103
104    /// Remove the given transactions from the pool
105    pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
106        self.inner.write().remove_invalid(tx_hashes)
107    }
108
109    /// Remove transactions by sender
110    pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
111        self.inner.write().remove_transactions_by_address(sender)
112    }
113
114    /// Removes a single transaction from the pool
115    ///
116    /// This is similar to `[Pool::remove_invalid()]` but for a single transaction.
117    ///
118    /// **Note**: this will also drop any transaction that depend on the `tx`
119    pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction<T>>> {
120        trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
121        let removed = {
122            let mut pool = self.inner.write();
123            pool.ready_transactions.remove_with_markers(vec![tx], None)
124        };
125        trace!(target: "txpool", "Dropped transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
126
127        let mut dropped = None;
128        if !removed.is_empty() {
129            dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
130        }
131        dropped
132    }
133
134    /// Notifies listeners if the transaction was added to the ready queue.
135    fn notify_ready(&self, tx: &AddedTransaction<T>) {
136        if let AddedTransaction::Ready(ready) = tx {
137            self.notify_listener(ready.hash);
138            for promoted in ready.promoted.iter().copied() {
139                self.notify_listener(promoted);
140            }
141        }
142    }
143
144    /// notifies all listeners about the transaction
145    fn notify_listener(&self, hash: TxHash) {
146        let mut listener = self.transaction_listener.lock();
147        // this is basically a retain but with mut reference
148        for n in (0..listener.len()).rev() {
149            let mut listener_tx = listener.swap_remove(n);
150            let retain = match listener_tx.try_send(hash) {
151                Ok(()) => true,
152                Err(e) => {
153                    if e.is_full() {
154                        warn!(
155                            target: "txpool",
156                            "[{:?}] Failed to send tx notification because channel is full",
157                            hash,
158                        );
159                        true
160                    } else {
161                        false
162                    }
163                }
164            };
165            if retain {
166                listener.push(listener_tx)
167            }
168        }
169    }
170}
171
172impl<T: Clone> Pool<T> {
173    /// Returns the _pending_ transaction for that `hash` if it exists in the mempool
174    pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
175        self.inner.read().get_transaction(hash)
176    }
177}
178
179impl<T: Transaction> Pool<T> {
180    /// Invoked when a set of transactions ([Self::ready_transactions()]) was executed.
181    ///
182    /// This will remove the transactions from the pool.
183    pub fn on_mined_block(&self, outcome: MinedBlockOutcome<T>) -> PruneResult<T> {
184        let MinedBlockOutcome { block_number, included, invalid } = outcome;
185
186        // remove invalid transactions from the pool
187        self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
188
189        // prune all the markers the mined transactions provide
190        let res = self
191            .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
192        trace!(target: "txpool", "pruned transaction markers {:?}", res);
193        res
194    }
195
196    /// Removes ready transactions for the given iterator of identifying markers.
197    ///
198    /// For each marker we can remove transactions in the pool that either provide the marker
199    /// directly or are a dependency of the transaction associated with that marker.
200    pub fn prune_markers(
201        &self,
202        block_number: u64,
203        markers: impl IntoIterator<Item = TxMarker>,
204    ) -> PruneResult<T> {
205        debug!(target: "txpool", ?block_number, "pruning transactions");
206        let res = self.inner.write().prune_markers(markers);
207        for tx in &res.promoted {
208            self.notify_ready(tx);
209        }
210        res
211    }
212
213    /// Adds a new transaction to the pool
214    pub fn add_transaction(
215        &self,
216        tx: PoolTransaction<T>,
217    ) -> Result<AddedTransaction<T>, PoolError> {
218        let added = self.inner.write().add_transaction(tx)?;
219        self.notify_ready(&added);
220        Ok(added)
221    }
222}
223
224/// A Transaction Pool
225///
226/// Contains all transactions that are ready to be executed
227#[derive(Debug)]
228struct PoolInner<T> {
229    ready_transactions: ReadyTransactions<T>,
230    pending_transactions: PendingTransactions<T>,
231}
232
233impl<T> Default for PoolInner<T> {
234    fn default() -> Self {
235        Self { ready_transactions: Default::default(), pending_transactions: Default::default() }
236    }
237}
238
239// == impl PoolInner ==
240
241impl<T> PoolInner<T> {
242    /// Returns an iterator over transactions that are ready.
243    fn ready_transactions(&self) -> TransactionsIterator<T> {
244        self.ready_transactions.get_transactions()
245    }
246
247    /// Clears
248    fn clear(&mut self) {
249        self.ready_transactions.clear();
250        self.pending_transactions.clear();
251    }
252
253    /// Returns an iterator over all transactions in the pool filtered by the sender
254    pub fn transactions_by_sender(
255        &self,
256        sender: Address,
257    ) -> impl Iterator<Item = Arc<PoolTransaction<T>>> + '_ {
258        let pending_txs = self
259            .pending_transactions
260            .transactions()
261            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
262
263        let ready_txs = self
264            .ready_transactions
265            .get_transactions()
266            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
267
268        pending_txs.chain(ready_txs)
269    }
270
271    /// Returns true if this pool already contains the transaction
272    fn contains(&self, tx_hash: &TxHash) -> bool {
273        self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
274    }
275
276    /// Remove the given transactions from the pool
277    fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
278        // early exit in case there is no invalid transactions.
279        if tx_hashes.is_empty() {
280            return vec![];
281        }
282        trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
283
284        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
285        removed.extend(self.pending_transactions.remove(tx_hashes));
286
287        trace!(target: "txpool", "Removed invalid transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
288
289        removed
290    }
291
292    /// Remove transactions by sender address
293    fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction<T>>> {
294        let tx_hashes =
295            self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
296
297        if tx_hashes.is_empty() {
298            return vec![];
299        }
300
301        trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
302
303        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
304        removed.extend(self.pending_transactions.remove(tx_hashes));
305
306        trace!(target: "txpool", "Removed transactions: {:?}", removed.iter().map(|tx| tx.hash()).collect::<Vec<_>>());
307
308        removed
309    }
310}
311
312impl<T: Clone> PoolInner<T> {
313    /// checks both pools for the matching transaction
314    ///
315    /// Returns `None` if the transaction does not exist in the pool
316    fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction<T>> {
317        if let Some(pending) = self.pending_transactions.get(&hash) {
318            return Some(pending.transaction.pending_transaction.clone());
319        }
320        Some(
321            self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
322        )
323    }
324}
325
326impl<T: Transaction> PoolInner<T> {
327    fn add_transaction(
328        &mut self,
329        tx: PoolTransaction<T>,
330    ) -> Result<AddedTransaction<T>, PoolError> {
331        if self.contains(&tx.hash()) {
332            warn!(target: "txpool", "[{:?}] Already imported", tx.hash());
333            return Err(PoolError::AlreadyImported(tx.hash()));
334        }
335
336        let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
337        trace!(target: "txpool", "[{:?}] ready={}", tx.transaction.hash(), tx.is_ready());
338
339        // If all markers are not satisfied import to future
340        if !tx.is_ready() {
341            let hash = tx.transaction.hash();
342            self.pending_transactions.add_transaction(tx)?;
343            return Ok(AddedTransaction::Pending { hash });
344        }
345        self.add_ready_transaction(tx)
346    }
347
348    /// Adds the transaction to the ready queue
349    fn add_ready_transaction(
350        &mut self,
351        tx: PendingPoolTransaction<T>,
352    ) -> Result<AddedTransaction<T>, PoolError> {
353        let hash = tx.transaction.hash();
354        trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
355        let mut ready = ReadyTransaction::new(hash);
356
357        let mut tx_queue = VecDeque::from([tx]);
358        // tracks whether we're processing the given `tx`
359        let mut is_new_tx = true;
360
361        // take first transaction from the list
362        while let Some(current_tx) = tx_queue.pop_front() {
363            // also add the transaction that the current transaction unlocks
364            tx_queue.extend(
365                self.pending_transactions.mark_and_unlock(&current_tx.transaction.provides),
366            );
367
368            let current_hash = current_tx.transaction.hash();
369            // try to add the transaction to the ready pool
370            match self.ready_transactions.add_transaction(current_tx) {
371                Ok(replaced_transactions) => {
372                    if !is_new_tx {
373                        ready.promoted.push(current_hash);
374                    }
375                    // tx removed from ready pool
376                    ready.removed.extend(replaced_transactions);
377                }
378                Err(err) => {
379                    // failed to add transaction
380                    if is_new_tx {
381                        debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
382        err);
383                        return Err(err);
384                    } else {
385                        ready.discarded.push(current_hash);
386                    }
387                }
388            }
389            is_new_tx = false;
390        }
391
392        // check for a cycle where importing a transaction resulted in pending transactions to be
393        // added while removing current transaction. in which case we move this transaction back to
394        // the pending queue
395        if ready.removed.iter().any(|tx| *tx.hash() == hash) {
396            self.ready_transactions.clear_transactions(&ready.promoted);
397            return Err(PoolError::CyclicTransaction);
398        }
399
400        Ok(AddedTransaction::Ready(ready))
401    }
402
403    /// Prunes the transactions that provide the given markers
404    ///
405    /// This will effectively remove those transactions that satisfy the markers and transactions
406    /// from the pending queue might get promoted to if the markers unlock them.
407    pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult<T> {
408        let mut imports = vec![];
409        let mut pruned = vec![];
410
411        for marker in markers {
412            // mark as satisfied and store the transactions that got unlocked
413            imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
414            // prune transactions
415            pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
416        }
417
418        let mut promoted = vec![];
419        let mut failed = vec![];
420        for tx in imports {
421            let hash = tx.transaction.hash();
422            match self.add_ready_transaction(tx) {
423                Ok(res) => promoted.push(res),
424                Err(e) => {
425                    warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
426                    failed.push(hash)
427                }
428            }
429        }
430
431        PruneResult { pruned, failed, promoted }
432    }
433}
434
435/// Represents the outcome of a prune
436pub struct PruneResult<T> {
437    /// a list of added transactions that a pruned marker satisfied
438    pub promoted: Vec<AddedTransaction<T>>,
439    /// all transactions that  failed to be promoted and now are discarded
440    pub failed: Vec<TxHash>,
441    /// all transactions that were pruned from the ready pool
442    pub pruned: Vec<Arc<PoolTransaction<T>>>,
443}
444
445impl<T> fmt::Debug for PruneResult<T> {
446    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
447        write!(fmt, "PruneResult {{ ")?;
448        write!(
449            fmt,
450            "promoted: {:?}, ",
451            self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
452        )?;
453        write!(fmt, "failed: {:?}, ", self.failed)?;
454        write!(
455            fmt,
456            "pruned: {:?}, ",
457            self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
458        )?;
459        write!(fmt, "}}")?;
460        Ok(())
461    }
462}
463
464#[derive(Clone, Debug)]
465pub struct ReadyTransaction<T> {
466    /// the hash of the submitted transaction
467    hash: TxHash,
468    /// transactions promoted to the ready queue
469    promoted: Vec<TxHash>,
470    /// transaction that failed and became discarded
471    discarded: Vec<TxHash>,
472    /// Transactions removed from the Ready pool
473    removed: Vec<Arc<PoolTransaction<T>>>,
474}
475
476impl<T> ReadyTransaction<T> {
477    pub fn new(hash: TxHash) -> Self {
478        Self {
479            hash,
480            promoted: Default::default(),
481            discarded: Default::default(),
482            removed: Default::default(),
483        }
484    }
485}
486
487#[derive(Clone, Debug)]
488pub enum AddedTransaction<T> {
489    /// transaction was successfully added and being processed
490    Ready(ReadyTransaction<T>),
491    /// Transaction was successfully added but not yet queued for processing
492    Pending {
493        /// the hash of the submitted transaction
494        hash: TxHash,
495    },
496}
497
498impl<T> AddedTransaction<T> {
499    pub fn hash(&self) -> &TxHash {
500        match self {
501            Self::Ready(tx) => &tx.hash,
502            Self::Pending { hash } => hash,
503        }
504    }
505}