Skip to main content

anvil/eth/pool/
mod.rs

1//! # Transaction Pool implementation
2//!
3//! The transaction pool is responsible for managing a set of transactions that can be included in
4//! upcoming blocks.
5//!
6//! The main task of the pool is to prepare an ordered list of transactions that are ready to be
7//! included in a new block.
8//!
9//! Each imported block can affect the validity of transactions already in the pool.
10//! The miner expects the most up-to-date transactions when attempting to create a new block.
11//! After being included in a block, a transaction should be removed from the pool, this process is
12//! called _pruning_ and due to separation of concerns is triggered externally.
13//! The pool essentially performs following services:
14//!   * import transactions
15//!   * order transactions
16//!   * provide ordered set of transactions that are ready for inclusion
17//!   * prune transactions
18//!
19//! Each transaction in the pool contains markers that it _provides_ or _requires_. This property is
20//! used to determine whether it can be included in a block (transaction is ready) or whether it
21//! still _requires_ other transactions to be mined first (transaction is pending).
22//! A transaction is associated with the nonce of the account it's sent from. A unique identifying
23//! marker for a transaction is therefore the pair `(nonce + account)`. An incoming transaction with
24//! a `nonce > nonce on chain` will _require_ `(nonce -1, account)` first, before it is ready to be
25//! included in a block.
26//!
27//! This implementation is adapted from <https://github.com/paritytech/substrate/tree/master/client/transaction-pool>
28
29use crate::{
30    eth::{
31        error::PoolError,
32        pool::transactions::{
33            PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34            TransactionsIterator, TxMarker,
35        },
36    },
37    mem::storage::MinedBlockOutcome,
38};
39use alloy_primitives::{Address, TxHash};
40use alloy_rpc_types::txpool::TxpoolStatus;
41use anvil_core::eth::transaction::PendingTransaction;
42use futures::channel::mpsc::{Receiver, Sender, channel};
43use parking_lot::{Mutex, RwLock};
44use std::{collections::VecDeque, fmt, sync::Arc};
45
46pub mod transactions;
47
48/// Transaction pool that performs validation.
49#[derive(Default)]
50pub struct Pool {
51    /// processes all pending transactions
52    inner: RwLock<PoolInner>,
53    /// listeners for new ready transactions
54    transaction_listener: Mutex<Vec<Sender<TxHash>>>,
55}
56
57// == impl Pool ==
58
59impl Pool {
60    /// Returns an iterator that yields all transactions that are currently ready
61    pub fn ready_transactions(&self) -> TransactionsIterator {
62        self.inner.read().ready_transactions()
63    }
64
65    /// Returns all transactions that are not ready to be included in a block yet
66    pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction>> {
67        self.inner.read().pending_transactions.transactions().collect()
68    }
69
70    /// Returns the _pending_ transaction for that `hash` if it exists in the mempool
71    pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
72        self.inner.read().get_transaction(hash)
73    }
74
75    /// Returns the number of tx that are ready and queued for further execution
76    pub fn txpool_status(&self) -> TxpoolStatus {
77        // Note: naming differs here compared to geth's `TxpoolStatus`
78        let pending: u64 = self.inner.read().ready_transactions.len().try_into().unwrap_or(0);
79        let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
80        TxpoolStatus { pending, queued }
81    }
82
83    /// Invoked when a set of transactions ([Self::ready_transactions()]) was executed.
84    ///
85    /// This will remove the transactions from the pool.
86    pub fn on_mined_block(&self, outcome: MinedBlockOutcome) -> PruneResult {
87        let MinedBlockOutcome { block_number, included, invalid } = outcome;
88
89        // remove invalid transactions from the pool
90        self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
91
92        // prune all the markers the mined transactions provide
93        let res = self
94            .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
95        trace!(target: "txpool", "pruned transaction markers {:?}", res);
96        res
97    }
98
99    /// Removes ready transactions for the given iterator of identifying markers.
100    ///
101    /// For each marker we can remove transactions in the pool that either provide the marker
102    /// directly or are a dependency of the transaction associated with that marker.
103    pub fn prune_markers(
104        &self,
105        block_number: u64,
106        markers: impl IntoIterator<Item = TxMarker>,
107    ) -> PruneResult {
108        debug!(target: "txpool", ?block_number, "pruning transactions");
109        let res = self.inner.write().prune_markers(markers);
110        for tx in &res.promoted {
111            self.notify_ready(tx);
112        }
113        res
114    }
115
116    /// Adds a new transaction to the pool
117    pub fn add_transaction(&self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
118        let added = self.inner.write().add_transaction(tx)?;
119        self.notify_ready(&added);
120        Ok(added)
121    }
122
123    /// Adds a new transaction listener to the pool that gets notified about every new ready
124    /// transaction
125    pub fn add_ready_listener(&self) -> Receiver<TxHash> {
126        const TX_LISTENER_BUFFER_SIZE: usize = 2048;
127        let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
128        self.transaction_listener.lock().push(tx);
129        rx
130    }
131
132    /// Returns true if this pool already contains the transaction
133    pub fn contains(&self, tx_hash: &TxHash) -> bool {
134        self.inner.read().contains(tx_hash)
135    }
136
137    /// Remove the given transactions from the pool
138    pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
139        self.inner.write().remove_invalid(tx_hashes)
140    }
141
142    /// Remove transactions by sender
143    pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction>> {
144        self.inner.write().remove_transactions_by_address(sender)
145    }
146
147    /// Removes a single transaction from the pool
148    ///
149    /// This is similar to `[Pool::remove_invalid()]` but for a single transaction.
150    ///
151    /// **Note**: this will also drop any transaction that depend on the `tx`
152    pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction>> {
153        trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
154        let removed = {
155            let mut pool = self.inner.write();
156            pool.ready_transactions.remove_with_markers(vec![tx], None)
157        };
158        trace!(target: "txpool", "Dropped transactions: {:?}", removed);
159
160        let mut dropped = None;
161        if !removed.is_empty() {
162            dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
163        }
164        dropped
165    }
166
167    /// Removes all transactions from the pool
168    pub fn clear(&self) {
169        let mut pool = self.inner.write();
170        pool.clear();
171    }
172
173    /// Notifies listeners if the transaction was added to the ready queue.
174    fn notify_ready(&self, tx: &AddedTransaction) {
175        if let AddedTransaction::Ready(ready) = tx {
176            self.notify_listener(ready.hash);
177            for promoted in ready.promoted.iter().copied() {
178                self.notify_listener(promoted);
179            }
180        }
181    }
182
183    /// notifies all listeners about the transaction
184    fn notify_listener(&self, hash: TxHash) {
185        let mut listener = self.transaction_listener.lock();
186        // this is basically a retain but with mut reference
187        for n in (0..listener.len()).rev() {
188            let mut listener_tx = listener.swap_remove(n);
189            let retain = match listener_tx.try_send(hash) {
190                Ok(()) => true,
191                Err(e) => {
192                    if e.is_full() {
193                        warn!(
194                            target: "txpool",
195                            "[{:?}] Failed to send tx notification because channel is full",
196                            hash,
197                        );
198                        true
199                    } else {
200                        false
201                    }
202                }
203            };
204            if retain {
205                listener.push(listener_tx)
206            }
207        }
208    }
209}
210
211/// A Transaction Pool
212///
213/// Contains all transactions that are ready to be executed
214#[derive(Debug, Default)]
215struct PoolInner {
216    ready_transactions: ReadyTransactions,
217    pending_transactions: PendingTransactions,
218}
219
220// == impl PoolInner ==
221
222impl PoolInner {
223    /// Returns an iterator over transactions that are ready.
224    fn ready_transactions(&self) -> TransactionsIterator {
225        self.ready_transactions.get_transactions()
226    }
227
228    /// Clears
229    fn clear(&mut self) {
230        self.ready_transactions.clear();
231        self.pending_transactions.clear();
232    }
233
234    /// checks both pools for the matching transaction
235    ///
236    /// Returns `None` if the transaction does not exist in the pool
237    fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
238        if let Some(pending) = self.pending_transactions.get(&hash) {
239            return Some(pending.transaction.pending_transaction.clone());
240        }
241        Some(
242            self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
243        )
244    }
245
246    /// Returns an iterator over all transactions in the pool filtered by the sender
247    pub fn transactions_by_sender(
248        &self,
249        sender: Address,
250    ) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
251        let pending_txs = self
252            .pending_transactions
253            .transactions()
254            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
255
256        let ready_txs = self
257            .ready_transactions
258            .get_transactions()
259            .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
260
261        pending_txs.chain(ready_txs)
262    }
263
264    /// Returns true if this pool already contains the transaction
265    fn contains(&self, tx_hash: &TxHash) -> bool {
266        self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
267    }
268
269    fn add_transaction(&mut self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
270        if self.contains(&tx.hash()) {
271            warn!(target: "txpool", "[{:?}] Already imported", tx.hash());
272            return Err(PoolError::AlreadyImported(Box::new(tx)));
273        }
274
275        let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
276        trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);
277
278        // If all markers are not satisfied import to future
279        if !tx.is_ready() {
280            let hash = tx.transaction.hash();
281            self.pending_transactions.add_transaction(tx)?;
282            return Ok(AddedTransaction::Pending { hash });
283        }
284        self.add_ready_transaction(tx)
285    }
286
287    /// Adds the transaction to the ready queue
288    fn add_ready_transaction(
289        &mut self,
290        tx: PendingPoolTransaction,
291    ) -> Result<AddedTransaction, PoolError> {
292        let hash = tx.transaction.hash();
293        trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
294        let mut ready = ReadyTransaction::new(hash);
295
296        let mut tx_queue = VecDeque::from([tx]);
297        // tracks whether we're processing the given `tx`
298        let mut is_new_tx = true;
299
300        // take first transaction from the list
301        while let Some(current_tx) = tx_queue.pop_front() {
302            // also add the transaction that the current transaction unlocks
303            tx_queue.extend(
304                self.pending_transactions.mark_and_unlock(&current_tx.transaction.provides),
305            );
306
307            let current_hash = current_tx.transaction.hash();
308            // try to add the transaction to the ready pool
309            match self.ready_transactions.add_transaction(current_tx) {
310                Ok(replaced_transactions) => {
311                    if !is_new_tx {
312                        ready.promoted.push(current_hash);
313                    }
314                    // tx removed from ready pool
315                    ready.removed.extend(replaced_transactions);
316                }
317                Err(err) => {
318                    // failed to add transaction
319                    if is_new_tx {
320                        debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
321        err);
322                        return Err(err);
323                    } else {
324                        ready.discarded.push(current_hash);
325                    }
326                }
327            }
328            is_new_tx = false;
329        }
330
331        // check for a cycle where importing a transaction resulted in pending transactions to be
332        // added while removing current transaction. in which case we move this transaction back to
333        // the pending queue
334        if ready.removed.iter().any(|tx| *tx.hash() == hash) {
335            self.ready_transactions.clear_transactions(&ready.promoted);
336            return Err(PoolError::CyclicTransaction);
337        }
338
339        Ok(AddedTransaction::Ready(ready))
340    }
341
342    /// Prunes the transactions that provide the given markers
343    ///
344    /// This will effectively remove those transactions that satisfy the markers and transactions
345    /// from the pending queue might get promoted to if the markers unlock them.
346    pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult {
347        let mut imports = vec![];
348        let mut pruned = vec![];
349
350        for marker in markers {
351            // mark as satisfied and store the transactions that got unlocked
352            imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
353            // prune transactions
354            pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
355        }
356
357        let mut promoted = vec![];
358        let mut failed = vec![];
359        for tx in imports {
360            let hash = tx.transaction.hash();
361            match self.add_ready_transaction(tx) {
362                Ok(res) => promoted.push(res),
363                Err(e) => {
364                    warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
365                    failed.push(hash)
366                }
367            }
368        }
369
370        PruneResult { pruned, failed, promoted }
371    }
372
373    /// Remove the given transactions from the pool
374    pub fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
375        // early exit in case there is no invalid transactions.
376        if tx_hashes.is_empty() {
377            return vec![];
378        }
379        trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
380
381        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
382        removed.extend(self.pending_transactions.remove(tx_hashes));
383
384        trace!(target: "txpool", "Removed invalid transactions: {:?}", removed);
385
386        removed
387    }
388
389    /// Remove transactions by sender address
390    pub fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction>> {
391        let tx_hashes =
392            self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
393
394        if tx_hashes.is_empty() {
395            return vec![];
396        }
397
398        trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
399
400        let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
401        removed.extend(self.pending_transactions.remove(tx_hashes));
402
403        trace!(target: "txpool", "Removed transactions: {:?}", removed);
404
405        removed
406    }
407}
408
409/// Represents the outcome of a prune
410pub struct PruneResult {
411    /// a list of added transactions that a pruned marker satisfied
412    pub promoted: Vec<AddedTransaction>,
413    /// all transactions that  failed to be promoted and now are discarded
414    pub failed: Vec<TxHash>,
415    /// all transactions that were pruned from the ready pool
416    pub pruned: Vec<Arc<PoolTransaction>>,
417}
418
419impl fmt::Debug for PruneResult {
420    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
421        write!(fmt, "PruneResult {{ ")?;
422        write!(
423            fmt,
424            "promoted: {:?}, ",
425            self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
426        )?;
427        write!(fmt, "failed: {:?}, ", self.failed)?;
428        write!(
429            fmt,
430            "pruned: {:?}, ",
431            self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
432        )?;
433        write!(fmt, "}}")?;
434        Ok(())
435    }
436}
437
438#[derive(Clone, Debug)]
439pub struct ReadyTransaction {
440    /// the hash of the submitted transaction
441    hash: TxHash,
442    /// transactions promoted to the ready queue
443    promoted: Vec<TxHash>,
444    /// transaction that failed and became discarded
445    discarded: Vec<TxHash>,
446    /// Transactions removed from the Ready pool
447    removed: Vec<Arc<PoolTransaction>>,
448}
449
450impl ReadyTransaction {
451    pub fn new(hash: TxHash) -> Self {
452        Self {
453            hash,
454            promoted: Default::default(),
455            discarded: Default::default(),
456            removed: Default::default(),
457        }
458    }
459}
460
461#[derive(Clone, Debug)]
462pub enum AddedTransaction {
463    /// transaction was successfully added and being processed
464    Ready(ReadyTransaction),
465    /// Transaction was successfully added but not yet queued for processing
466    Pending {
467        /// the hash of the submitted transaction
468        hash: TxHash,
469    },
470}
471
472impl AddedTransaction {
473    pub fn hash(&self) -> &TxHash {
474        match self {
475            Self::Ready(tx) => &tx.hash,
476            Self::Pending { hash } => hash,
477        }
478    }
479}