1use crate::eth::{error::PoolError, util::hex_fmt_many};
2use alloy_consensus::{Transaction, Typed2718};
3use alloy_network::AnyRpcTransaction;
4use alloy_primitives::{
5 Address, TxHash,
6 map::{HashMap, HashSet},
7};
8use anvil_core::eth::transaction::PendingTransaction;
9use foundry_primitives::FoundryTxEnvelope;
10use parking_lot::RwLock;
11use std::{cmp::Ordering, collections::BTreeSet, fmt, str::FromStr, sync::Arc, time::Instant};
12
13pub type TxMarker = Vec<u8>;
15
16type ReplacedTransactions<T> = (Vec<Arc<PoolTransaction<T>>>, Vec<TxHash>);
19
20pub fn to_marker(nonce: u64, from: Address) -> TxMarker {
22 let mut data = [0u8; 28];
23 data[..8].copy_from_slice(&nonce.to_le_bytes()[..]);
24 data[8..].copy_from_slice(&from.0[..]);
25 data.to_vec()
26}
27
28#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
32pub enum TransactionOrder {
33 Fifo,
38 #[default]
40 Fees,
41}
42
43impl TransactionOrder {
44 pub fn priority<T: Transaction>(&self, tx: &T) -> TransactionPriority {
46 match self {
47 Self::Fifo => TransactionPriority::default(),
48 Self::Fees => TransactionPriority(tx.max_fee_per_gas()),
49 }
50 }
51}
52
53impl FromStr for TransactionOrder {
54 type Err = String;
55
56 fn from_str(s: &str) -> Result<Self, Self::Err> {
57 let s = s.to_lowercase();
58 let order = match s.as_str() {
59 "fees" => Self::Fees,
60 "fifo" => Self::Fifo,
61 _ => return Err(format!("Unknown TransactionOrder: `{s}`")),
62 };
63 Ok(order)
64 }
65}
66
67#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
72pub struct TransactionPriority(pub u128);
73
74#[derive(Clone, PartialEq, Eq)]
76pub struct PoolTransaction<T> {
77 pub pending_transaction: PendingTransaction<T>,
79 pub requires: Vec<TxMarker>,
81 pub provides: Vec<TxMarker>,
83 pub priority: TransactionPriority,
85}
86
87impl<T> PoolTransaction<T> {
90 pub fn new(transaction: PendingTransaction<T>) -> Self {
91 Self {
92 pending_transaction: transaction,
93 requires: vec![],
94 provides: vec![],
95 priority: TransactionPriority(0),
96 }
97 }
98
99 pub fn hash(&self) -> TxHash {
101 *self.pending_transaction.hash()
102 }
103}
104
105impl<T: Transaction> PoolTransaction<T> {
106 pub fn max_fee_per_gas(&self) -> u128 {
108 self.pending_transaction.transaction.max_fee_per_gas()
109 }
110}
111
112impl<T: Typed2718> PoolTransaction<T> {
113 pub fn tx_type(&self) -> u8 {
115 self.pending_transaction.transaction.ty()
116 }
117}
118
119impl<T: fmt::Debug> fmt::Debug for PoolTransaction<T> {
120 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(fmt, "Transaction {{ ")?;
122 write!(fmt, "hash: {:?}, ", &self.pending_transaction.hash())?;
123 write!(fmt, "requires: [{}], ", hex_fmt_many(self.requires.iter()))?;
124 write!(fmt, "provides: [{}], ", hex_fmt_many(self.provides.iter()))?;
125 write!(fmt, "raw tx: {:?}", &self.pending_transaction)?;
126 write!(fmt, "}}")?;
127 Ok(())
128 }
129}
130
131impl TryFrom<AnyRpcTransaction> for PoolTransaction<FoundryTxEnvelope> {
132 type Error = eyre::Error;
133 fn try_from(value: AnyRpcTransaction) -> Result<Self, Self::Error> {
134 let typed_transaction = FoundryTxEnvelope::try_from(value)?;
135 let pending_transaction = PendingTransaction::new(typed_transaction)?;
136 Ok(Self {
137 pending_transaction,
138 requires: vec![],
139 provides: vec![],
140 priority: TransactionPriority(0),
141 })
142 }
143}
144
145#[derive(Clone, Debug)]
149pub struct PendingTransactions<T> {
150 required_markers: HashMap<TxMarker, HashSet<TxHash>>,
152 waiting_markers: HashMap<Vec<TxMarker>, TxHash>,
154 waiting_queue: HashMap<TxHash, PendingPoolTransaction<T>>,
156}
157
158impl<T> Default for PendingTransactions<T> {
159 fn default() -> Self {
160 Self {
161 required_markers: Default::default(),
162 waiting_markers: Default::default(),
163 waiting_queue: Default::default(),
164 }
165 }
166}
167
168impl<T> PendingTransactions<T> {
169 pub fn len(&self) -> usize {
171 self.waiting_queue.len()
172 }
173
174 pub fn is_empty(&self) -> bool {
175 self.waiting_queue.is_empty()
176 }
177
178 pub fn clear(&mut self) {
180 self.required_markers.clear();
181 self.waiting_markers.clear();
182 self.waiting_queue.clear();
183 }
184
185 pub fn transactions(&self) -> impl Iterator<Item = Arc<PoolTransaction<T>>> + '_ {
187 self.waiting_queue.values().map(|tx| tx.transaction.clone())
188 }
189
190 pub fn contains(&self, hash: &TxHash) -> bool {
192 self.waiting_queue.contains_key(hash)
193 }
194
195 pub fn get(&self, hash: &TxHash) -> Option<&PendingPoolTransaction<T>> {
197 self.waiting_queue.get(hash)
198 }
199
200 pub fn mark_and_unlock(
205 &mut self,
206 markers: impl IntoIterator<Item = impl AsRef<TxMarker>>,
207 ) -> Vec<PendingPoolTransaction<T>> {
208 let mut unlocked_ready = Vec::new();
209 for mark in markers {
210 let mark = mark.as_ref();
211 if let Some(tx_hashes) = self.required_markers.remove(mark) {
212 for hash in tx_hashes {
213 let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
214 tx.mark(mark);
215
216 if tx.is_ready() {
217 let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
218 self.waiting_markers.remove(&tx.transaction.provides);
219
220 unlocked_ready.push(tx);
221 }
222 }
223 }
224 }
225
226 unlocked_ready
227 }
228
229 pub fn remove(&mut self, hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
233 let mut removed = vec![];
234 for hash in hashes {
235 if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
236 self.waiting_markers.remove(&waiting_tx.transaction.provides);
237 for marker in waiting_tx.missing_markers {
238 let remove = if let Some(required) = self.required_markers.get_mut(&marker) {
239 required.remove(&hash);
240 required.is_empty()
241 } else {
242 false
243 };
244 if remove {
245 self.required_markers.remove(&marker);
246 }
247 }
248 removed.push(waiting_tx.transaction)
249 }
250 }
251 removed
252 }
253}
254
255impl<T: Transaction> PendingTransactions<T> {
256 pub fn add_transaction(&mut self, tx: PendingPoolTransaction<T>) -> Result<(), PoolError> {
258 assert!(!tx.is_ready(), "transaction must not be ready");
259 assert!(
260 !self.waiting_queue.contains_key(&tx.transaction.hash()),
261 "transaction is already added"
262 );
263
264 if let Some(replace) = self
265 .waiting_markers
266 .get(&tx.transaction.provides)
267 .and_then(|hash| self.waiting_queue.get(hash))
268 {
269 if tx.transaction.max_fee_per_gas() < replace.transaction.max_fee_per_gas() {
271 warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]", tx.transaction.hash());
272 return Err(PoolError::ReplacementUnderpriced(tx.transaction.hash()));
273 }
274 }
275
276 for marker in &tx.missing_markers {
278 self.required_markers.entry(marker.clone()).or_default().insert(tx.transaction.hash());
279 }
280
281 self.waiting_markers.insert(tx.transaction.provides.clone(), tx.transaction.hash());
283 self.waiting_queue.insert(tx.transaction.hash(), tx);
285
286 Ok(())
287 }
288}
289
290#[derive(Clone)]
292pub struct PendingPoolTransaction<T> {
293 pub transaction: Arc<PoolTransaction<T>>,
294 pub missing_markers: HashSet<TxMarker>,
296 pub added_at: Instant,
298}
299
300impl<T> PendingPoolTransaction<T> {
301 pub fn new(transaction: PoolTransaction<T>, provided: &HashMap<TxMarker, TxHash>) -> Self {
306 let missing_markers = transaction
307 .requires
308 .iter()
309 .filter(|marker| {
310 !provided.contains_key(&**marker)
312 })
313 .cloned()
314 .collect();
315
316 Self { transaction: Arc::new(transaction), missing_markers, added_at: Instant::now() }
317 }
318
319 pub fn mark(&mut self, marker: &TxMarker) {
321 self.missing_markers.remove(marker);
322 }
323
324 pub fn is_ready(&self) -> bool {
326 self.missing_markers.is_empty()
327 }
328}
329
330impl<T: fmt::Debug> fmt::Debug for PendingPoolTransaction<T> {
331 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
332 write!(fmt, "PendingTransaction {{ ")?;
333 write!(fmt, "added_at: {:?}, ", self.added_at)?;
334 write!(fmt, "tx: {:?}, ", self.transaction)?;
335 write!(fmt, "missing_markers: {{{}}}", hex_fmt_many(self.missing_markers.iter()))?;
336 write!(fmt, "}}")
337 }
338}
339
340pub struct TransactionsIterator<T> {
341 all: HashMap<TxHash, ReadyTransaction<T>>,
342 awaiting: HashMap<TxHash, (usize, PoolTransactionRef<T>)>,
343 independent: BTreeSet<PoolTransactionRef<T>>,
344 _invalid: HashSet<TxHash>,
345}
346
347impl<T> TransactionsIterator<T> {
348 fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef<T>) {
351 if satisfied >= tx_ref.transaction.requires.len() {
352 self.independent.insert(tx_ref);
354 } else {
355 self.awaiting.insert(tx_ref.transaction.hash(), (satisfied, tx_ref));
357 }
358 }
359}
360
361impl<T> Iterator for TransactionsIterator<T> {
362 type Item = Arc<PoolTransaction<T>>;
363
364 fn next(&mut self) -> Option<Self::Item> {
365 loop {
366 let best = self.independent.iter().next_back()?.clone();
367 let best = self.independent.take(&best)?;
368 let hash = best.transaction.hash();
369
370 let ready =
371 if let Some(ready) = self.all.get(&hash).cloned() { ready } else { continue };
372
373 for hash in &ready.unlocks {
375 let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
377 satisfied += 1;
378 Some((satisfied, tx_ref))
379 } else {
381 self.all
382 .get(hash)
383 .map(|next| (next.requires_offset + 1, next.transaction.clone()))
384 };
385 if let Some((satisfied, tx_ref)) = res {
386 self.independent_or_awaiting(satisfied, tx_ref)
387 }
388 }
389
390 return Some(best.transaction);
391 }
392 }
393}
394
395#[derive(Clone, Debug)]
397pub struct ReadyTransactions<T> {
398 id: u64,
402 provided_markers: HashMap<TxMarker, TxHash>,
404 ready_tx: Arc<RwLock<HashMap<TxHash, ReadyTransaction<T>>>>,
406 independent_transactions: BTreeSet<PoolTransactionRef<T>>,
409}
410
411impl<T> Default for ReadyTransactions<T> {
412 fn default() -> Self {
413 Self {
414 id: 0,
415 provided_markers: Default::default(),
416 ready_tx: Default::default(),
417 independent_transactions: Default::default(),
418 }
419 }
420}
421
422impl<T> ReadyTransactions<T> {
423 pub fn get_transactions(&self) -> TransactionsIterator<T> {
425 TransactionsIterator {
426 all: self.ready_tx.read().clone(),
427 independent: self.independent_transactions.clone(),
428 awaiting: Default::default(),
429 _invalid: Default::default(),
430 }
431 }
432
433 pub fn clear(&mut self) {
435 self.provided_markers.clear();
436 self.ready_tx.write().clear();
437 self.independent_transactions.clear();
438 }
439
440 pub fn contains(&self, hash: &TxHash) -> bool {
442 self.ready_tx.read().contains_key(hash)
443 }
444
445 pub fn len(&self) -> usize {
447 self.ready_tx.read().len()
448 }
449
450 pub fn is_empty(&self) -> bool {
452 self.ready_tx.read().is_empty()
453 }
454
455 pub fn get(&self, hash: &TxHash) -> Option<ReadyTransaction<T>> {
457 self.ready_tx.read().get(hash).cloned()
458 }
459
460 pub fn provided_markers(&self) -> &HashMap<TxMarker, TxHash> {
461 &self.provided_markers
462 }
463
464 fn next_id(&mut self) -> u64 {
465 let id = self.id;
466 self.id = self.id.wrapping_add(1);
467 id
468 }
469
470 pub fn clear_transactions(&mut self, tx_hashes: &[TxHash]) -> Vec<Arc<PoolTransaction<T>>> {
473 self.remove_with_markers(tx_hashes.to_vec(), None)
474 }
475
476 pub fn prune_tags(&mut self, marker: TxMarker) -> Vec<Arc<PoolTransaction<T>>> {
481 let mut removed_tx = vec![];
482
483 let mut remove = vec![marker];
485
486 while let Some(marker) = remove.pop() {
487 let res = self
488 .provided_markers
489 .remove(&marker)
490 .and_then(|hash| self.ready_tx.write().remove(&hash));
491
492 if let Some(tx) = res {
493 let unlocks = tx.unlocks;
494 self.independent_transactions.remove(&tx.transaction);
495 let tx = tx.transaction.transaction;
496
497 {
499 let hash = tx.hash();
500 let mut ready = self.ready_tx.write();
501
502 let mut previous_markers = |marker| -> Option<Vec<TxMarker>> {
503 let prev_hash = self.provided_markers.get(marker)?;
504 let tx2 = ready.get_mut(prev_hash)?;
505 if let Some(idx) = tx2.unlocks.iter().position(|i| i == &hash) {
507 tx2.unlocks.swap_remove(idx);
508 }
509 if tx2.unlocks.is_empty() {
510 Some(tx2.transaction.transaction.provides.clone())
511 } else {
512 None
513 }
514 };
515
516 for marker in &tx.requires {
518 if let Some(mut tags_to_remove) = previous_markers(marker) {
519 remove.append(&mut tags_to_remove);
520 }
521 }
522 }
523
524 for hash in unlocks {
526 if let Some(tx) = self.ready_tx.write().get_mut(&hash) {
527 tx.requires_offset += 1;
528 if tx.requires_offset == tx.transaction.transaction.requires.len() {
529 self.independent_transactions.insert(tx.transaction.clone());
530 }
531 }
532 }
533 let current_marker = ▮
535 for marker in &tx.provides {
536 let removed = self.provided_markers.remove(marker);
537 assert_eq!(
538 removed,
539 if current_marker == marker { None } else { Some(tx.hash()) },
540 "The pool contains exactly one transaction providing given tag; the removed transaction
541 claims to provide that tag, so it has to be mapped to it's hash; qed"
542 );
543 }
544 removed_tx.push(tx);
545 }
546 }
547
548 removed_tx
549 }
550
551 pub fn remove_with_markers(
554 &mut self,
555 mut tx_hashes: Vec<TxHash>,
556 marker_filter: Option<HashSet<TxMarker>>,
557 ) -> Vec<Arc<PoolTransaction<T>>> {
558 let mut removed = Vec::new();
559 let mut ready = self.ready_tx.write();
560
561 while let Some(hash) = tx_hashes.pop() {
562 if let Some(mut tx) = ready.remove(&hash) {
563 let invalidated = tx.transaction.transaction.provides.iter().filter(|mark| {
564 marker_filter.as_ref().map(|filter| !filter.contains(&**mark)).unwrap_or(true)
565 });
566
567 let mut removed_some_marks = false;
568 for mark in invalidated {
570 removed_some_marks = true;
571 self.provided_markers.remove(mark);
572 }
573
574 for mark in &tx.transaction.transaction.requires {
576 if let Some(provider_hash) = self.provided_markers.get(mark)
577 && let Some(provider_tx) = ready.get_mut(provider_hash)
578 && let Some(idx) = provider_tx.unlocks.iter().position(|i| i == &hash)
579 {
580 provider_tx.unlocks.swap_remove(idx);
581 }
582 }
583
584 self.independent_transactions.remove(&tx.transaction);
586
587 if removed_some_marks {
588 tx_hashes.append(&mut tx.unlocks);
590 }
591
592 removed.push(tx.transaction.transaction);
594 }
595 }
596
597 removed
598 }
599}
600
601impl<T: Transaction> ReadyTransactions<T> {
602 pub fn add_transaction(
609 &mut self,
610 tx: PendingPoolTransaction<T>,
611 ) -> Result<Vec<Arc<PoolTransaction<T>>>, PoolError> {
612 assert!(tx.is_ready(), "transaction must be ready",);
613 assert!(
614 !self.ready_tx.read().contains_key(&tx.transaction.hash()),
615 "transaction already included"
616 );
617
618 let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
619
620 let id = self.next_id();
621 let hash = tx.transaction.hash();
622
623 let mut independent = true;
624 let mut requires_offset = 0;
625 let mut ready = self.ready_tx.write();
626 for mark in &tx.transaction.requires {
628 if let Some(other) = self.provided_markers.get(mark) {
630 let tx = ready.get_mut(other).expect("hash included;");
631 tx.unlocks.push(hash);
632 independent = false;
634 } else {
635 requires_offset += 1;
636 }
637 }
638
639 for mark in tx.transaction.provides.iter().cloned() {
641 self.provided_markers.insert(mark, hash);
642 }
643
644 let transaction = PoolTransactionRef { id, transaction: tx.transaction };
645
646 if independent {
648 self.independent_transactions.insert(transaction.clone());
649 }
650
651 ready.insert(hash, ReadyTransaction { transaction, unlocks, requires_offset });
653
654 Ok(replaced_tx)
655 }
656
657 fn replaced_transactions(
659 &mut self,
660 tx: &PoolTransaction<T>,
661 ) -> Result<ReplacedTransactions<T>, PoolError> {
662 let remove_hashes: HashSet<_> =
664 tx.provides.iter().filter_map(|mark| self.provided_markers.get(mark)).collect();
665
666 if remove_hashes.is_empty() {
668 return Ok((Vec::new(), Vec::new()));
669 }
670
671 let mut unlocked_tx = Vec::new();
673 {
674 let ready = self.ready_tx.read();
677 for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(*hash)) {
678 if to_remove.provides() == tx.provides {
681 if tx.pending_transaction.transaction.max_fee_per_gas()
683 <= to_remove.max_fee_per_gas()
684 {
685 warn!(target: "txpool", "ready replacement transaction underpriced [{:?}]", tx.hash());
686 return Err(PoolError::ReplacementUnderpriced(tx.hash()));
687 } else {
688 trace!(target: "txpool", "replacing ready transaction [{:?}] with higher gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
689 }
690 }
691
692 unlocked_tx.extend(to_remove.unlocks.iter().copied())
693 }
694 }
695
696 let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
697
698 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
699 let removed_tx = self.remove_with_markers(remove_hashes, Some(new_provides));
700
701 Ok((removed_tx, unlocked_tx))
702 }
703}
704
705#[derive(Debug)]
707pub struct PoolTransactionRef<T> {
708 pub transaction: Arc<PoolTransaction<T>>,
710 pub id: u64,
712}
713
714impl<T> Clone for PoolTransactionRef<T> {
715 fn clone(&self) -> Self {
716 Self { transaction: Arc::clone(&self.transaction), id: self.id }
717 }
718}
719
720impl<T> Eq for PoolTransactionRef<T> {}
721
722impl<T> PartialEq<Self> for PoolTransactionRef<T> {
723 fn eq(&self, other: &Self) -> bool {
724 self.cmp(other) == Ordering::Equal
725 }
726}
727
728impl<T> PartialOrd<Self> for PoolTransactionRef<T> {
729 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
730 Some(self.cmp(other))
731 }
732}
733
734impl<T> Ord for PoolTransactionRef<T> {
735 fn cmp(&self, other: &Self) -> Ordering {
736 self.transaction
737 .priority
738 .cmp(&other.transaction.priority)
739 .then_with(|| other.id.cmp(&self.id))
740 }
741}
742
743#[derive(Debug)]
744pub struct ReadyTransaction<T> {
745 pub transaction: PoolTransactionRef<T>,
747 pub unlocks: Vec<TxHash>,
749 pub requires_offset: usize,
751}
752
753impl<T> Clone for ReadyTransaction<T> {
754 fn clone(&self) -> Self {
755 Self {
756 transaction: self.transaction.clone(),
757 unlocks: self.unlocks.clone(),
758 requires_offset: self.requires_offset,
759 }
760 }
761}
762
763impl<T> ReadyTransaction<T> {
764 pub fn provides(&self) -> &[TxMarker] {
765 &self.transaction.transaction.provides
766 }
767}
768
769impl<T: Transaction> ReadyTransaction<T> {
770 pub fn max_fee_per_gas(&self) -> u128 {
771 self.transaction.transaction.max_fee_per_gas()
772 }
773}
774
775#[cfg(test)]
776mod tests {
777 use super::*;
778
779 #[test]
780 fn can_id_txs() {
781 let addr = Address::random();
782 assert_eq!(to_marker(1, addr), to_marker(1, addr));
783 assert_ne!(to_marker(2, addr), to_marker(1, addr));
784 }
785}