1use crate::eth::{error::PoolError, util::hex_fmt_many};
2use alloy_consensus::{Transaction, Typed2718};
3use alloy_network::AnyRpcTransaction;
4use alloy_primitives::{
5 Address, TxHash,
6 map::{HashMap, HashSet},
7};
8use anvil_core::eth::transaction::PendingTransaction;
9use foundry_primitives::FoundryTxEnvelope;
10use parking_lot::RwLock;
11use std::{cmp::Ordering, collections::BTreeSet, fmt, str::FromStr, sync::Arc, time::Instant};
12
/// Opaque byte marker used to express dependencies between pool transactions.
///
/// Transactions list the markers they `requires` and the markers they `provides`;
/// see [`to_marker`] for how a marker is built from a nonce and a sender address.
pub type TxMarker = Vec<u8>;
15
16pub fn to_marker(nonce: u64, from: Address) -> TxMarker {
18 let mut data = [0u8; 28];
19 data[..8].copy_from_slice(&nonce.to_le_bytes()[..]);
20 data[8..].copy_from_slice(&from.0[..]);
21 data.to_vec()
22}
23
/// Modes that determine the priority assigned to transactions in the pool.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TransactionOrder {
    /// Every transaction receives the default priority (see `TransactionOrder::priority`).
    Fifo,
    /// The transaction's max fee per gas is used as its priority. This is the default mode.
    #[default]
    Fees,
}
38
39impl TransactionOrder {
40 pub fn priority(&self, tx: &FoundryTxEnvelope) -> TransactionPriority {
42 match self {
43 Self::Fifo => TransactionPriority::default(),
44 Self::Fees => TransactionPriority(tx.max_fee_per_gas()),
45 }
46 }
47}
48
49impl FromStr for TransactionOrder {
50 type Err = String;
51
52 fn from_str(s: &str) -> Result<Self, Self::Err> {
53 let s = s.to_lowercase();
54 let order = match s.as_str() {
55 "fees" => Self::Fees,
56 "fifo" => Self::Fifo,
57 _ => return Err(format!("Unknown TransactionOrder: `{s}`")),
58 };
59 Ok(order)
60 }
61}
62
/// Priority of a transaction inside the pool.
///
/// Wraps a `u128`; the derived `Ord` compares the wrapped value, and the derived
/// `Default` is `TransactionPriority(0)`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct TransactionPriority(pub u128);
69
/// A transaction as tracked by the pool, together with its dependency markers.
#[derive(Clone, PartialEq, Eq)]
pub struct PoolTransaction {
    /// The wrapped pending transaction.
    pub pending_transaction: PendingTransaction,
    /// Markers that must be provided before this transaction is ready.
    pub requires: Vec<TxMarker>,
    /// Markers this transaction provides to others.
    pub provides: Vec<TxMarker>,
    /// Priority used when ordering ready transactions.
    pub priority: TransactionPriority,
}
82
83impl PoolTransaction {
86 pub fn new(transaction: PendingTransaction) -> Self {
87 Self {
88 pending_transaction: transaction,
89 requires: vec![],
90 provides: vec![],
91 priority: TransactionPriority(0),
92 }
93 }
94 pub fn hash(&self) -> TxHash {
96 *self.pending_transaction.hash()
97 }
98
99 pub fn max_fee_per_gas(&self) -> u128 {
101 self.pending_transaction.transaction.max_fee_per_gas()
102 }
103
104 pub fn tx_type(&self) -> u8 {
106 self.pending_transaction.transaction.ty()
107 }
108}
109
110impl fmt::Debug for PoolTransaction {
111 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
112 write!(fmt, "Transaction {{ ")?;
113 write!(fmt, "hash: {:?}, ", &self.pending_transaction.hash())?;
114 write!(fmt, "requires: [{}], ", hex_fmt_many(self.requires.iter()))?;
115 write!(fmt, "provides: [{}], ", hex_fmt_many(self.provides.iter()))?;
116 write!(fmt, "raw tx: {:?}", &self.pending_transaction)?;
117 write!(fmt, "}}")?;
118 Ok(())
119 }
120}
121
122impl TryFrom<AnyRpcTransaction> for PoolTransaction {
123 type Error = eyre::Error;
124 fn try_from(value: AnyRpcTransaction) -> Result<Self, Self::Error> {
125 let typed_transaction = FoundryTxEnvelope::try_from(value)?;
126 let pending_transaction = PendingTransaction::new(typed_transaction)?;
127 Ok(Self {
128 pending_transaction,
129 requires: vec![],
130 provides: vec![],
131 priority: TransactionPriority(0),
132 })
133 }
134}
/// Transactions that are not ready yet because some of their required markers
/// are still missing.
#[derive(Clone, Debug, Default)]
pub struct PendingTransactions {
    /// Missing marker -> hashes of the waiting transactions that need it.
    required_markers: HashMap<TxMarker, HashSet<TxHash>>,
    /// `provides` marker list of a waiting transaction -> that transaction's hash.
    waiting_markers: HashMap<Vec<TxMarker>, TxHash>,
    /// Waiting transactions keyed by hash.
    waiting_queue: HashMap<TxHash, PendingPoolTransaction>,
}
147
148impl PendingTransactions {
151 pub fn len(&self) -> usize {
153 self.waiting_queue.len()
154 }
155
156 pub fn is_empty(&self) -> bool {
157 self.waiting_queue.is_empty()
158 }
159
160 pub fn clear(&mut self) {
162 self.required_markers.clear();
163 self.waiting_markers.clear();
164 self.waiting_queue.clear();
165 }
166
167 pub fn transactions(&self) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
169 self.waiting_queue.values().map(|tx| tx.transaction.clone())
170 }
171
172 pub fn add_transaction(&mut self, tx: PendingPoolTransaction) -> Result<(), PoolError> {
174 assert!(!tx.is_ready(), "transaction must not be ready");
175 assert!(
176 !self.waiting_queue.contains_key(&tx.transaction.hash()),
177 "transaction is already added"
178 );
179
180 if let Some(replace) = self
181 .waiting_markers
182 .get(&tx.transaction.provides)
183 .and_then(|hash| self.waiting_queue.get(hash))
184 {
185 if tx.transaction.max_fee_per_gas() < replace.transaction.max_fee_per_gas() {
187 warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]", tx.transaction.hash());
188 return Err(PoolError::ReplacementUnderpriced(Box::new(
189 tx.transaction.as_ref().clone(),
190 )));
191 }
192 }
193
194 for marker in &tx.missing_markers {
196 self.required_markers.entry(marker.clone()).or_default().insert(tx.transaction.hash());
197 }
198
199 self.waiting_markers.insert(tx.transaction.provides.clone(), tx.transaction.hash());
201 self.waiting_queue.insert(tx.transaction.hash(), tx);
203
204 Ok(())
205 }
206
207 pub fn contains(&self, hash: &TxHash) -> bool {
209 self.waiting_queue.contains_key(hash)
210 }
211
212 pub fn get(&self, hash: &TxHash) -> Option<&PendingPoolTransaction> {
214 self.waiting_queue.get(hash)
215 }
216
217 pub fn mark_and_unlock(
222 &mut self,
223 markers: impl IntoIterator<Item = impl AsRef<TxMarker>>,
224 ) -> Vec<PendingPoolTransaction> {
225 let mut unlocked_ready = Vec::new();
226 for mark in markers {
227 let mark = mark.as_ref();
228 if let Some(tx_hashes) = self.required_markers.remove(mark) {
229 for hash in tx_hashes {
230 let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
231 tx.mark(mark);
232
233 if tx.is_ready() {
234 let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
235 self.waiting_markers.remove(&tx.transaction.provides);
236
237 unlocked_ready.push(tx);
238 }
239 }
240 }
241 }
242
243 unlocked_ready
244 }
245
246 pub fn remove(&mut self, hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
250 let mut removed = vec![];
251 for hash in hashes {
252 if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
253 self.waiting_markers.remove(&waiting_tx.transaction.provides);
254 for marker in waiting_tx.missing_markers {
255 let remove = if let Some(required) = self.required_markers.get_mut(&marker) {
256 required.remove(&hash);
257 required.is_empty()
258 } else {
259 false
260 };
261 if remove {
262 self.required_markers.remove(&marker);
263 }
264 }
265 removed.push(waiting_tx.transaction)
266 }
267 }
268 removed
269 }
270}
271
/// A pool transaction together with the markers it is still waiting for.
#[derive(Clone)]
pub struct PendingPoolTransaction {
    /// The wrapped pool transaction.
    pub transaction: Arc<PoolTransaction>,
    /// Required markers that have not been provided yet; empty means ready.
    pub missing_markers: HashSet<TxMarker>,
    /// Instant at which this wrapper was created.
    pub added_at: Instant,
}
281
282impl PendingPoolTransaction {
285 pub fn new(transaction: PoolTransaction, provided: &HashMap<TxMarker, TxHash>) -> Self {
290 let missing_markers = transaction
291 .requires
292 .iter()
293 .filter(|marker| {
294 !provided.contains_key(&**marker)
296 })
297 .cloned()
298 .collect();
299
300 Self { transaction: Arc::new(transaction), missing_markers, added_at: Instant::now() }
301 }
302
303 pub fn mark(&mut self, marker: &TxMarker) {
305 self.missing_markers.remove(marker);
306 }
307
308 pub fn is_ready(&self) -> bool {
310 self.missing_markers.is_empty()
311 }
312}
313
314impl fmt::Debug for PendingPoolTransaction {
315 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
316 write!(fmt, "PendingTransaction {{ ")?;
317 write!(fmt, "added_at: {:?}, ", self.added_at)?;
318 write!(fmt, "tx: {:?}, ", self.transaction)?;
319 write!(fmt, "missing_markers: {{{}}}", hex_fmt_many(self.missing_markers.iter()))?;
320 write!(fmt, "}}")
321 }
322}
323
/// Iterator yielding ready transactions, highest-ordered independent transaction first.
pub struct TransactionsIterator {
    /// Snapshot of all ready transactions, keyed by hash.
    all: HashMap<TxHash, ReadyTransaction>,
    /// Transactions seen via `unlocks` that still lack requirements:
    /// hash -> (number of satisfied requirements, transaction).
    awaiting: HashMap<TxHash, (usize, PoolTransactionRef)>,
    /// Transactions whose requirements are all satisfied, ordered by `PoolTransactionRef`'s `Ord`.
    independent: BTreeSet<PoolTransactionRef>,
    /// Not read anywhere in the visible code.
    _invalid: HashSet<TxHash>,
}
330
331impl TransactionsIterator {
334 fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef) {
337 if satisfied >= tx_ref.transaction.requires.len() {
338 self.independent.insert(tx_ref);
340 } else {
341 self.awaiting.insert(tx_ref.transaction.hash(), (satisfied, tx_ref));
343 }
344 }
345}
346
347impl Iterator for TransactionsIterator {
348 type Item = Arc<PoolTransaction>;
349
350 fn next(&mut self) -> Option<Self::Item> {
351 loop {
352 let best = self.independent.iter().next_back()?.clone();
353 let best = self.independent.take(&best)?;
354 let hash = best.transaction.hash();
355
356 let ready =
357 if let Some(ready) = self.all.get(&hash).cloned() { ready } else { continue };
358
359 for hash in &ready.unlocks {
361 let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
363 satisfied += 1;
364 Some((satisfied, tx_ref))
365 } else {
367 self.all
368 .get(hash)
369 .map(|next| (next.requires_offset + 1, next.transaction.clone()))
370 };
371 if let Some((satisfied, tx_ref)) = res {
372 self.independent_or_awaiting(satisfied, tx_ref)
373 }
374 }
375
376 return Some(best.transaction);
377 }
378 }
379}
380
/// Transactions whose required markers are all available, i.e. ready for inclusion.
#[derive(Clone, Debug, Default)]
pub struct ReadyTransactions {
    /// Counter used to assign an insertion id to each added transaction.
    id: u64,
    /// Marker -> hash of the ready transaction providing it.
    provided_markers: HashMap<TxMarker, TxHash>,
    /// All ready transactions keyed by hash, behind a shared read/write lock.
    ready_tx: Arc<RwLock<HashMap<TxHash, ReadyTransaction>>>,
    /// Ready transactions that do not depend on another ready transaction.
    independent_transactions: BTreeSet<PoolTransactionRef>,
}
396
397impl ReadyTransactions {
400 pub fn get_transactions(&self) -> TransactionsIterator {
402 TransactionsIterator {
403 all: self.ready_tx.read().clone(),
404 independent: self.independent_transactions.clone(),
405 awaiting: Default::default(),
406 _invalid: Default::default(),
407 }
408 }
409
410 pub fn clear(&mut self) {
412 self.provided_markers.clear();
413 self.ready_tx.write().clear();
414 self.independent_transactions.clear();
415 }
416
417 pub fn contains(&self, hash: &TxHash) -> bool {
419 self.ready_tx.read().contains_key(hash)
420 }
421
422 pub fn len(&self) -> usize {
424 self.ready_tx.read().len()
425 }
426
427 pub fn is_empty(&self) -> bool {
429 self.ready_tx.read().is_empty()
430 }
431
432 pub fn get(&self, hash: &TxHash) -> Option<ReadyTransaction> {
434 self.ready_tx.read().get(hash).cloned()
435 }
436
437 pub fn provided_markers(&self) -> &HashMap<TxMarker, TxHash> {
438 &self.provided_markers
439 }
440
441 fn next_id(&mut self) -> u64 {
442 let id = self.id;
443 self.id = self.id.wrapping_add(1);
444 id
445 }
446
447 pub fn add_transaction(
454 &mut self,
455 tx: PendingPoolTransaction,
456 ) -> Result<Vec<Arc<PoolTransaction>>, PoolError> {
457 assert!(tx.is_ready(), "transaction must be ready",);
458 assert!(
459 !self.ready_tx.read().contains_key(&tx.transaction.hash()),
460 "transaction already included"
461 );
462
463 let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
464
465 let id = self.next_id();
466 let hash = tx.transaction.hash();
467
468 let mut independent = true;
469 let mut requires_offset = 0;
470 let mut ready = self.ready_tx.write();
471 for mark in &tx.transaction.requires {
473 if let Some(other) = self.provided_markers.get(mark) {
475 let tx = ready.get_mut(other).expect("hash included;");
476 tx.unlocks.push(hash);
477 independent = false;
479 } else {
480 requires_offset += 1;
481 }
482 }
483
484 for mark in tx.transaction.provides.iter().cloned() {
486 self.provided_markers.insert(mark, hash);
487 }
488
489 let transaction = PoolTransactionRef { id, transaction: tx.transaction };
490
491 if independent {
493 self.independent_transactions.insert(transaction.clone());
494 }
495
496 ready.insert(hash, ReadyTransaction { transaction, unlocks, requires_offset });
498
499 Ok(replaced_tx)
500 }
501
502 fn replaced_transactions(
504 &mut self,
505 tx: &PoolTransaction,
506 ) -> Result<(Vec<Arc<PoolTransaction>>, Vec<TxHash>), PoolError> {
507 let remove_hashes: HashSet<_> =
509 tx.provides.iter().filter_map(|mark| self.provided_markers.get(mark)).collect();
510
511 if remove_hashes.is_empty() {
513 return Ok((Vec::new(), Vec::new()));
514 }
515
516 let mut unlocked_tx = Vec::new();
519 {
520 let ready = self.ready_tx.read();
523 for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(*hash)) {
524 if to_remove.provides() == tx.provides {
527 if tx.pending_transaction.transaction.max_fee_per_gas()
529 <= to_remove.max_fee_per_gas()
530 {
531 warn!(target: "txpool", "ready replacement transaction underpriced [{:?}]", tx.hash());
532 return Err(PoolError::ReplacementUnderpriced(Box::new(tx.clone())));
533 } else {
534 trace!(target: "txpool", "replacing ready transaction [{:?}] with higher gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
535 }
536 }
537
538 unlocked_tx.extend(to_remove.unlocks.iter().copied())
539 }
540 }
541
542 let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
543
544 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
545 let removed_tx = self.remove_with_markers(remove_hashes, Some(new_provides));
546
547 Ok((removed_tx, unlocked_tx))
548 }
549
550 pub fn clear_transactions(&mut self, tx_hashes: &[TxHash]) -> Vec<Arc<PoolTransaction>> {
553 self.remove_with_markers(tx_hashes.to_vec(), None)
554 }
555
556 pub fn prune_tags(&mut self, marker: TxMarker) -> Vec<Arc<PoolTransaction>> {
561 let mut removed_tx = vec![];
562
563 let mut remove = vec![marker];
565
566 while let Some(marker) = remove.pop() {
567 let res = self
568 .provided_markers
569 .remove(&marker)
570 .and_then(|hash| self.ready_tx.write().remove(&hash));
571
572 if let Some(tx) = res {
573 let unlocks = tx.unlocks;
574 self.independent_transactions.remove(&tx.transaction);
575 let tx = tx.transaction.transaction;
576
577 {
579 let hash = tx.hash();
580 let mut ready = self.ready_tx.write();
581
582 let mut previous_markers = |marker| -> Option<Vec<TxMarker>> {
583 let prev_hash = self.provided_markers.get(marker)?;
584 let tx2 = ready.get_mut(prev_hash)?;
585 if let Some(idx) = tx2.unlocks.iter().position(|i| i == &hash) {
587 tx2.unlocks.swap_remove(idx);
588 }
589 if tx2.unlocks.is_empty() {
590 Some(tx2.transaction.transaction.provides.clone())
591 } else {
592 None
593 }
594 };
595
596 for marker in &tx.requires {
598 if let Some(mut tags_to_remove) = previous_markers(marker) {
599 remove.append(&mut tags_to_remove);
600 }
601 }
602 }
603
604 for hash in unlocks {
606 if let Some(tx) = self.ready_tx.write().get_mut(&hash) {
607 tx.requires_offset += 1;
608 if tx.requires_offset == tx.transaction.transaction.requires.len() {
609 self.independent_transactions.insert(tx.transaction.clone());
610 }
611 }
612 }
613 let current_marker = ▮
615 for marker in &tx.provides {
616 let removed = self.provided_markers.remove(marker);
617 assert_eq!(
618 removed,
619 if current_marker == marker { None } else { Some(tx.hash()) },
620 "The pool contains exactly one transaction providing given tag; the removed transaction
621 claims to provide that tag, so it has to be mapped to it's hash; qed"
622 );
623 }
624 removed_tx.push(tx);
625 }
626 }
627
628 removed_tx
629 }
630
631 pub fn remove_with_markers(
634 &mut self,
635 mut tx_hashes: Vec<TxHash>,
636 marker_filter: Option<HashSet<TxMarker>>,
637 ) -> Vec<Arc<PoolTransaction>> {
638 let mut removed = Vec::new();
639 let mut ready = self.ready_tx.write();
640
641 while let Some(hash) = tx_hashes.pop() {
642 if let Some(mut tx) = ready.remove(&hash) {
643 let invalidated = tx.transaction.transaction.provides.iter().filter(|mark| {
644 marker_filter.as_ref().map(|filter| !filter.contains(&**mark)).unwrap_or(true)
645 });
646
647 let mut removed_some_marks = false;
648 for mark in invalidated {
650 removed_some_marks = true;
651 self.provided_markers.remove(mark);
652 }
653
654 for mark in &tx.transaction.transaction.requires {
656 if let Some(hash) = self.provided_markers.get(mark)
657 && let Some(tx) = ready.get_mut(hash)
658 && let Some(idx) = tx.unlocks.iter().position(|i| i == hash)
659 {
660 tx.unlocks.swap_remove(idx);
661 }
662 }
663
664 self.independent_transactions.remove(&tx.transaction);
666
667 if removed_some_marks {
668 tx_hashes.append(&mut tx.unlocks);
670 }
671
672 removed.push(tx.transaction.transaction);
674 }
675 }
676
677 removed
678 }
679}
680
/// Shared reference to a pool transaction plus the id it was assigned on insertion.
///
/// Ordering and equality are defined by the manual `Ord` implementation
/// (priority first, then id), not by the transaction contents.
#[derive(Clone, Debug)]
pub struct PoolTransactionRef {
    /// The referenced pool transaction.
    pub transaction: Arc<PoolTransaction>,
    /// Identifier used as the tie-breaker when priorities are equal.
    pub id: u64,
}
689
// Equality is total because `PartialEq` is defined through `Ord::cmp`.
impl Eq for PoolTransactionRef {}
691
692impl PartialEq<Self> for PoolTransactionRef {
693 fn eq(&self, other: &Self) -> bool {
694 self.cmp(other) == Ordering::Equal
695 }
696}
697
698impl PartialOrd<Self> for PoolTransactionRef {
699 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
700 Some(self.cmp(other))
701 }
702}
703
704impl Ord for PoolTransactionRef {
705 fn cmp(&self, other: &Self) -> Ordering {
706 self.transaction
707 .priority
708 .cmp(&other.transaction.priority)
709 .then_with(|| other.id.cmp(&self.id))
710 }
711}
712
/// A ready transaction together with its dependency bookkeeping.
#[derive(Clone, Debug)]
pub struct ReadyTransaction {
    /// Reference to the pool transaction (with its insertion id).
    pub transaction: PoolTransactionRef,
    /// Hashes of the ready transactions that require a marker this one provides.
    pub unlocks: Vec<TxHash>,
    /// Number of required markers that are not provided by another ready
    /// transaction, i.e. that count as already satisfied.
    pub requires_offset: usize,
}
722
723impl ReadyTransaction {
724 pub fn provides(&self) -> &[TxMarker] {
725 &self.transaction.transaction.provides
726 }
727
728 pub fn max_fee_per_gas(&self) -> u128 {
729 self.transaction.transaction.max_fee_per_gas()
730 }
731}
732
#[cfg(test)]
mod tests {
    use super::*;

    /// Markers are deterministic for a `(nonce, address)` pair and differ when
    /// the nonce differs.
    #[test]
    fn can_id_txs() {
        let sender = Address::random();
        let first = to_marker(1, sender);
        assert_eq!(first, to_marker(1, sender));
        assert_ne!(to_marker(2, sender), first);
    }
}