1use crate::eth::{error::PoolError, util::hex_fmt_many};
2use alloy_network::AnyRpcTransaction;
3use alloy_primitives::{
4 map::{HashMap, HashSet},
5 Address, TxHash,
6};
7use anvil_core::eth::transaction::{PendingTransaction, TypedTransaction};
8use parking_lot::RwLock;
9use std::{cmp::Ordering, collections::BTreeSet, fmt, str::FromStr, sync::Arc, time::Instant};
10
11pub type TxMarker = Vec<u8>;
13
14pub fn to_marker(nonce: u64, from: Address) -> TxMarker {
16 let mut data = [0u8; 28];
17 data[..8].copy_from_slice(&nonce.to_le_bytes()[..]);
18 data[8..].copy_from_slice(&from.0[..]);
19 data.to_vec()
20}
21
22#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
26pub enum TransactionOrder {
27 Fifo,
32 #[default]
34 Fees,
35}
36
37impl TransactionOrder {
38 pub fn priority(&self, tx: &TypedTransaction) -> TransactionPriority {
40 match self {
41 Self::Fifo => TransactionPriority::default(),
42 Self::Fees => TransactionPriority(tx.gas_price()),
43 }
44 }
45}
46
47impl FromStr for TransactionOrder {
48 type Err = String;
49
50 fn from_str(s: &str) -> Result<Self, Self::Err> {
51 let s = s.to_lowercase();
52 let order = match s.as_str() {
53 "fees" => Self::Fees,
54 "fifo" => Self::Fifo,
55 _ => return Err(format!("Unknown TransactionOrder: `{s}`")),
56 };
57 Ok(order)
58 }
59}
60
61#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
66pub struct TransactionPriority(pub u128);
67
68#[derive(Clone, PartialEq, Eq)]
70pub struct PoolTransaction {
71 pub pending_transaction: PendingTransaction,
73 pub requires: Vec<TxMarker>,
75 pub provides: Vec<TxMarker>,
77 pub priority: TransactionPriority,
79}
80
81impl PoolTransaction {
84 pub fn new(transaction: PendingTransaction) -> Self {
85 Self {
86 pending_transaction: transaction,
87 requires: vec![],
88 provides: vec![],
89 priority: TransactionPriority(0),
90 }
91 }
92 pub fn hash(&self) -> TxHash {
94 *self.pending_transaction.hash()
95 }
96
97 pub fn gas_price(&self) -> u128 {
99 self.pending_transaction.transaction.gas_price()
100 }
101
102 pub fn tx_type(&self) -> u8 {
104 self.pending_transaction.transaction.r#type().unwrap_or_default()
105 }
106}
107
108impl fmt::Debug for PoolTransaction {
109 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
110 write!(fmt, "Transaction {{ ")?;
111 write!(fmt, "hash: {:?}, ", &self.pending_transaction.hash())?;
112 write!(fmt, "requires: [{}], ", hex_fmt_many(self.requires.iter()))?;
113 write!(fmt, "provides: [{}], ", hex_fmt_many(self.provides.iter()))?;
114 write!(fmt, "raw tx: {:?}", &self.pending_transaction)?;
115 write!(fmt, "}}")?;
116 Ok(())
117 }
118}
119
120impl TryFrom<AnyRpcTransaction> for PoolTransaction {
121 type Error = eyre::Error;
122 fn try_from(value: AnyRpcTransaction) -> Result<Self, Self::Error> {
123 let typed_transaction = TypedTransaction::try_from(value)?;
124 let pending_transaction = PendingTransaction::new(typed_transaction)?;
125 Ok(Self {
126 pending_transaction,
127 requires: vec![],
128 provides: vec![],
129 priority: TransactionPriority(0),
130 })
131 }
132}
133#[derive(Clone, Debug, Default)]
137pub struct PendingTransactions {
138 required_markers: HashMap<TxMarker, HashSet<TxHash>>,
140 waiting_markers: HashMap<Vec<TxMarker>, TxHash>,
142 waiting_queue: HashMap<TxHash, PendingPoolTransaction>,
144}
145
146impl PendingTransactions {
149 pub fn len(&self) -> usize {
151 self.waiting_queue.len()
152 }
153
154 pub fn is_empty(&self) -> bool {
155 self.waiting_queue.is_empty()
156 }
157
158 pub fn clear(&mut self) {
160 self.required_markers.clear();
161 self.waiting_markers.clear();
162 self.waiting_queue.clear();
163 }
164
165 pub fn transactions(&self) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
167 self.waiting_queue.values().map(|tx| tx.transaction.clone())
168 }
169
170 pub fn add_transaction(&mut self, tx: PendingPoolTransaction) -> Result<(), PoolError> {
172 assert!(!tx.is_ready(), "transaction must not be ready");
173 assert!(
174 !self.waiting_queue.contains_key(&tx.transaction.hash()),
175 "transaction is already added"
176 );
177
178 if let Some(replace) = self
179 .waiting_markers
180 .get(&tx.transaction.provides)
181 .and_then(|hash| self.waiting_queue.get(hash))
182 {
183 if tx.transaction.gas_price() < replace.transaction.gas_price() {
185 warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]", tx.transaction.hash());
186 return Err(PoolError::ReplacementUnderpriced(Box::new(
187 tx.transaction.as_ref().clone(),
188 )))
189 }
190 }
191
192 for marker in &tx.missing_markers {
194 self.required_markers.entry(marker.clone()).or_default().insert(tx.transaction.hash());
195 }
196
197 self.waiting_markers.insert(tx.transaction.provides.clone(), tx.transaction.hash());
199 self.waiting_queue.insert(tx.transaction.hash(), tx);
201
202 Ok(())
203 }
204
205 pub fn contains(&self, hash: &TxHash) -> bool {
207 self.waiting_queue.contains_key(hash)
208 }
209
210 pub fn get(&self, hash: &TxHash) -> Option<&PendingPoolTransaction> {
212 self.waiting_queue.get(hash)
213 }
214
215 pub fn mark_and_unlock(
220 &mut self,
221 markers: impl IntoIterator<Item = impl AsRef<TxMarker>>,
222 ) -> Vec<PendingPoolTransaction> {
223 let mut unlocked_ready = Vec::new();
224 for mark in markers {
225 let mark = mark.as_ref();
226 if let Some(tx_hashes) = self.required_markers.remove(mark) {
227 for hash in tx_hashes {
228 let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
229 tx.mark(mark);
230
231 if tx.is_ready() {
232 let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
233 self.waiting_markers.remove(&tx.transaction.provides);
234
235 unlocked_ready.push(tx);
236 }
237 }
238 }
239 }
240
241 unlocked_ready
242 }
243
244 pub fn remove(&mut self, hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
248 let mut removed = vec![];
249 for hash in hashes {
250 if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
251 self.waiting_markers.remove(&waiting_tx.transaction.provides);
252 for marker in waiting_tx.missing_markers {
253 let remove = if let Some(required) = self.required_markers.get_mut(&marker) {
254 required.remove(&hash);
255 required.is_empty()
256 } else {
257 false
258 };
259 if remove {
260 self.required_markers.remove(&marker);
261 }
262 }
263 removed.push(waiting_tx.transaction)
264 }
265 }
266 removed
267 }
268}
269
270#[derive(Clone)]
272pub struct PendingPoolTransaction {
273 pub transaction: Arc<PoolTransaction>,
274 pub missing_markers: HashSet<TxMarker>,
276 pub added_at: Instant,
278}
279
280impl PendingPoolTransaction {
283 pub fn new(transaction: PoolTransaction, provided: &HashMap<TxMarker, TxHash>) -> Self {
288 let missing_markers = transaction
289 .requires
290 .iter()
291 .filter(|marker| {
292 !provided.contains_key(&**marker)
294 })
295 .cloned()
296 .collect();
297
298 Self { transaction: Arc::new(transaction), missing_markers, added_at: Instant::now() }
299 }
300
301 pub fn mark(&mut self, marker: &TxMarker) {
303 self.missing_markers.remove(marker);
304 }
305
306 pub fn is_ready(&self) -> bool {
308 self.missing_markers.is_empty()
309 }
310}
311
312impl fmt::Debug for PendingPoolTransaction {
313 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
314 write!(fmt, "PendingTransaction {{ ")?;
315 write!(fmt, "added_at: {:?}, ", self.added_at)?;
316 write!(fmt, "tx: {:?}, ", self.transaction)?;
317 write!(fmt, "missing_markers: {{{}}}", hex_fmt_many(self.missing_markers.iter()))?;
318 write!(fmt, "}}")
319 }
320}
321
322pub struct TransactionsIterator {
323 all: HashMap<TxHash, ReadyTransaction>,
324 awaiting: HashMap<TxHash, (usize, PoolTransactionRef)>,
325 independent: BTreeSet<PoolTransactionRef>,
326 _invalid: HashSet<TxHash>,
327}
328
329impl TransactionsIterator {
332 fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef) {
335 if satisfied >= tx_ref.transaction.requires.len() {
336 self.independent.insert(tx_ref);
338 } else {
339 self.awaiting.insert(tx_ref.transaction.hash(), (satisfied, tx_ref));
341 }
342 }
343}
344
345impl Iterator for TransactionsIterator {
346 type Item = Arc<PoolTransaction>;
347
348 fn next(&mut self) -> Option<Self::Item> {
349 loop {
350 let best = self.independent.iter().next_back()?.clone();
351 let best = self.independent.take(&best)?;
352 let hash = best.transaction.hash();
353
354 let ready =
355 if let Some(ready) = self.all.get(&hash).cloned() { ready } else { continue };
356
357 for hash in &ready.unlocks {
359 let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
361 satisfied += 1;
362 Some((satisfied, tx_ref))
363 } else {
365 self.all
366 .get(hash)
367 .map(|next| (next.requires_offset + 1, next.transaction.clone()))
368 };
369 if let Some((satisfied, tx_ref)) = res {
370 self.independent_or_awaiting(satisfied, tx_ref)
371 }
372 }
373
374 return Some(best.transaction)
375 }
376 }
377}
378
379#[derive(Clone, Debug, Default)]
381pub struct ReadyTransactions {
382 id: u64,
386 provided_markers: HashMap<TxMarker, TxHash>,
388 ready_tx: Arc<RwLock<HashMap<TxHash, ReadyTransaction>>>,
390 independent_transactions: BTreeSet<PoolTransactionRef>,
393}
394
395impl ReadyTransactions {
398 pub fn get_transactions(&self) -> TransactionsIterator {
400 TransactionsIterator {
401 all: self.ready_tx.read().clone(),
402 independent: self.independent_transactions.clone(),
403 awaiting: Default::default(),
404 _invalid: Default::default(),
405 }
406 }
407
408 pub fn clear(&mut self) {
410 self.provided_markers.clear();
411 self.ready_tx.write().clear();
412 self.independent_transactions.clear();
413 }
414
415 pub fn contains(&self, hash: &TxHash) -> bool {
417 self.ready_tx.read().contains_key(hash)
418 }
419
420 pub fn get(&self, hash: &TxHash) -> Option<ReadyTransaction> {
422 self.ready_tx.read().get(hash).cloned()
423 }
424
425 pub fn provided_markers(&self) -> &HashMap<TxMarker, TxHash> {
426 &self.provided_markers
427 }
428
429 fn next_id(&mut self) -> u64 {
430 let id = self.id;
431 self.id = self.id.wrapping_add(1);
432 id
433 }
434
435 pub fn add_transaction(
442 &mut self,
443 tx: PendingPoolTransaction,
444 ) -> Result<Vec<Arc<PoolTransaction>>, PoolError> {
445 assert!(tx.is_ready(), "transaction must be ready",);
446 assert!(
447 !self.ready_tx.read().contains_key(&tx.transaction.hash()),
448 "transaction already included"
449 );
450
451 let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
452
453 let id = self.next_id();
454 let hash = tx.transaction.hash();
455
456 let mut independent = true;
457 let mut requires_offset = 0;
458 let mut ready = self.ready_tx.write();
459 for mark in &tx.transaction.requires {
461 if let Some(other) = self.provided_markers.get(mark) {
463 let tx = ready.get_mut(other).expect("hash included;");
464 tx.unlocks.push(hash);
465 independent = false;
467 } else {
468 requires_offset += 1;
469 }
470 }
471
472 for mark in tx.transaction.provides.iter().cloned() {
474 self.provided_markers.insert(mark, hash);
475 }
476
477 let transaction = PoolTransactionRef { id, transaction: tx.transaction };
478
479 if independent {
481 self.independent_transactions.insert(transaction.clone());
482 }
483
484 ready.insert(hash, ReadyTransaction { transaction, unlocks, requires_offset });
486
487 Ok(replaced_tx)
488 }
489
490 fn replaced_transactions(
492 &mut self,
493 tx: &PoolTransaction,
494 ) -> Result<(Vec<Arc<PoolTransaction>>, Vec<TxHash>), PoolError> {
495 let remove_hashes: HashSet<_> =
497 tx.provides.iter().filter_map(|mark| self.provided_markers.get(mark)).collect();
498
499 if remove_hashes.is_empty() {
501 return Ok((Vec::new(), Vec::new()))
502 }
503
504 let mut unlocked_tx = Vec::new();
507 {
508 let ready = self.ready_tx.read();
511 for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(*hash)) {
512 if to_remove.provides() == tx.provides {
515 if tx.pending_transaction.transaction.gas_price() <= to_remove.gas_price() {
517 warn!(target: "txpool", "ready replacement transaction underpriced [{:?}]", tx.hash());
518 return Err(PoolError::ReplacementUnderpriced(Box::new(tx.clone())))
519 } else {
520 trace!(target: "txpool", "replacing ready transaction [{:?}] with higher gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
521 }
522 }
523
524 unlocked_tx.extend(to_remove.unlocks.iter().copied())
525 }
526 }
527
528 let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
529
530 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
531 let removed_tx = self.remove_with_markers(remove_hashes, Some(new_provides));
532
533 Ok((removed_tx, unlocked_tx))
534 }
535
536 pub fn clear_transactions(&mut self, tx_hashes: &[TxHash]) -> Vec<Arc<PoolTransaction>> {
539 self.remove_with_markers(tx_hashes.to_vec(), None)
540 }
541
542 pub fn prune_tags(&mut self, marker: TxMarker) -> Vec<Arc<PoolTransaction>> {
547 let mut removed_tx = vec![];
548
549 let mut remove = vec![marker];
551
552 while let Some(marker) = remove.pop() {
553 let res = self
554 .provided_markers
555 .remove(&marker)
556 .and_then(|hash| self.ready_tx.write().remove(&hash));
557
558 if let Some(tx) = res {
559 let unlocks = tx.unlocks;
560 self.independent_transactions.remove(&tx.transaction);
561 let tx = tx.transaction.transaction;
562
563 {
565 let hash = tx.hash();
566 let mut ready = self.ready_tx.write();
567
568 let mut previous_markers = |marker| -> Option<Vec<TxMarker>> {
569 let prev_hash = self.provided_markers.get(marker)?;
570 let tx2 = ready.get_mut(prev_hash)?;
571 if let Some(idx) = tx2.unlocks.iter().position(|i| i == &hash) {
573 tx2.unlocks.swap_remove(idx);
574 }
575 if tx2.unlocks.is_empty() {
576 Some(tx2.transaction.transaction.provides.clone())
577 } else {
578 None
579 }
580 };
581
582 for marker in &tx.requires {
584 if let Some(mut tags_to_remove) = previous_markers(marker) {
585 remove.append(&mut tags_to_remove);
586 }
587 }
588 }
589
590 for hash in unlocks {
592 if let Some(tx) = self.ready_tx.write().get_mut(&hash) {
593 tx.requires_offset += 1;
594 if tx.requires_offset == tx.transaction.transaction.requires.len() {
595 self.independent_transactions.insert(tx.transaction.clone());
596 }
597 }
598 }
599 let current_marker = ▮
601 for marker in &tx.provides {
602 let removed = self.provided_markers.remove(marker);
603 assert_eq!(
604 removed,
605 if current_marker == marker { None } else { Some(tx.hash()) },
606 "The pool contains exactly one transaction providing given tag; the removed transaction
607 claims to provide that tag, so it has to be mapped to it's hash; qed"
608 );
609 }
610 removed_tx.push(tx);
611 }
612 }
613
614 removed_tx
615 }
616
617 pub fn remove_with_markers(
620 &mut self,
621 mut tx_hashes: Vec<TxHash>,
622 marker_filter: Option<HashSet<TxMarker>>,
623 ) -> Vec<Arc<PoolTransaction>> {
624 let mut removed = Vec::new();
625 let mut ready = self.ready_tx.write();
626
627 while let Some(hash) = tx_hashes.pop() {
628 if let Some(mut tx) = ready.remove(&hash) {
629 let invalidated = tx.transaction.transaction.provides.iter().filter(|mark| {
630 marker_filter.as_ref().map(|filter| !filter.contains(&**mark)).unwrap_or(true)
631 });
632
633 let mut removed_some_marks = false;
634 for mark in invalidated {
636 removed_some_marks = true;
637 self.provided_markers.remove(mark);
638 }
639
640 for mark in &tx.transaction.transaction.requires {
642 if let Some(hash) = self.provided_markers.get(mark) {
643 if let Some(tx) = ready.get_mut(hash) {
644 if let Some(idx) = tx.unlocks.iter().position(|i| i == hash) {
645 tx.unlocks.swap_remove(idx);
646 }
647 }
648 }
649 }
650
651 self.independent_transactions.remove(&tx.transaction);
653
654 if removed_some_marks {
655 tx_hashes.append(&mut tx.unlocks);
657 }
658
659 removed.push(tx.transaction.transaction);
661 }
662 }
663
664 removed
665 }
666}
667
668#[derive(Clone, Debug)]
670pub struct PoolTransactionRef {
671 pub transaction: Arc<PoolTransaction>,
673 pub id: u64,
675}
676
677impl Eq for PoolTransactionRef {}
678
679impl PartialEq<Self> for PoolTransactionRef {
680 fn eq(&self, other: &Self) -> bool {
681 self.cmp(other) == Ordering::Equal
682 }
683}
684
685impl PartialOrd<Self> for PoolTransactionRef {
686 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
687 Some(self.cmp(other))
688 }
689}
690
691impl Ord for PoolTransactionRef {
692 fn cmp(&self, other: &Self) -> Ordering {
693 self.transaction
694 .priority
695 .cmp(&other.transaction.priority)
696 .then_with(|| other.id.cmp(&self.id))
697 }
698}
699
700#[derive(Clone, Debug)]
701pub struct ReadyTransaction {
702 pub transaction: PoolTransactionRef,
704 pub unlocks: Vec<TxHash>,
706 pub requires_offset: usize,
708}
709
710impl ReadyTransaction {
711 pub fn provides(&self) -> &[TxMarker] {
712 &self.transaction.transaction.provides
713 }
714
715 pub fn gas_price(&self) -> u128 {
716 self.transaction.transaction.gas_price()
717 }
718}
719
720#[cfg(test)]
721mod tests {
722 use super::*;
723
724 #[test]
725 fn can_id_txs() {
726 let addr = Address::random();
727 assert_eq!(to_marker(1, addr), to_marker(1, addr));
728 assert_ne!(to_marker(2, addr), to_marker(1, addr));
729 }
730}