1use crate::eth::{error::PoolError, util::hex_fmt_many};
2use alloy_consensus::{
3 Transaction, Typed2718,
4 crypto::RecoveryError,
5 transaction::{SignerRecoverable, TxHashRef},
6};
7use alloy_network::AnyRpcTransaction;
8use alloy_primitives::{
9 Address, TxHash,
10 map::{HashMap, HashSet},
11};
12use alloy_rlp::Encodable;
13use anvil_core::eth::transaction::PendingTransaction;
14use parking_lot::RwLock;
15use std::{cmp::Ordering, collections::BTreeSet, fmt, str::FromStr, sync::Arc, time::Instant};
16
/// Byte-string identifier used to express dependencies between pool transactions.
///
/// Markers are produced by [`to_marker`] from a nonce and a sender address.
pub type TxMarker = Vec<u8>;
19
/// Outcome of replacing ready transactions: the transactions that were removed from the ready
/// set, paired with the hashes of the transactions those removed transactions unlocked.
type ReplacedTransactions<T> = (Vec<Arc<PoolTransaction<T>>>, Vec<TxHash>);
23
24pub fn to_marker(nonce: u64, from: Address) -> TxMarker {
26 let mut data = [0u8; 28];
27 data[..8].copy_from_slice(&nonce.to_le_bytes()[..]);
28 data[8..].copy_from_slice(&from.0[..]);
29 data.to_vec()
30}
31
/// Strategy that determines which [`TransactionPriority`] a transaction is assigned.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TransactionOrder {
    /// Every transaction receives the same (default) priority.
    Fifo,
    /// The priority is the transaction's `max_fee_per_gas`. This is the default variant.
    #[default]
    Fees,
}
46
47impl TransactionOrder {
48 pub fn priority<T: Transaction>(&self, tx: &T) -> TransactionPriority {
50 match self {
51 Self::Fifo => TransactionPriority::default(),
52 Self::Fees => TransactionPriority(tx.max_fee_per_gas()),
53 }
54 }
55}
56
57impl FromStr for TransactionOrder {
58 type Err = String;
59
60 fn from_str(s: &str) -> Result<Self, Self::Err> {
61 let s = s.to_lowercase();
62 let order = match s.as_str() {
63 "fees" => Self::Fees,
64 "fifo" => Self::Fifo,
65 _ => return Err(format!("Unknown TransactionOrder: `{s}`")),
66 };
67 Ok(order)
68 }
69}
70
/// Priority of a pool transaction; priorities compare by the wrapped `u128`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct TransactionPriority(pub u128);
77
/// A transaction tracked by the pool together with its dependency markers and priority.
#[derive(Clone, PartialEq, Eq)]
pub struct PoolTransaction<T> {
    /// The wrapped pending transaction.
    pub pending_transaction: PendingTransaction<T>,
    /// Markers that have to be provided before this transaction counts as ready.
    pub requires: Vec<TxMarker>,
    /// Markers this transaction provides.
    pub provides: Vec<TxMarker>,
    /// Priority used when ordering this transaction against others.
    pub priority: TransactionPriority,
}
90
91impl<T> PoolTransaction<T> {
94 pub fn new(transaction: PendingTransaction<T>) -> Self {
95 Self {
96 pending_transaction: transaction,
97 requires: vec![],
98 provides: vec![],
99 priority: TransactionPriority(0),
100 }
101 }
102
103 pub fn hash(&self) -> TxHash {
105 *self.pending_transaction.hash()
106 }
107}
108
109impl<T: Transaction> PoolTransaction<T> {
110 pub fn max_fee_per_gas(&self) -> u128 {
112 self.pending_transaction.transaction.max_fee_per_gas()
113 }
114}
115
116impl<T: Typed2718> PoolTransaction<T> {
117 pub fn tx_type(&self) -> u8 {
119 self.pending_transaction.transaction.ty()
120 }
121}
122
123impl<T: fmt::Debug> fmt::Debug for PoolTransaction<T> {
124 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
125 write!(fmt, "Transaction {{ ")?;
126 write!(fmt, "hash: {:?}, ", &self.pending_transaction.hash())?;
127 write!(fmt, "requires: [{}], ", hex_fmt_many(self.requires.iter()))?;
128 write!(fmt, "provides: [{}], ", hex_fmt_many(self.provides.iter()))?;
129 write!(fmt, "raw tx: {:?}", &self.pending_transaction)?;
130 write!(fmt, "}}")?;
131 Ok(())
132 }
133}
134
135impl<T> TryFrom<AnyRpcTransaction> for PoolTransaction<T>
136where
137 T: SignerRecoverable + TxHashRef + Encodable + TryFrom<AnyRpcTransaction>,
138 <T as TryFrom<AnyRpcTransaction>>::Error: Into<eyre::Error>,
139 RecoveryError: Into<eyre::Error>,
140{
141 type Error = eyre::Error;
142 fn try_from(value: AnyRpcTransaction) -> Result<Self, Self::Error> {
143 let typed_transaction = T::try_from(value).map_err(Into::into)?;
144 let pending_transaction = PendingTransaction::new(typed_transaction)?;
145 Ok(Self {
146 pending_transaction,
147 requires: vec![],
148 provides: vec![],
149 priority: TransactionPriority(0),
150 })
151 }
152}
153
/// Transactions that are not ready yet because at least one required marker is still missing.
#[derive(Clone, Debug)]
pub struct PendingTransactions<T> {
    /// For each missing marker, the hashes of the waiting transactions that require it.
    required_markers: HashMap<TxMarker, HashSet<TxHash>>,
    /// Maps the complete `provides` list of a waiting transaction to that transaction's hash.
    waiting_markers: HashMap<Vec<TxMarker>, TxHash>,
    /// All waiting transactions, keyed by hash.
    waiting_queue: HashMap<TxHash, PendingPoolTransaction<T>>,
}
166
167impl<T> Default for PendingTransactions<T> {
168 fn default() -> Self {
169 Self {
170 required_markers: Default::default(),
171 waiting_markers: Default::default(),
172 waiting_queue: Default::default(),
173 }
174 }
175}
176
177impl<T> PendingTransactions<T> {
178 pub fn len(&self) -> usize {
180 self.waiting_queue.len()
181 }
182
183 pub fn is_empty(&self) -> bool {
184 self.waiting_queue.is_empty()
185 }
186
187 pub fn clear(&mut self) {
189 self.required_markers.clear();
190 self.waiting_markers.clear();
191 self.waiting_queue.clear();
192 }
193
194 pub fn transactions(&self) -> impl Iterator<Item = Arc<PoolTransaction<T>>> + '_ {
196 self.waiting_queue.values().map(|tx| tx.transaction.clone())
197 }
198
199 pub fn contains(&self, hash: &TxHash) -> bool {
201 self.waiting_queue.contains_key(hash)
202 }
203
204 pub fn get(&self, hash: &TxHash) -> Option<&PendingPoolTransaction<T>> {
206 self.waiting_queue.get(hash)
207 }
208
209 pub fn mark_and_unlock(
214 &mut self,
215 markers: impl IntoIterator<Item = impl AsRef<TxMarker>>,
216 ) -> Vec<PendingPoolTransaction<T>> {
217 let mut unlocked_ready = Vec::new();
218 for mark in markers {
219 let mark = mark.as_ref();
220 if let Some(tx_hashes) = self.required_markers.remove(mark) {
221 for hash in tx_hashes {
222 let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
223 tx.mark(mark);
224
225 if tx.is_ready() {
226 let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
227 self.waiting_markers.remove(&tx.transaction.provides);
228
229 unlocked_ready.push(tx);
230 }
231 }
232 }
233 }
234
235 unlocked_ready
236 }
237
238 pub fn remove(&mut self, hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction<T>>> {
242 let mut removed = vec![];
243 for hash in hashes {
244 if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
245 self.waiting_markers.remove(&waiting_tx.transaction.provides);
246 for marker in waiting_tx.missing_markers {
247 let remove = if let Some(required) = self.required_markers.get_mut(&marker) {
248 required.remove(&hash);
249 required.is_empty()
250 } else {
251 false
252 };
253 if remove {
254 self.required_markers.remove(&marker);
255 }
256 }
257 removed.push(waiting_tx.transaction)
258 }
259 }
260 removed
261 }
262}
263
264impl<T: Transaction> PendingTransactions<T> {
265 pub fn add_transaction(&mut self, tx: PendingPoolTransaction<T>) -> Result<(), PoolError> {
267 assert!(!tx.is_ready(), "transaction must not be ready");
268 assert!(
269 !self.waiting_queue.contains_key(&tx.transaction.hash()),
270 "transaction is already added"
271 );
272
273 if let Some(replace) = self
274 .waiting_markers
275 .get(&tx.transaction.provides)
276 .and_then(|hash| self.waiting_queue.get(hash))
277 {
278 if tx.transaction.max_fee_per_gas() <= replace.transaction.max_fee_per_gas() {
280 warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]", tx.transaction.hash());
281 return Err(PoolError::ReplacementUnderpriced(tx.transaction.hash()));
282 }
283 }
284
285 for marker in &tx.missing_markers {
287 self.required_markers.entry(marker.clone()).or_default().insert(tx.transaction.hash());
288 }
289
290 self.waiting_markers.insert(tx.transaction.provides.clone(), tx.transaction.hash());
292 self.waiting_queue.insert(tx.transaction.hash(), tx);
294
295 Ok(())
296 }
297}
298
/// A pool transaction together with the markers it is still waiting for.
#[derive(Clone)]
pub struct PendingPoolTransaction<T> {
    /// Shared handle to the pool transaction.
    pub transaction: Arc<PoolTransaction<T>>,
    /// Required markers that have not been provided yet; the transaction is ready once empty.
    pub missing_markers: HashSet<TxMarker>,
    /// Instant at which this value was created.
    pub added_at: Instant,
}
308
309impl<T> PendingPoolTransaction<T> {
310 pub fn new(transaction: PoolTransaction<T>, provided: &HashMap<TxMarker, TxHash>) -> Self {
315 let missing_markers = transaction
316 .requires
317 .iter()
318 .filter(|marker| {
319 !provided.contains_key(&**marker)
321 })
322 .cloned()
323 .collect();
324
325 Self { transaction: Arc::new(transaction), missing_markers, added_at: Instant::now() }
326 }
327
328 pub fn mark(&mut self, marker: &TxMarker) {
330 self.missing_markers.remove(marker);
331 }
332
333 pub fn is_ready(&self) -> bool {
335 self.missing_markers.is_empty()
336 }
337}
338
339impl<T: fmt::Debug> fmt::Debug for PendingPoolTransaction<T> {
340 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
341 write!(fmt, "PendingTransaction {{ ")?;
342 write!(fmt, "added_at: {:?}, ", self.added_at)?;
343 write!(fmt, "tx: {:?}, ", self.transaction)?;
344 write!(fmt, "missing_markers: {{{}}}", hex_fmt_many(self.missing_markers.iter()))?;
345 write!(fmt, "}}")
346 }
347}
348
/// Iterator over ready transactions that yields a transaction only after the transactions it
/// depends on have been yielded.
pub struct TransactionsIterator<T> {
    /// All ready transactions known to this iterator, keyed by hash.
    all: HashMap<TxHash, ReadyTransaction<T>>,
    /// Transactions with only some requirements satisfied so far, stored together with the
    /// number of requirements already satisfied.
    awaiting: HashMap<TxHash, (usize, PoolTransactionRef<T>)>,
    /// Transactions whose requirements are all satisfied; the greatest element is yielded next.
    independent: BTreeSet<PoolTransactionRef<T>>,
    /// Not read by any code visible in this file.
    _invalid: HashSet<TxHash>,
}
355
356impl<T> TransactionsIterator<T> {
357 fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef<T>) {
360 if satisfied >= tx_ref.transaction.requires.len() {
361 self.independent.insert(tx_ref);
363 } else {
364 self.awaiting.insert(tx_ref.transaction.hash(), (satisfied, tx_ref));
366 }
367 }
368}
369
370impl<T> Iterator for TransactionsIterator<T> {
371 type Item = Arc<PoolTransaction<T>>;
372
373 fn next(&mut self) -> Option<Self::Item> {
374 loop {
375 let best = self.independent.iter().next_back()?.clone();
376 let best = self.independent.take(&best)?;
377 let hash = best.transaction.hash();
378
379 let ready =
380 if let Some(ready) = self.all.get(&hash).cloned() { ready } else { continue };
381
382 for hash in &ready.unlocks {
384 let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
386 satisfied += 1;
387 Some((satisfied, tx_ref))
388 } else {
390 self.all
391 .get(hash)
392 .map(|next| (next.requires_offset + 1, next.transaction.clone()))
393 };
394 if let Some((satisfied, tx_ref)) = res {
395 self.independent_or_awaiting(satisfied, tx_ref)
396 }
397 }
398
399 return Some(best.transaction);
400 }
401 }
402}
403
/// The set of transactions that are ready, i.e. have no missing markers.
#[derive(Clone, Debug)]
pub struct ReadyTransactions<T> {
    /// Counter from which the `id` of each newly added transaction is taken.
    id: u64,
    /// Maps every provided marker to the hash of the ready transaction providing it.
    provided_markers: HashMap<TxMarker, TxHash>,
    /// All ready transactions keyed by hash, shared behind a read-write lock.
    ready_tx: Arc<RwLock<HashMap<TxHash, ReadyTransaction<T>>>>,
    /// Ready transactions none of whose required markers is provided by another ready
    /// transaction.
    independent_transactions: BTreeSet<PoolTransactionRef<T>>,
}
419
420impl<T> Default for ReadyTransactions<T> {
421 fn default() -> Self {
422 Self {
423 id: 0,
424 provided_markers: Default::default(),
425 ready_tx: Default::default(),
426 independent_transactions: Default::default(),
427 }
428 }
429}
430
431impl<T> ReadyTransactions<T> {
432 pub fn get_transactions(&self) -> TransactionsIterator<T> {
434 TransactionsIterator {
435 all: self.ready_tx.read().clone(),
436 independent: self.independent_transactions.clone(),
437 awaiting: Default::default(),
438 _invalid: Default::default(),
439 }
440 }
441
442 pub fn clear(&mut self) {
444 self.provided_markers.clear();
445 self.ready_tx.write().clear();
446 self.independent_transactions.clear();
447 }
448
449 pub fn contains(&self, hash: &TxHash) -> bool {
451 self.ready_tx.read().contains_key(hash)
452 }
453
454 pub fn len(&self) -> usize {
456 self.ready_tx.read().len()
457 }
458
459 pub fn is_empty(&self) -> bool {
461 self.ready_tx.read().is_empty()
462 }
463
464 pub fn get(&self, hash: &TxHash) -> Option<ReadyTransaction<T>> {
466 self.ready_tx.read().get(hash).cloned()
467 }
468
469 pub fn provided_markers(&self) -> &HashMap<TxMarker, TxHash> {
470 &self.provided_markers
471 }
472
473 fn next_id(&mut self) -> u64 {
474 let id = self.id;
475 self.id = self.id.wrapping_add(1);
476 id
477 }
478
479 pub fn clear_transactions(&mut self, tx_hashes: &[TxHash]) -> Vec<Arc<PoolTransaction<T>>> {
482 self.remove_with_markers(tx_hashes.to_vec(), None)
483 }
484
485 pub fn prune_tags(&mut self, marker: TxMarker) -> Vec<Arc<PoolTransaction<T>>> {
490 let mut removed_tx = vec![];
491
492 let mut remove = vec![marker];
494
495 while let Some(marker) = remove.pop() {
496 let res = self
497 .provided_markers
498 .remove(&marker)
499 .and_then(|hash| self.ready_tx.write().remove(&hash));
500
501 if let Some(tx) = res {
502 let unlocks = tx.unlocks;
503 self.independent_transactions.remove(&tx.transaction);
504 let tx = tx.transaction.transaction;
505
506 {
508 let hash = tx.hash();
509 let mut ready = self.ready_tx.write();
510
511 let mut previous_markers = |marker| -> Option<Vec<TxMarker>> {
512 let prev_hash = self.provided_markers.get(marker)?;
513 let tx2 = ready.get_mut(prev_hash)?;
514 if let Some(idx) = tx2.unlocks.iter().position(|i| i == &hash) {
516 tx2.unlocks.swap_remove(idx);
517 }
518 tx2.unlocks.is_empty().then(|| tx2.transaction.transaction.provides.clone())
519 };
520
521 for marker in &tx.requires {
523 if let Some(mut tags_to_remove) = previous_markers(marker) {
524 remove.append(&mut tags_to_remove);
525 }
526 }
527 }
528
529 for hash in unlocks {
531 if let Some(tx) = self.ready_tx.write().get_mut(&hash) {
532 tx.requires_offset += 1;
533 if tx.requires_offset == tx.transaction.transaction.requires.len() {
534 self.independent_transactions.insert(tx.transaction.clone());
535 }
536 }
537 }
538 let current_marker = ▮
540 for marker in &tx.provides {
541 let removed = self.provided_markers.remove(marker);
542 assert_eq!(
543 removed,
544 if current_marker == marker { None } else { Some(tx.hash()) },
545 "The pool contains exactly one transaction providing given tag; the removed transaction
546 claims to provide that tag, so it has to be mapped to it's hash; qed"
547 );
548 }
549 removed_tx.push(tx);
550 }
551 }
552
553 removed_tx
554 }
555
556 pub fn remove_with_markers(
559 &mut self,
560 mut tx_hashes: Vec<TxHash>,
561 marker_filter: Option<HashSet<TxMarker>>,
562 ) -> Vec<Arc<PoolTransaction<T>>> {
563 let mut removed = Vec::new();
564 let mut ready = self.ready_tx.write();
565
566 while let Some(hash) = tx_hashes.pop() {
567 if let Some(mut tx) = ready.remove(&hash) {
568 let invalidated = tx.transaction.transaction.provides.iter().filter(|mark| {
569 marker_filter.as_ref().map(|filter| !filter.contains(&**mark)).unwrap_or(true)
570 });
571
572 let mut removed_some_marks = false;
573 for mark in invalidated {
575 removed_some_marks = true;
576 self.provided_markers.remove(mark);
577 }
578
579 for mark in &tx.transaction.transaction.requires {
581 if let Some(provider_hash) = self.provided_markers.get(mark)
582 && let Some(provider_tx) = ready.get_mut(provider_hash)
583 && let Some(idx) = provider_tx.unlocks.iter().position(|i| i == &hash)
584 {
585 provider_tx.unlocks.swap_remove(idx);
586 }
587 }
588
589 self.independent_transactions.remove(&tx.transaction);
591
592 if removed_some_marks {
593 tx_hashes.append(&mut tx.unlocks);
595 }
596
597 removed.push(tx.transaction.transaction);
599 }
600 }
601
602 removed
603 }
604}
605
606impl<T: Transaction> ReadyTransactions<T> {
607 pub fn add_transaction(
614 &mut self,
615 tx: PendingPoolTransaction<T>,
616 ) -> Result<Vec<Arc<PoolTransaction<T>>>, PoolError> {
617 assert!(tx.is_ready(), "transaction must be ready",);
618 assert!(
619 !self.ready_tx.read().contains_key(&tx.transaction.hash()),
620 "transaction already included"
621 );
622
623 let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
624
625 let id = self.next_id();
626 let hash = tx.transaction.hash();
627
628 let mut independent = true;
629 let mut requires_offset = 0;
630 let mut ready = self.ready_tx.write();
631 for mark in &tx.transaction.requires {
633 if let Some(other) = self.provided_markers.get(mark) {
635 let tx = ready.get_mut(other).expect("hash included;");
636 tx.unlocks.push(hash);
637 independent = false;
639 } else {
640 requires_offset += 1;
641 }
642 }
643
644 for mark in tx.transaction.provides.iter().cloned() {
646 self.provided_markers.insert(mark, hash);
647 }
648
649 let transaction = PoolTransactionRef { id, transaction: tx.transaction };
650
651 if independent {
653 self.independent_transactions.insert(transaction.clone());
654 }
655
656 ready.insert(hash, ReadyTransaction { transaction, unlocks, requires_offset });
658
659 Ok(replaced_tx)
660 }
661
662 fn replaced_transactions(
664 &mut self,
665 tx: &PoolTransaction<T>,
666 ) -> Result<ReplacedTransactions<T>, PoolError> {
667 let remove_hashes: HashSet<_> =
669 tx.provides.iter().filter_map(|mark| self.provided_markers.get(mark)).collect();
670
671 if remove_hashes.is_empty() {
673 return Ok((Vec::new(), Vec::new()));
674 }
675
676 let mut unlocked_tx = Vec::new();
678 {
679 let ready = self.ready_tx.read();
682 for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(*hash)) {
683 if to_remove.provides() == tx.provides {
686 if tx.pending_transaction.transaction.max_fee_per_gas()
688 <= to_remove.max_fee_per_gas()
689 {
690 warn!(target: "txpool", "ready replacement transaction underpriced [{:?}]", tx.hash());
691 return Err(PoolError::ReplacementUnderpriced(tx.hash()));
692 }
693 trace!(target: "txpool", "replacing ready transaction [{:?}] with higher gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
694 }
695
696 unlocked_tx.extend(to_remove.unlocks.iter().copied())
697 }
698 }
699
700 let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
701
702 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
703 let removed_tx = self.remove_with_markers(remove_hashes, Some(new_provides));
704
705 Ok((removed_tx, unlocked_tx))
706 }
707}
708
/// A shared handle to a pool transaction together with the id used to order it.
#[derive(Debug)]
pub struct PoolTransactionRef<T> {
    /// The referenced pool transaction.
    pub transaction: Arc<PoolTransaction<T>>,
    /// Identifier that breaks ties between transactions of equal priority when ordering.
    pub id: u64,
}
717
718impl<T> Clone for PoolTransactionRef<T> {
719 fn clone(&self) -> Self {
720 Self { transaction: Arc::clone(&self.transaction), id: self.id }
721 }
722}
723
// Marker impl: equality for `PoolTransactionRef` is defined by its `PartialEq` impl.
impl<T> Eq for PoolTransactionRef<T> {}
725
726impl<T> PartialEq<Self> for PoolTransactionRef<T> {
727 fn eq(&self, other: &Self) -> bool {
728 self.cmp(other) == Ordering::Equal
729 }
730}
731
732impl<T> PartialOrd<Self> for PoolTransactionRef<T> {
733 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
734 Some(self.cmp(other))
735 }
736}
737
738impl<T> Ord for PoolTransactionRef<T> {
739 fn cmp(&self, other: &Self) -> Ordering {
740 self.transaction
741 .priority
742 .cmp(&other.transaction.priority)
743 .then_with(|| other.id.cmp(&self.id))
744 }
745}
746
/// A transaction in the ready set together with its dependency bookkeeping.
#[derive(Debug)]
pub struct ReadyTransaction<T> {
    /// Handle to the pool transaction, including its ordering id.
    pub transaction: PoolTransactionRef<T>,
    /// Hashes of the transactions that this transaction unlocks.
    pub unlocks: Vec<TxHash>,
    /// Number of required markers that are not provided by another ready transaction.
    pub requires_offset: usize,
}
756
757impl<T> Clone for ReadyTransaction<T> {
758 fn clone(&self) -> Self {
759 Self {
760 transaction: self.transaction.clone(),
761 unlocks: self.unlocks.clone(),
762 requires_offset: self.requires_offset,
763 }
764 }
765}
766
767impl<T> ReadyTransaction<T> {
768 pub fn provides(&self) -> &[TxMarker] {
769 &self.transaction.transaction.provides
770 }
771}
772
773impl<T: Transaction> ReadyTransaction<T> {
774 pub fn max_fee_per_gas(&self) -> u128 {
775 self.transaction.transaction.max_fee_per_gas()
776 }
777}
778
#[cfg(test)]
mod tests {
    use super::*;

    /// Markers are equal for the same sender and nonce, and differ when the nonce differs.
    #[test]
    fn can_id_txs() {
        let sender = Address::random();
        let nonce_one = to_marker(1, sender);
        let nonce_two = to_marker(2, sender);

        assert_eq!(nonce_one, to_marker(1, sender));
        assert_ne!(nonce_two, nonce_one);
    }
}