1use crate::eth::{error::PoolError, util::hex_fmt_many};
2use alloy_network::AnyRpcTransaction;
3use alloy_primitives::{
4 map::{HashMap, HashSet},
5 Address, TxHash,
6};
7use anvil_core::eth::transaction::{PendingTransaction, TypedTransaction};
8use parking_lot::RwLock;
9use std::{cmp::Ordering, collections::BTreeSet, fmt, str::FromStr, sync::Arc, time::Instant};
10
11pub type TxMarker = Vec<u8>;
13
14pub fn to_marker(nonce: u64, from: Address) -> TxMarker {
16 let mut data = [0u8; 28];
17 data[..8].copy_from_slice(&nonce.to_le_bytes()[..]);
18 data[8..].copy_from_slice(&from.0[..]);
19 data.to_vec()
20}
21
22#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
26pub enum TransactionOrder {
27 Fifo,
32 #[default]
34 Fees,
35}
36
37impl TransactionOrder {
38 pub fn priority(&self, tx: &TypedTransaction) -> TransactionPriority {
40 match self {
41 Self::Fifo => TransactionPriority::default(),
42 Self::Fees => TransactionPriority(tx.gas_price()),
43 }
44 }
45}
46
47impl FromStr for TransactionOrder {
48 type Err = String;
49
50 fn from_str(s: &str) -> Result<Self, Self::Err> {
51 let s = s.to_lowercase();
52 let order = match s.as_str() {
53 "fees" => Self::Fees,
54 "fifo" => Self::Fifo,
55 _ => return Err(format!("Unknown TransactionOrder: `{s}`")),
56 };
57 Ok(order)
58 }
59}
60
61#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
66pub struct TransactionPriority(pub u128);
67
68#[derive(Clone, PartialEq, Eq)]
70pub struct PoolTransaction {
71 pub pending_transaction: PendingTransaction,
73 pub requires: Vec<TxMarker>,
75 pub provides: Vec<TxMarker>,
77 pub priority: TransactionPriority,
79}
80
81impl PoolTransaction {
84 pub fn new(transaction: PendingTransaction) -> Self {
85 Self {
86 pending_transaction: transaction,
87 requires: vec![],
88 provides: vec![],
89 priority: TransactionPriority(0),
90 }
91 }
92 pub fn hash(&self) -> TxHash {
94 *self.pending_transaction.hash()
95 }
96
97 pub fn gas_price(&self) -> u128 {
99 self.pending_transaction.transaction.gas_price()
100 }
101}
102
103impl fmt::Debug for PoolTransaction {
104 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
105 write!(fmt, "Transaction {{ ")?;
106 write!(fmt, "hash: {:?}, ", &self.pending_transaction.hash())?;
107 write!(fmt, "requires: [{}], ", hex_fmt_many(self.requires.iter()))?;
108 write!(fmt, "provides: [{}], ", hex_fmt_many(self.provides.iter()))?;
109 write!(fmt, "raw tx: {:?}", &self.pending_transaction)?;
110 write!(fmt, "}}")?;
111 Ok(())
112 }
113}
114
115impl TryFrom<AnyRpcTransaction> for PoolTransaction {
116 type Error = eyre::Error;
117 fn try_from(value: AnyRpcTransaction) -> Result<Self, Self::Error> {
118 let typed_transaction = TypedTransaction::try_from(value)?;
119 let pending_transaction = PendingTransaction::new(typed_transaction)?;
120 Ok(Self {
121 pending_transaction,
122 requires: vec![],
123 provides: vec![],
124 priority: TransactionPriority(0),
125 })
126 }
127}
128#[derive(Clone, Debug, Default)]
132pub struct PendingTransactions {
133 required_markers: HashMap<TxMarker, HashSet<TxHash>>,
135 waiting_markers: HashMap<Vec<TxMarker>, TxHash>,
137 waiting_queue: HashMap<TxHash, PendingPoolTransaction>,
139}
140
141impl PendingTransactions {
144 pub fn len(&self) -> usize {
146 self.waiting_queue.len()
147 }
148
149 pub fn is_empty(&self) -> bool {
150 self.waiting_queue.is_empty()
151 }
152
153 pub fn clear(&mut self) {
155 self.required_markers.clear();
156 self.waiting_markers.clear();
157 self.waiting_queue.clear();
158 }
159
160 pub fn transactions(&self) -> impl Iterator<Item = Arc<PoolTransaction>> + '_ {
162 self.waiting_queue.values().map(|tx| tx.transaction.clone())
163 }
164
165 pub fn add_transaction(&mut self, tx: PendingPoolTransaction) -> Result<(), PoolError> {
167 assert!(!tx.is_ready(), "transaction must not be ready");
168 assert!(
169 !self.waiting_queue.contains_key(&tx.transaction.hash()),
170 "transaction is already added"
171 );
172
173 if let Some(replace) = self
174 .waiting_markers
175 .get(&tx.transaction.provides)
176 .and_then(|hash| self.waiting_queue.get(hash))
177 {
178 if tx.transaction.gas_price() < replace.transaction.gas_price() {
180 warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]", tx.transaction.hash());
181 return Err(PoolError::ReplacementUnderpriced(Box::new(
182 tx.transaction.as_ref().clone(),
183 )))
184 }
185 }
186
187 for marker in &tx.missing_markers {
189 self.required_markers.entry(marker.clone()).or_default().insert(tx.transaction.hash());
190 }
191
192 self.waiting_markers.insert(tx.transaction.provides.clone(), tx.transaction.hash());
194 self.waiting_queue.insert(tx.transaction.hash(), tx);
196
197 Ok(())
198 }
199
200 pub fn contains(&self, hash: &TxHash) -> bool {
202 self.waiting_queue.contains_key(hash)
203 }
204
205 pub fn get(&self, hash: &TxHash) -> Option<&PendingPoolTransaction> {
207 self.waiting_queue.get(hash)
208 }
209
210 pub fn mark_and_unlock(
215 &mut self,
216 markers: impl IntoIterator<Item = impl AsRef<TxMarker>>,
217 ) -> Vec<PendingPoolTransaction> {
218 let mut unlocked_ready = Vec::new();
219 for mark in markers {
220 let mark = mark.as_ref();
221 if let Some(tx_hashes) = self.required_markers.remove(mark) {
222 for hash in tx_hashes {
223 let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
224 tx.mark(mark);
225
226 if tx.is_ready() {
227 let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
228 self.waiting_markers.remove(&tx.transaction.provides);
229
230 unlocked_ready.push(tx);
231 }
232 }
233 }
234 }
235
236 unlocked_ready
237 }
238
239 pub fn remove(&mut self, hashes: Vec<TxHash>) -> Vec<Arc<PoolTransaction>> {
243 let mut removed = vec![];
244 for hash in hashes {
245 if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
246 self.waiting_markers.remove(&waiting_tx.transaction.provides);
247 for marker in waiting_tx.missing_markers {
248 let remove = if let Some(required) = self.required_markers.get_mut(&marker) {
249 required.remove(&hash);
250 required.is_empty()
251 } else {
252 false
253 };
254 if remove {
255 self.required_markers.remove(&marker);
256 }
257 }
258 removed.push(waiting_tx.transaction)
259 }
260 }
261 removed
262 }
263}
264
265#[derive(Clone)]
267pub struct PendingPoolTransaction {
268 pub transaction: Arc<PoolTransaction>,
269 pub missing_markers: HashSet<TxMarker>,
271 pub added_at: Instant,
273}
274
275impl PendingPoolTransaction {
278 pub fn new(transaction: PoolTransaction, provided: &HashMap<TxMarker, TxHash>) -> Self {
283 let missing_markers = transaction
284 .requires
285 .iter()
286 .filter(|marker| {
287 !provided.contains_key(&**marker)
289 })
290 .cloned()
291 .collect();
292
293 Self { transaction: Arc::new(transaction), missing_markers, added_at: Instant::now() }
294 }
295
296 pub fn mark(&mut self, marker: &TxMarker) {
298 self.missing_markers.remove(marker);
299 }
300
301 pub fn is_ready(&self) -> bool {
303 self.missing_markers.is_empty()
304 }
305}
306
307impl fmt::Debug for PendingPoolTransaction {
308 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
309 write!(fmt, "PendingTransaction {{ ")?;
310 write!(fmt, "added_at: {:?}, ", self.added_at)?;
311 write!(fmt, "tx: {:?}, ", self.transaction)?;
312 write!(fmt, "missing_markers: {{{}}}", hex_fmt_many(self.missing_markers.iter()))?;
313 write!(fmt, "}}")
314 }
315}
316
317pub struct TransactionsIterator {
318 all: HashMap<TxHash, ReadyTransaction>,
319 awaiting: HashMap<TxHash, (usize, PoolTransactionRef)>,
320 independent: BTreeSet<PoolTransactionRef>,
321 _invalid: HashSet<TxHash>,
322}
323
324impl TransactionsIterator {
327 fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef) {
330 if satisfied >= tx_ref.transaction.requires.len() {
331 self.independent.insert(tx_ref);
333 } else {
334 self.awaiting.insert(tx_ref.transaction.hash(), (satisfied, tx_ref));
336 }
337 }
338}
339
340impl Iterator for TransactionsIterator {
341 type Item = Arc<PoolTransaction>;
342
343 fn next(&mut self) -> Option<Self::Item> {
344 loop {
345 let best = self.independent.iter().next_back()?.clone();
346 let best = self.independent.take(&best)?;
347 let hash = best.transaction.hash();
348
349 let ready =
350 if let Some(ready) = self.all.get(&hash).cloned() { ready } else { continue };
351
352 for hash in &ready.unlocks {
354 let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
356 satisfied += 1;
357 Some((satisfied, tx_ref))
358 } else {
360 self.all
361 .get(hash)
362 .map(|next| (next.requires_offset + 1, next.transaction.clone()))
363 };
364 if let Some((satisfied, tx_ref)) = res {
365 self.independent_or_awaiting(satisfied, tx_ref)
366 }
367 }
368
369 return Some(best.transaction)
370 }
371 }
372}
373
374#[derive(Clone, Debug, Default)]
376pub struct ReadyTransactions {
377 id: u64,
381 provided_markers: HashMap<TxMarker, TxHash>,
383 ready_tx: Arc<RwLock<HashMap<TxHash, ReadyTransaction>>>,
385 independent_transactions: BTreeSet<PoolTransactionRef>,
388}
389
390impl ReadyTransactions {
393 pub fn get_transactions(&self) -> TransactionsIterator {
395 TransactionsIterator {
396 all: self.ready_tx.read().clone(),
397 independent: self.independent_transactions.clone(),
398 awaiting: Default::default(),
399 _invalid: Default::default(),
400 }
401 }
402
403 pub fn clear(&mut self) {
405 self.provided_markers.clear();
406 self.ready_tx.write().clear();
407 self.independent_transactions.clear();
408 }
409
410 pub fn contains(&self, hash: &TxHash) -> bool {
412 self.ready_tx.read().contains_key(hash)
413 }
414
415 pub fn get(&self, hash: &TxHash) -> Option<ReadyTransaction> {
417 self.ready_tx.read().get(hash).cloned()
418 }
419
420 pub fn provided_markers(&self) -> &HashMap<TxMarker, TxHash> {
421 &self.provided_markers
422 }
423
424 fn next_id(&mut self) -> u64 {
425 let id = self.id;
426 self.id = self.id.wrapping_add(1);
427 id
428 }
429
430 pub fn add_transaction(
437 &mut self,
438 tx: PendingPoolTransaction,
439 ) -> Result<Vec<Arc<PoolTransaction>>, PoolError> {
440 assert!(tx.is_ready(), "transaction must be ready",);
441 assert!(
442 !self.ready_tx.read().contains_key(&tx.transaction.hash()),
443 "transaction already included"
444 );
445
446 let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
447
448 let id = self.next_id();
449 let hash = tx.transaction.hash();
450
451 let mut independent = true;
452 let mut requires_offset = 0;
453 let mut ready = self.ready_tx.write();
454 for mark in &tx.transaction.requires {
456 if let Some(other) = self.provided_markers.get(mark) {
458 let tx = ready.get_mut(other).expect("hash included;");
459 tx.unlocks.push(hash);
460 independent = false;
462 } else {
463 requires_offset += 1;
464 }
465 }
466
467 for mark in tx.transaction.provides.iter().cloned() {
469 self.provided_markers.insert(mark, hash);
470 }
471
472 let transaction = PoolTransactionRef { id, transaction: tx.transaction };
473
474 if independent {
476 self.independent_transactions.insert(transaction.clone());
477 }
478
479 ready.insert(hash, ReadyTransaction { transaction, unlocks, requires_offset });
481
482 Ok(replaced_tx)
483 }
484
485 fn replaced_transactions(
487 &mut self,
488 tx: &PoolTransaction,
489 ) -> Result<(Vec<Arc<PoolTransaction>>, Vec<TxHash>), PoolError> {
490 let remove_hashes: HashSet<_> =
492 tx.provides.iter().filter_map(|mark| self.provided_markers.get(mark)).collect();
493
494 if remove_hashes.is_empty() {
496 return Ok((Vec::new(), Vec::new()))
497 }
498
499 let mut unlocked_tx = Vec::new();
502 {
503 let ready = self.ready_tx.read();
506 for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(*hash)) {
507 if to_remove.provides() == tx.provides {
510 if tx.pending_transaction.transaction.gas_price() <= to_remove.gas_price() {
512 warn!(target: "txpool", "ready replacement transaction underpriced [{:?}]", tx.hash());
513 return Err(PoolError::ReplacementUnderpriced(Box::new(tx.clone())))
514 } else {
515 trace!(target: "txpool", "replacing ready transaction [{:?}] with higher gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
516 }
517 }
518
519 unlocked_tx.extend(to_remove.unlocks.iter().cloned())
520 }
521 }
522
523 let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
524
525 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
526 let removed_tx = self.remove_with_markers(remove_hashes, Some(new_provides));
527
528 Ok((removed_tx, unlocked_tx))
529 }
530
531 pub fn clear_transactions(&mut self, tx_hashes: &[TxHash]) -> Vec<Arc<PoolTransaction>> {
534 self.remove_with_markers(tx_hashes.to_vec(), None)
535 }
536
537 pub fn prune_tags(&mut self, marker: TxMarker) -> Vec<Arc<PoolTransaction>> {
542 let mut removed_tx = vec![];
543
544 let mut remove = vec![marker];
546
547 while let Some(marker) = remove.pop() {
548 let res = self
549 .provided_markers
550 .remove(&marker)
551 .and_then(|hash| self.ready_tx.write().remove(&hash));
552
553 if let Some(tx) = res {
554 let unlocks = tx.unlocks;
555 self.independent_transactions.remove(&tx.transaction);
556 let tx = tx.transaction.transaction;
557
558 {
560 let hash = tx.hash();
561 let mut ready = self.ready_tx.write();
562
563 let mut previous_markers = |marker| -> Option<Vec<TxMarker>> {
564 let prev_hash = self.provided_markers.get(marker)?;
565 let tx2 = ready.get_mut(prev_hash)?;
566 if let Some(idx) = tx2.unlocks.iter().position(|i| i == &hash) {
568 tx2.unlocks.swap_remove(idx);
569 }
570 if tx2.unlocks.is_empty() {
571 Some(tx2.transaction.transaction.provides.clone())
572 } else {
573 None
574 }
575 };
576
577 for marker in &tx.requires {
579 if let Some(mut tags_to_remove) = previous_markers(marker) {
580 remove.append(&mut tags_to_remove);
581 }
582 }
583 }
584
585 for hash in unlocks {
587 if let Some(tx) = self.ready_tx.write().get_mut(&hash) {
588 tx.requires_offset += 1;
589 if tx.requires_offset == tx.transaction.transaction.requires.len() {
590 self.independent_transactions.insert(tx.transaction.clone());
591 }
592 }
593 }
594 let current_marker = ▮
596 for marker in &tx.provides {
597 let removed = self.provided_markers.remove(marker);
598 assert_eq!(
599 removed,
600 if current_marker == marker { None } else { Some(tx.hash()) },
601 "The pool contains exactly one transaction providing given tag; the removed transaction
602 claims to provide that tag, so it has to be mapped to it's hash; qed"
603 );
604 }
605 removed_tx.push(tx);
606 }
607 }
608
609 removed_tx
610 }
611
612 pub fn remove_with_markers(
615 &mut self,
616 mut tx_hashes: Vec<TxHash>,
617 marker_filter: Option<HashSet<TxMarker>>,
618 ) -> Vec<Arc<PoolTransaction>> {
619 let mut removed = Vec::new();
620 let mut ready = self.ready_tx.write();
621
622 while let Some(hash) = tx_hashes.pop() {
623 if let Some(mut tx) = ready.remove(&hash) {
624 let invalidated = tx.transaction.transaction.provides.iter().filter(|mark| {
625 marker_filter.as_ref().map(|filter| !filter.contains(&**mark)).unwrap_or(true)
626 });
627
628 let mut removed_some_marks = false;
629 for mark in invalidated {
631 removed_some_marks = true;
632 self.provided_markers.remove(mark);
633 }
634
635 for mark in &tx.transaction.transaction.requires {
637 if let Some(hash) = self.provided_markers.get(mark) {
638 if let Some(tx) = ready.get_mut(hash) {
639 if let Some(idx) = tx.unlocks.iter().position(|i| i == hash) {
640 tx.unlocks.swap_remove(idx);
641 }
642 }
643 }
644 }
645
646 self.independent_transactions.remove(&tx.transaction);
648
649 if removed_some_marks {
650 tx_hashes.append(&mut tx.unlocks);
652 }
653
654 removed.push(tx.transaction.transaction);
656 }
657 }
658
659 removed
660 }
661}
662
663#[derive(Clone, Debug)]
665pub struct PoolTransactionRef {
666 pub transaction: Arc<PoolTransaction>,
668 pub id: u64,
670}
671
672impl Eq for PoolTransactionRef {}
673
674impl PartialEq<Self> for PoolTransactionRef {
675 fn eq(&self, other: &Self) -> bool {
676 self.cmp(other) == Ordering::Equal
677 }
678}
679
680impl PartialOrd<Self> for PoolTransactionRef {
681 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
682 Some(self.cmp(other))
683 }
684}
685
686impl Ord for PoolTransactionRef {
687 fn cmp(&self, other: &Self) -> Ordering {
688 self.transaction
689 .priority
690 .cmp(&other.transaction.priority)
691 .then_with(|| other.id.cmp(&self.id))
692 }
693}
694
695#[derive(Clone, Debug)]
696pub struct ReadyTransaction {
697 pub transaction: PoolTransactionRef,
699 pub unlocks: Vec<TxHash>,
701 pub requires_offset: usize,
703}
704
705impl ReadyTransaction {
706 pub fn provides(&self) -> &[TxMarker] {
707 &self.transaction.transaction.provides
708 }
709
710 pub fn gas_price(&self) -> u128 {
711 self.transaction.transaction.gas_price()
712 }
713}
714
715#[cfg(test)]
716mod tests {
717 use super::*;
718
719 #[test]
720 fn can_id_txs() {
721 let addr = Address::random();
722 assert_eq!(to_marker(1, addr), to_marker(1, addr));
723 assert_ne!(to_marker(2, addr), to_marker(1, addr));
724 }
725}