1use crate::{
30 eth::{
31 error::PoolError,
32 pool::transactions::{
33 PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34 TransactionsIterator, TxMarker,
35 },
36 },
37 mem::storage::MinedBlockOutcome,
38};
39use alloy_primitives::{Address, TxHash, U64};
40use alloy_rpc_types::txpool::TxpoolStatus;
41use anvil_core::eth::transaction::PendingTransaction;
42use futures::channel::mpsc::{channel, Receiver, Sender};
43use parking_lot::{Mutex, RwLock};
44use std::{collections::VecDeque, fmt, sync::Arc};
45
46pub mod transactions;
47
48#[derive(Default)]
50pub struct Pool {
51 inner: RwLock<PoolInner>,
53 transaction_listener: Mutex<Vec<Sender<TxHash>>>,
55}
56
57impl Pool {
60 pub fn ready_transactions(&self) -> TransactionsIterator {
62 self.inner.read().ready_transactions()
63 }
64
65 pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction>> {
67 self.inner.read().pending_transactions.transactions().collect()
68 }
69
70 pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
72 self.inner.read().get_transaction(hash)
73 }
74
75 pub fn txpool_status(&self) -> TxpoolStatus {
77 let pending: u64 = self.ready_transactions().count().try_into().unwrap_or(0);
79 let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
80 TxpoolStatus { pending, queued }
81 }
82
83 pub fn on_mined_block(&self, outcome: MinedBlockOutcome) -> PruneResult {
87 let MinedBlockOutcome { block_number, included, invalid } = outcome;
88
89 self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
91
92 let res = self
94 .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
95 trace!(target: "txpool", "pruned transaction markers {:?}", res);
96 res
97 }
98
99 pub fn prune_markers(
104 &self,
105 block_number: U64,
106 markers: impl IntoIterator<Item = TxMarker>,
107 ) -> PruneResult {
108 debug!(target: "txpool", ?block_number, "pruning transactions");
109 self.inner.write().prune_markers(markers)
110 }
111
112 pub fn add_transaction(&self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
114 let added = self.inner.write().add_transaction(tx)?;
115 if let AddedTransaction::Ready(ref ready) = added {
116 self.notify_listener(ready.hash);
117 for promoted in ready.promoted.iter().copied() {
119 self.notify_listener(promoted);
120 }
121 }
122 Ok(added)
123 }
124
125 pub fn add_ready_listener(&self) -> Receiver<TxHash> {
128 const TX_LISTENER_BUFFER_SIZE: usize = 2048;
129 let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
130 self.transaction_listener.lock().push(tx);
131 rx
132 }
133
134 pub fn contains(&self, tx_hash: &TxHash) -> bool {
136 self.inner.read().contains(tx_hash)
137 }
138
139 pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
141 self.inner.write().remove_invalid(tx_hashes)
142 }
143
144 pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction>> {
146 self.inner.write().remove_transactions_by_address(sender)
147 }
148
149 pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction>> {
155 trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
156 let removed = {
157 let mut pool = self.inner.write();
158 pool.ready_transactions.remove_with_markers(vec![tx], None)
159 };
160 trace!(target: "txpool", "Dropped transactions: {:?}", removed);
161
162 let mut dropped = None;
163 if !removed.is_empty() {
164 dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
165 }
166 dropped
167 }
168
169 pub fn clear(&self) {
171 let mut pool = self.inner.write();
172 pool.clear();
173 }
174
175 fn notify_listener(&self, hash: TxHash) {
177 let mut listener = self.transaction_listener.lock();
178 for n in (0..listener.len()).rev() {
180 let mut listener_tx = listener.swap_remove(n);
181 let retain = match listener_tx.try_send(hash) {
182 Ok(()) => true,
183 Err(e) => {
184 if e.is_full() {
185 warn!(
186 target: "txpool",
187 "[{:?}] Failed to send tx notification because channel is full",
188 hash,
189 );
190 true
191 } else {
192 false
193 }
194 }
195 };
196 if retain {
197 listener.push(listener_tx)
198 }
199 }
200 }
201}
202
203#[derive(Debug, Default)]
207struct PoolInner {
208 ready_transactions: ReadyTransactions,
209 pending_transactions: PendingTransactions,
210}
211
212impl PoolInner {
215 fn ready_transactions(&self) -> TransactionsIterator {
217 self.ready_transactions.get_transactions()
218 }
219
220 fn clear(&mut self) {
222 self.ready_transactions.clear();
223 self.pending_transactions.clear();
224 }
225
226 fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
230 if let Some(pending) = self.pending_transactions.get(&hash) {
231 return Some(pending.transaction.pending_transaction.clone())
232 }
233 Some(
234 self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
235 )
236 }
237
238 pub fn transactions_by_sender(
240 &self,
241 sender: Address,
242 ) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
243 let pending_txs = self
244 .pending_transactions
245 .transactions()
246 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
247
248 let ready_txs = self
249 .ready_transactions
250 .get_transactions()
251 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
252
253 pending_txs.chain(ready_txs)
254 }
255
256 fn contains(&self, tx_hash: &TxHash) -> bool {
258 self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
259 }
260
261 fn add_transaction(&mut self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
262 if self.contains(&tx.hash()) {
263 warn!(target: "txpool", "[{:?}] Already imported", tx.hash());
264 return Err(PoolError::AlreadyImported(Box::new(tx)))
265 }
266
267 let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
268 trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);
269
270 if !tx.is_ready() {
272 let hash = tx.transaction.hash();
273 self.pending_transactions.add_transaction(tx)?;
274 return Ok(AddedTransaction::Pending { hash })
275 }
276 self.add_ready_transaction(tx)
277 }
278
279 fn add_ready_transaction(
281 &mut self,
282 tx: PendingPoolTransaction,
283 ) -> Result<AddedTransaction, PoolError> {
284 let hash = tx.transaction.hash();
285 trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
286 let mut ready = ReadyTransaction::new(hash);
287
288 let mut tx_queue = VecDeque::from([tx]);
289 let mut is_new_tx = true;
291
292 while let Some(current_tx) = tx_queue.pop_front() {
294 tx_queue.extend(
296 self.pending_transactions.mark_and_unlock(¤t_tx.transaction.provides),
297 );
298
299 let current_hash = current_tx.transaction.hash();
300 match self.ready_transactions.add_transaction(current_tx) {
302 Ok(replaced_transactions) => {
303 if !is_new_tx {
304 ready.promoted.push(current_hash);
305 }
306 ready.removed.extend(replaced_transactions);
308 }
309 Err(err) => {
310 if is_new_tx {
312 debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
313 err);
314 return Err(err)
315 } else {
316 ready.discarded.push(current_hash);
317 }
318 }
319 }
320 is_new_tx = false;
321 }
322
323 if ready.removed.iter().any(|tx| *tx.hash() == hash) {
327 self.ready_transactions.clear_transactions(&ready.promoted);
328 return Err(PoolError::CyclicTransaction)
329 }
330
331 Ok(AddedTransaction::Ready(ready))
332 }
333
334 pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult {
339 let mut imports = vec![];
340 let mut pruned = vec![];
341
342 for marker in markers {
343 imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
345 pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
347 }
348
349 let mut promoted = vec![];
350 let mut failed = vec![];
351 for tx in imports {
352 let hash = tx.transaction.hash();
353 match self.add_ready_transaction(tx) {
354 Ok(res) => promoted.push(res),
355 Err(e) => {
356 warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
357 failed.push(hash)
358 }
359 }
360 }
361
362 PruneResult { pruned, failed, promoted }
363 }
364
365 pub fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
367 if tx_hashes.is_empty() {
369 return vec![]
370 }
371 trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
372
373 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
374 removed.extend(self.pending_transactions.remove(tx_hashes));
375
376 trace!(target: "txpool", "Removed invalid transactions: {:?}", removed);
377
378 removed
379 }
380
381 pub fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction>> {
383 let tx_hashes =
384 self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
385
386 if tx_hashes.is_empty() {
387 return vec![]
388 }
389
390 trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
391
392 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
393 removed.extend(self.pending_transactions.remove(tx_hashes));
394
395 trace!(target: "txpool", "Removed transactions: {:?}", removed);
396
397 removed
398 }
399}
400
401pub struct PruneResult {
403 pub promoted: Vec<AddedTransaction>,
405 pub failed: Vec<TxHash>,
407 pub pruned: Vec<Arc<PoolTransaction>>,
409}
410
411impl fmt::Debug for PruneResult {
412 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
413 write!(fmt, "PruneResult {{ ")?;
414 write!(
415 fmt,
416 "promoted: {:?}, ",
417 self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
418 )?;
419 write!(fmt, "failed: {:?}, ", self.failed)?;
420 write!(
421 fmt,
422 "pruned: {:?}, ",
423 self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
424 )?;
425 write!(fmt, "}}")?;
426 Ok(())
427 }
428}
429
430#[derive(Clone, Debug)]
431pub struct ReadyTransaction {
432 hash: TxHash,
434 promoted: Vec<TxHash>,
436 discarded: Vec<TxHash>,
438 removed: Vec<Arc<PoolTransaction>>,
440}
441
442impl ReadyTransaction {
443 pub fn new(hash: TxHash) -> Self {
444 Self {
445 hash,
446 promoted: Default::default(),
447 discarded: Default::default(),
448 removed: Default::default(),
449 }
450 }
451}
452
453#[derive(Clone, Debug)]
454pub enum AddedTransaction {
455 Ready(ReadyTransaction),
457 Pending {
459 hash: TxHash,
461 },
462}
463
464impl AddedTransaction {
465 pub fn hash(&self) -> &TxHash {
466 match self {
467 Self::Ready(tx) => &tx.hash,
468 Self::Pending { hash } => hash,
469 }
470 }
471}