anvil/eth/pool/
mod.rs

1//! # Transaction Pool implementation
2//!
3//! The transaction pool is responsible for managing a set of transactions that can be included in
4//! upcoming blocks.
5//!
6//! The main task of the pool is to prepare an ordered list of transactions that are ready to be
7//! included in a new block.
8//!
9//! Each imported block can affect the validity of transactions already in the pool.
10//! The miner expects the most up-to-date transactions when attempting to create a new block.
11//! After being included in a block, a transaction should be removed from the pool, this process is
12//! called _pruning_ and due to separation of concerns is triggered externally.
13//! The pool essentially performs following services:
14//!   * import transactions
15//!   * order transactions
16//!   * provide ordered set of transactions that are ready for inclusion
17//!   * prune transactions
18//!
19//! Each transaction in the pool contains markers that it _provides_ or _requires_. This property is
20//! used to determine whether it can be included in a block (transaction is ready) or whether it
21//! still _requires_ other transactions to be mined first (transaction is pending).
22//! A transaction is associated with the nonce of the account it's sent from. A unique identifying
23//! marker for a transaction is therefore the pair `(nonce + account)`. An incoming transaction with
24//! a `nonce > nonce on chain` will _require_ `(nonce -1, account)` first, before it is ready to be
25//! included in a block.
26//!
27//! This implementation is adapted from <https://github.com/paritytech/substrate/tree/master/client/transaction-pool>
28
29use crate::{
30    eth::{
31        error::PoolError,
32        pool::transactions::{
33            PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34            TransactionsIterator, TxMarker,
35        },
36    },
37    mem::storage::MinedBlockOutcome,
38};
39use alloy_primitives::{Address, TxHash, U64};
40use alloy_rpc_types::txpool::TxpoolStatus;
41use anvil_core::eth::transaction::PendingTransaction;
42use futures::channel::mpsc::{channel, Receiver, Sender};
43use parking_lot::{Mutex, RwLock};
44use std::{collections::VecDeque, fmt, sync::Arc};
45
46pub mod transactions;
47
48/// Transaction pool that performs validation.
49#[derive(Default)]
50pub struct Pool {
51    /// processes all pending transactions
52    inner: RwLock<PoolInner>,
53    /// listeners for new ready transactions
54    transaction_listener: Mutex<Vec<Sender<TxHash>>>,
55}
56
57// == impl Pool ==
58
59impl Pool {
60    /// Returns an iterator that yields all transactions that are currently ready
61    pub fn ready_transactions(&self) -> TransactionsIterator {
62        self.inner.read().ready_transactions()
63    }
64
65    /// Returns all transactions that are not ready to be included in a block yet
66    pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction>> {
67        self.inner.read().pending_transactions.transactions().collect()
68    }
69
70    /// Returns the _pending_ transaction for that `hash` if it exists in the mempool
71    pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
72        self.inner.read().get_transaction(hash)
73    }
74
75    /// Returns the number of tx that are ready and queued for further execution
76    pub fn txpool_status(&self) -> TxpoolStatus {
77        // Note: naming differs here compared to geth's `TxpoolStatus`
78        let pending: u64 = self.ready_transactions().count().try_into().unwrap_or(0);
79        let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
80        TxpoolStatus { pending, queued }
81    }
82
83    /// Invoked when a set of transactions ([Self::ready_transactions()]) was executed.
84    ///
85    /// This will remove the transactions from the pool.
86    pub fn on_mined_block(&self, outcome: MinedBlockOutcome) -> PruneResult {
87        let MinedBlockOutcome { block_number, included, invalid } = outcome;
88
89        // remove invalid transactions from the pool
90        self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
91
92        // prune all the markers the mined transactions provide
93        let res = self
94            .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
95        trace!(target: "txpool", "pruned transaction markers {:?}", res);
96        res
97    }
98
99    /// Removes ready transactions for the given iterator of identifying markers.
100    ///
101    /// For each marker we can remove transactions in the pool that either provide the marker
102    /// directly or are a dependency of the transaction associated with that marker.
103    pub fn prune_markers(
104        &self,
105        block_number: U64,
106        markers: impl IntoIterator<Item = TxMarker>,
107    ) -> PruneResult {
108        debug!(target: "txpool", ?block_number, "pruning transactions");
109        self.inner.write().prune_markers(markers)
110    }
111
112    /// Adds a new transaction to the pool
113    pub fn add_transaction(&self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
114        let added = self.inner.write().add_transaction(tx)?;
115        if let AddedTransaction::Ready(ref ready) = added {
116            self.notify_listener(ready.hash);
117            // also notify promoted transactions
118            for promoted in ready.promoted.iter().copied() {
119                self.notify_listener(promoted);
120            }
121        }
122        Ok(added)
123    }
124
125    /// Adds a new transaction listener to the pool that gets notified about every new ready
126    /// transaction
127    pub fn add_ready_listener(&self) -> Receiver<TxHash> {
128        const TX_LISTENER_BUFFER_SIZE: usize = 2048;
129        let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
130        self.transaction_listener.lock().push(tx);
131        rx
132    }
133
134    /// Returns true if this pool already contains the transaction
135    pub fn contains(&self, tx_hash: &TxHash) -> bool {
136        self.inner.read().contains(tx_hash)
137    }
138
139    /// Remove the given transactions from the pool
140    pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
141        self.inner.write().remove_invalid(tx_hashes)
142    }
143
144    /// Remove transactions by sender
145    pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction>> {
146        self.inner.write().remove_transactions_by_address(sender)
147    }
148
149    /// Removes a single transaction from the pool
150    ///
151    /// This is similar to `[Pool::remove_invalid()]` but for a single transaction.
152    ///
153    /// **Note**: this will also drop any transaction that depend on the `tx`
154    pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction>> {
155        trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
156        let removed = {
157            let mut pool = self.inner.write();
158            pool.ready_transactions.remove_with_markers(vec![tx], None)
159        };
160        trace!(target: "txpool", "Dropped transactions: {:?}", removed);
161
162        let mut dropped = None;
163        if !removed.is_empty() {
164            dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
165        }
166        dropped
167    }
168
169    /// Removes all transactions from the pool
170    pub fn clear(&self) {
171        let mut pool = self.inner.write();
172        pool.clear();
173    }
174
175    /// notifies all listeners about the transaction
176    fn notify_listener(&self, hash: TxHash) {
177        let mut listener = self.transaction_listener.lock();
178        // this is basically a retain but with mut reference
179        for n in (0..listener.len()).rev() {
180            let mut listener_tx = listener.swap_remove(n);
181            let retain = match listener_tx.try_send(hash) {
182                Ok(()) => true,
183                Err(e) => {
184                    if e.is_full() {
185                        warn!(
186                            target: "txpool",
187                            "[{:?}] Failed to send tx notification because channel is full",
188                            hash,
189                        );
190                        true
191                    } else {
192                        false
193                    }
194                }
195            };
196            if retain {
197                listener.push(listener_tx)
198            }
199        }
200    }
201}
202
203/// A Transaction Pool
204///
205/// Contains all transactions that are ready to be executed
206#[derive(Debug, Default)]
207struct PoolInner {
208    ready_transactions: ReadyTransactions,
209    pending_transactions: PendingTransactions,
210}
211
212// == impl PoolInner ==
213
214impl PoolInner {
215    /// Returns an iterator over transactions that are ready.
216    fn ready_transactions(&self) -> TransactionsIterator {
217        self.ready_transactions.get_transactions()
218    }
219
220    /// Clears
221    fn clear(&mut self) {
222        self.ready_transactions.clear();
223        self.pending_transactions.clear();
224    }
225
226    /// checks both pools for the matching transaction
227    ///
228    /// Returns `None` if the transaction does not exist in the pool
229    fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
230        if let Some(pending) = self.pending_transactions.get(&hash) {
231            return Some(pending.transaction.pending_transaction.clone())
232        }
233        Some(
234            self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
235        )
236    }
237
238    /// Returns an iterator over all transactions in the pool filtered by the sender
239    pub fn transactions_by_sender(
240        &self,
241        sender: Address,
242    ) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
243        let pending_txs = self
244            .pending_transactions
245            .transactions()
246            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
247
248        let ready_txs = self
249            .ready_transactions
250            .get_transactions()
251            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
252
253        pending_txs.chain(ready_txs)
254    }
255
256    /// Returns true if this pool already contains the transaction
257    fn contains(&self, tx_hash: &TxHash) -> bool {
258        self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
259    }
260
261    fn add_transaction(&mut self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
262        if self.contains(&tx.hash()) {
263            warn!(target: "txpool", "[{:?}] Already imported", tx.hash());
264            return Err(PoolError::AlreadyImported(Box::new(tx)))
265        }
266
267        let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
268        trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);
269
270        // If all markers are not satisfied import to future
271        if !tx.is_ready() {
272            let hash = tx.transaction.hash();
273            self.pending_transactions.add_transaction(tx)?;
274            return Ok(AddedTransaction::Pending { hash })
275        }
276        self.add_ready_transaction(tx)
277    }
278
279    /// Adds the transaction to the ready queue
280    fn add_ready_transaction(
281        &mut self,
282        tx: PendingPoolTransaction,
283    ) -> Result<AddedTransaction, PoolError> {
284        let hash = tx.transaction.hash();
285        trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
286        let mut ready = ReadyTransaction::new(hash);
287
288        let mut tx_queue = VecDeque::from([tx]);
289        // tracks whether we're processing the given `tx`
290        let mut is_new_tx = true;
291
292        // take first transaction from the list
293        while let Some(current_tx) = tx_queue.pop_front() {
294            // also add the transaction that the current transaction unlocks
295            tx_queue.extend(
296                self.pending_transactions.mark_and_unlock(&current_tx.transaction.provides),
297            );
298
299            let current_hash = current_tx.transaction.hash();
300            // try to add the transaction to the ready pool
301            match self.ready_transactions.add_transaction(current_tx) {
302                Ok(replaced_transactions) => {
303                    if !is_new_tx {
304                        ready.promoted.push(current_hash);
305                    }
306                    // tx removed from ready pool
307                    ready.removed.extend(replaced_transactions);
308                }
309                Err(err) => {
310                    // failed to add transaction
311                    if is_new_tx {
312                        debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
313        err);
314                        return Err(err)
315                    } else {
316                        ready.discarded.push(current_hash);
317                    }
318                }
319            }
320            is_new_tx = false;
321        }
322
323        // check for a cycle where importing a transaction resulted in pending transactions to be
324        // added while removing current transaction. in which case we move this transaction back to
325        // the pending queue
326        if ready.removed.iter().any(|tx| *tx.hash() == hash) {
327            self.ready_transactions.clear_transactions(&ready.promoted);
328            return Err(PoolError::CyclicTransaction)
329        }
330
331        Ok(AddedTransaction::Ready(ready))
332    }
333
334    /// Prunes the transactions that provide the given markers
335    ///
336    /// This will effectively remove those transactions that satisfy the markers and transactions
337    /// from the pending queue might get promoted to if the markers unlock them.
338    pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult {
339        let mut imports = vec![];
340        let mut pruned = vec![];
341
342        for marker in markers {
343            // mark as satisfied and store the transactions that got unlocked
344            imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
345            // prune transactions
346            pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
347        }
348
349        let mut promoted = vec![];
350        let mut failed = vec![];
351        for tx in imports {
352            let hash = tx.transaction.hash();
353            match self.add_ready_transaction(tx) {
354                Ok(res) => promoted.push(res),
355                Err(e) => {
356                    warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
357                    failed.push(hash)
358                }
359            }
360        }
361
362        PruneResult { pruned, failed, promoted }
363    }
364
365    /// Remove the given transactions from the pool
366    pub fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
367        // early exit in case there is no invalid transactions.
368        if tx_hashes.is_empty() {
369            return vec![]
370        }
371        trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
372
373        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
374        removed.extend(self.pending_transactions.remove(tx_hashes));
375
376        trace!(target: "txpool", "Removed invalid transactions: {:?}", removed);
377
378        removed
379    }
380
381    /// Remove transactions by sender address
382    pub fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction>> {
383        let tx_hashes =
384            self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
385
386        if tx_hashes.is_empty() {
387            return vec![]
388        }
389
390        trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
391
392        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
393        removed.extend(self.pending_transactions.remove(tx_hashes));
394
395        trace!(target: "txpool", "Removed transactions: {:?}", removed);
396
397        removed
398    }
399}
400
401/// Represents the outcome of a prune
402pub struct PruneResult {
403    /// a list of added transactions that a pruned marker satisfied
404    pub promoted: Vec<AddedTransaction>,
405    /// all transactions that  failed to be promoted and now are discarded
406    pub failed: Vec<TxHash>,
407    /// all transactions that were pruned from the ready pool
408    pub pruned: Vec<Arc<PoolTransaction>>,
409}
410
411impl fmt::Debug for PruneResult {
412    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
413        write!(fmt, "PruneResult {{ ")?;
414        write!(
415            fmt,
416            "promoted: {:?}, ",
417            self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
418        )?;
419        write!(fmt, "failed: {:?}, ", self.failed)?;
420        write!(
421            fmt,
422            "pruned: {:?}, ",
423            self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
424        )?;
425        write!(fmt, "}}")?;
426        Ok(())
427    }
428}
429
430#[derive(Clone, Debug)]
431pub struct ReadyTransaction {
432    /// the hash of the submitted transaction
433    hash: TxHash,
434    /// transactions promoted to the ready queue
435    promoted: Vec<TxHash>,
436    /// transaction that failed and became discarded
437    discarded: Vec<TxHash>,
438    /// Transactions removed from the Ready pool
439    removed: Vec<Arc<PoolTransaction>>,
440}
441
442impl ReadyTransaction {
443    pub fn new(hash: TxHash) -> Self {
444        Self {
445            hash,
446            promoted: Default::default(),
447            discarded: Default::default(),
448            removed: Default::default(),
449        }
450    }
451}
452
453#[derive(Clone, Debug)]
454pub enum AddedTransaction {
455    /// transaction was successfully added and being processed
456    Ready(ReadyTransaction),
457    /// Transaction was successfully added but not yet queued for processing
458    Pending {
459        /// the hash of the submitted transaction
460        hash: TxHash,
461    },
462}
463
464impl AddedTransaction {
465    pub fn hash(&self) -> &TxHash {
466        match self {
467            Self::Ready(tx) => &tx.hash,
468            Self::Pending { hash } => hash,
469        }
470    }
471}