1use crate::{
30 eth::{
31 error::PoolError,
32 pool::transactions::{
33 PendingPoolTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
34 TransactionsIterator, TxMarker,
35 },
36 },
37 mem::storage::MinedBlockOutcome,
38};
39use alloy_primitives::{Address, TxHash};
40use alloy_rpc_types::txpool::TxpoolStatus;
41use anvil_core::eth::transaction::PendingTransaction;
42use futures::channel::mpsc::{Receiver, Sender, channel};
43use parking_lot::{Mutex, RwLock};
44use std::{collections::VecDeque, fmt, sync::Arc};
45
46pub mod transactions;
47
/// Transaction pool handle.
///
/// Wraps the pool state in a read-write lock and keeps the list of channels that are notified
/// whenever a transaction becomes ready.
#[derive(Default)]
pub struct Pool {
    /// The ready and pending transaction queues, guarded by a read-write lock.
    inner: RwLock<PoolInner>,
    /// Sending halves of all registered listeners; each receives the hash of every transaction
    /// that becomes ready.
    transaction_listener: Mutex<Vec<Sender<TxHash>>>,
}
56
57impl Pool {
60 pub fn ready_transactions(&self) -> TransactionsIterator {
62 self.inner.read().ready_transactions()
63 }
64
65 pub fn pending_transactions(&self) -> Vec<Arc<PoolTransaction>> {
67 self.inner.read().pending_transactions.transactions().collect()
68 }
69
70 pub fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
72 self.inner.read().get_transaction(hash)
73 }
74
75 pub fn txpool_status(&self) -> TxpoolStatus {
77 let pending: u64 = self.inner.read().ready_transactions.len().try_into().unwrap_or(0);
79 let queued: u64 = self.inner.read().pending_transactions.len().try_into().unwrap_or(0);
80 TxpoolStatus { pending, queued }
81 }
82
83 pub fn on_mined_block(&self, outcome: MinedBlockOutcome) -> PruneResult {
87 let MinedBlockOutcome { block_number, included, invalid } = outcome;
88
89 self.remove_invalid(invalid.into_iter().map(|tx| tx.hash()).collect());
91
92 let res = self
94 .prune_markers(block_number, included.into_iter().flat_map(|tx| tx.provides.clone()));
95 trace!(target: "txpool", "pruned transaction markers {:?}", res);
96 res
97 }
98
99 pub fn prune_markers(
104 &self,
105 block_number: u64,
106 markers: impl IntoIterator<Item = TxMarker>,
107 ) -> PruneResult {
108 debug!(target: "txpool", ?block_number, "pruning transactions");
109 let res = self.inner.write().prune_markers(markers);
110 for tx in &res.promoted {
111 self.notify_ready(tx);
112 }
113 res
114 }
115
116 pub fn add_transaction(&self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
118 let added = self.inner.write().add_transaction(tx)?;
119 self.notify_ready(&added);
120 Ok(added)
121 }
122
123 pub fn add_ready_listener(&self) -> Receiver<TxHash> {
126 const TX_LISTENER_BUFFER_SIZE: usize = 2048;
127 let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
128 self.transaction_listener.lock().push(tx);
129 rx
130 }
131
132 pub fn contains(&self, tx_hash: &TxHash) -> bool {
134 self.inner.read().contains(tx_hash)
135 }
136
137 pub fn remove_invalid(&self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
139 self.inner.write().remove_invalid(tx_hashes)
140 }
141
142 pub fn remove_transactions_by_address(&self, sender: Address) -> Vec<Arc<PoolTransaction>> {
144 self.inner.write().remove_transactions_by_address(sender)
145 }
146
147 pub fn drop_transaction(&self, tx: TxHash) -> Option<Arc<PoolTransaction>> {
153 trace!(target: "txpool", "Dropping transaction: [{:?}]", tx);
154 let removed = {
155 let mut pool = self.inner.write();
156 pool.ready_transactions.remove_with_markers(vec![tx], None)
157 };
158 trace!(target: "txpool", "Dropped transactions: {:?}", removed);
159
160 let mut dropped = None;
161 if !removed.is_empty() {
162 dropped = removed.into_iter().find(|t| *t.pending_transaction.hash() == tx);
163 }
164 dropped
165 }
166
167 pub fn clear(&self) {
169 let mut pool = self.inner.write();
170 pool.clear();
171 }
172
173 fn notify_ready(&self, tx: &AddedTransaction) {
175 if let AddedTransaction::Ready(ready) = tx {
176 self.notify_listener(ready.hash);
177 for promoted in ready.promoted.iter().copied() {
178 self.notify_listener(promoted);
179 }
180 }
181 }
182
183 fn notify_listener(&self, hash: TxHash) {
185 let mut listener = self.transaction_listener.lock();
186 for n in (0..listener.len()).rev() {
188 let mut listener_tx = listener.swap_remove(n);
189 let retain = match listener_tx.try_send(hash) {
190 Ok(()) => true,
191 Err(e) => {
192 if e.is_full() {
193 warn!(
194 target: "txpool",
195 "[{:?}] Failed to send tx notification because channel is full",
196 hash,
197 );
198 true
199 } else {
200 false
201 }
202 }
203 };
204 if retain {
205 listener.push(listener_tx)
206 }
207 }
208 }
209}
210
/// The pool's actual state: one queue for ready transactions and one for pending transactions.
#[derive(Debug, Default)]
struct PoolInner {
    /// Transactions that are ready.
    ready_transactions: ReadyTransactions,
    /// Transactions that are not ready yet.
    pending_transactions: PendingTransactions,
}
219
220impl PoolInner {
223 fn ready_transactions(&self) -> TransactionsIterator {
225 self.ready_transactions.get_transactions()
226 }
227
228 fn clear(&mut self) {
230 self.ready_transactions.clear();
231 self.pending_transactions.clear();
232 }
233
234 fn get_transaction(&self, hash: TxHash) -> Option<PendingTransaction> {
238 if let Some(pending) = self.pending_transactions.get(&hash) {
239 return Some(pending.transaction.pending_transaction.clone());
240 }
241 Some(
242 self.ready_transactions.get(&hash)?.transaction.transaction.pending_transaction.clone(),
243 )
244 }
245
246 pub fn transactions_by_sender(
248 &self,
249 sender: Address,
250 ) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
251 let pending_txs = self
252 .pending_transactions
253 .transactions()
254 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
255
256 let ready_txs = self
257 .ready_transactions
258 .get_transactions()
259 .filter(move |tx| tx.pending_transaction.sender().eq(&sender));
260
261 pending_txs.chain(ready_txs)
262 }
263
264 fn contains(&self, tx_hash: &TxHash) -> bool {
266 self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
267 }
268
269 fn add_transaction(&mut self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
270 if self.contains(&tx.hash()) {
271 warn!(target: "txpool", "[{:?}] Already imported", tx.hash());
272 return Err(PoolError::AlreadyImported(Box::new(tx)));
273 }
274
275 let tx = PendingPoolTransaction::new(tx, self.ready_transactions.provided_markers());
276 trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);
277
278 if !tx.is_ready() {
280 let hash = tx.transaction.hash();
281 self.pending_transactions.add_transaction(tx)?;
282 return Ok(AddedTransaction::Pending { hash });
283 }
284 self.add_ready_transaction(tx)
285 }
286
287 fn add_ready_transaction(
289 &mut self,
290 tx: PendingPoolTransaction,
291 ) -> Result<AddedTransaction, PoolError> {
292 let hash = tx.transaction.hash();
293 trace!(target: "txpool", "adding ready transaction [{:?}]", hash);
294 let mut ready = ReadyTransaction::new(hash);
295
296 let mut tx_queue = VecDeque::from([tx]);
297 let mut is_new_tx = true;
299
300 while let Some(current_tx) = tx_queue.pop_front() {
302 tx_queue.extend(
304 self.pending_transactions.mark_and_unlock(¤t_tx.transaction.provides),
305 );
306
307 let current_hash = current_tx.transaction.hash();
308 match self.ready_transactions.add_transaction(current_tx) {
310 Ok(replaced_transactions) => {
311 if !is_new_tx {
312 ready.promoted.push(current_hash);
313 }
314 ready.removed.extend(replaced_transactions);
316 }
317 Err(err) => {
318 if is_new_tx {
320 debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
321 err);
322 return Err(err);
323 } else {
324 ready.discarded.push(current_hash);
325 }
326 }
327 }
328 is_new_tx = false;
329 }
330
331 if ready.removed.iter().any(|tx| *tx.hash() == hash) {
335 self.ready_transactions.clear_transactions(&ready.promoted);
336 return Err(PoolError::CyclicTransaction);
337 }
338
339 Ok(AddedTransaction::Ready(ready))
340 }
341
342 pub fn prune_markers(&mut self, markers: impl IntoIterator<Item = TxMarker>) -> PruneResult {
347 let mut imports = vec![];
348 let mut pruned = vec![];
349
350 for marker in markers {
351 imports.extend(self.pending_transactions.mark_and_unlock(Some(&marker)));
353 pruned.extend(self.ready_transactions.prune_tags(marker.clone()));
355 }
356
357 let mut promoted = vec![];
358 let mut failed = vec![];
359 for tx in imports {
360 let hash = tx.transaction.hash();
361 match self.add_ready_transaction(tx) {
362 Ok(res) => promoted.push(res),
363 Err(e) => {
364 warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
365 failed.push(hash)
366 }
367 }
368 }
369
370 PruneResult { pruned, failed, promoted }
371 }
372
373 pub fn remove_invalid(&mut self, tx_hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
375 if tx_hashes.is_empty() {
377 return vec![];
378 }
379 trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
380
381 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
382 removed.extend(self.pending_transactions.remove(tx_hashes));
383
384 trace!(target: "txpool", "Removed invalid transactions: {:?}", removed);
385
386 removed
387 }
388
389 pub fn remove_transactions_by_address(&mut self, sender: Address) -> Vec<Arc<PoolTransaction>> {
391 let tx_hashes =
392 self.transactions_by_sender(sender).map(move |tx| tx.hash()).collect::<Vec<TxHash>>();
393
394 if tx_hashes.is_empty() {
395 return vec![];
396 }
397
398 trace!(target: "txpool", "Removing transactions: {:?}", tx_hashes);
399
400 let mut removed = self.ready_transactions.remove_with_markers(tx_hashes.clone(), None);
401 removed.extend(self.pending_transactions.remove(tx_hashes));
402
403 trace!(target: "txpool", "Removed transactions: {:?}", removed);
404
405 removed
406 }
407}
408
/// Outcome of pruning markers from the pool.
pub struct PruneResult {
    /// Results for the unlocked transactions that were successfully added to the ready queue.
    pub promoted: Vec<AddedTransaction>,
    /// Hashes of the unlocked transactions that could not be added to the ready queue.
    pub failed: Vec<TxHash>,
    /// Transactions that the ready queue returned while pruning the markers.
    pub pruned: Vec<Arc<PoolTransaction>>,
}
418
419impl fmt::Debug for PruneResult {
420 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
421 write!(fmt, "PruneResult {{ ")?;
422 write!(
423 fmt,
424 "promoted: {:?}, ",
425 self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
426 )?;
427 write!(fmt, "failed: {:?}, ", self.failed)?;
428 write!(
429 fmt,
430 "pruned: {:?}, ",
431 self.pruned.iter().map(|tx| *tx.pending_transaction.hash()).collect::<Vec<_>>()
432 )?;
433 write!(fmt, "}}")?;
434 Ok(())
435 }
436}
437
/// Bookkeeping for a transaction that was added to the ready queue.
#[derive(Clone, Debug)]
pub struct ReadyTransaction {
    /// Hash of the transaction that was added.
    hash: TxHash,
    /// Hashes of the transactions that were moved to the ready queue as a consequence.
    promoted: Vec<TxHash>,
    /// Hashes of unlocked transactions that the ready queue rejected.
    discarded: Vec<TxHash>,
    /// Transactions the ready queue reported as replaced while adding.
    removed: Vec<Arc<PoolTransaction>>,
}
449
450impl ReadyTransaction {
451 pub fn new(hash: TxHash) -> Self {
452 Self {
453 hash,
454 promoted: Default::default(),
455 discarded: Default::default(),
456 removed: Default::default(),
457 }
458 }
459}
460
/// Outcome of adding a transaction to the pool.
#[derive(Clone, Debug)]
pub enum AddedTransaction {
    /// The transaction was added to the ready queue.
    Ready(ReadyTransaction),
    /// The transaction was not ready and was stored in the pending queue.
    Pending {
        /// Hash of the stored transaction.
        hash: TxHash,
    },
}
471
472impl AddedTransaction {
473 pub fn hash(&self) -> &TxHash {
474 match self {
475 Self::Ready(tx) => &tx.hash,
476 Self::Pending { hash } => hash,
477 }
478 }
479}