Skip to main content

anvil/eth/
miner.rs

1//! Mines transactions
2
3use crate::eth::pool::{Pool, transactions::PoolTransaction};
4use alloy_primitives::TxHash;
5use futures::{
6    channel::mpsc::Receiver,
7    stream::{Fuse, StreamExt},
8    task::AtomicWaker,
9};
10use parking_lot::{RawRwLock, RwLock, lock_api::RwLockWriteGuard};
11use std::{
12    fmt,
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16    time::Duration,
17};
18use tokio::time::{Interval, MissedTickBehavior, Sleep};
19
/// Window for grouping concurrently-submitted transactions into one instant-mined block.
/// Scoped to batch/in-process concurrency; not a guarantee for independent external clients.
///
/// `ReadyTransactionMiner::poll` starts a timer of this length when new ready-transaction
/// notifications arrive (and no timer is already running) and keeps returning `Poll::Pending`
/// until that timer has elapsed.
const INSTANT_COALESCE_WINDOW: Duration = Duration::from_millis(5);
23
/// Decides which pool transactions should go into the next block, according to a shared
/// [`MiningMode`].
///
/// The mode and the waker registration live behind [`Arc`]s, so clones of a `Miner` observe the
/// same mode and wake the same task.
pub struct Miner<T> {
    /// The mode this miner currently operates in
    mode: Arc<RwLock<MiningMode>>,
    /// used for task wake up when the mining mode was forcefully changed
    ///
    /// Every `poll` registers the polling task here so that `set_mining_mode` can wake it up
    /// manually after the mode was replaced.
    inner: Arc<MinerInner>,
    /// Transactions that the first `poll` call returns ahead of anything the mode yields.
    /// They are taken out on that call, so this happens only once.
    force_transactions: Option<Vec<Arc<PoolTransaction<T>>>>,
}
35
36impl<T> Clone for Miner<T> {
37    fn clone(&self) -> Self {
38        Self {
39            mode: self.mode.clone(),
40            inner: self.inner.clone(),
41            force_transactions: self.force_transactions.clone(),
42        }
43    }
44}
45
46impl<T> fmt::Debug for Miner<T> {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        f.debug_struct("Miner")
49            .field("mode", &self.mode)
50            .field("force_transactions", &self.force_transactions.as_ref().map(|txs| txs.len()))
51            .finish_non_exhaustive()
52    }
53}
54
55impl<T> Miner<T> {
56    /// Returns a new miner with that operates in the given `mode`.
57    pub fn new(mode: MiningMode) -> Self {
58        Self {
59            mode: Arc::new(RwLock::new(mode)),
60            inner: Default::default(),
61            force_transactions: None,
62        }
63    }
64
65    /// Provide transactions that will cause a block to be mined with transactions
66    /// as soon as the miner is polled.
67    /// Providing an empty list of transactions will cause the miner to mine an empty block assuming
68    /// there are not other transactions in the pool.
69    pub fn with_forced_transactions(
70        mut self,
71        force_transactions: Option<Vec<PoolTransaction<T>>>,
72    ) -> Self {
73        self.force_transactions =
74            force_transactions.map(|tx| tx.into_iter().map(Arc::new).collect());
75        self
76    }
77
78    /// Returns the write lock of the mining mode
79    pub fn mode_write(&self) -> RwLockWriteGuard<'_, RawRwLock, MiningMode> {
80        self.mode.write()
81    }
82
83    /// Returns `true` if auto mining is enabled
84    pub fn is_auto_mine(&self) -> bool {
85        let mode = self.mode.read();
86        matches!(*mode, MiningMode::Auto(_))
87    }
88
89    pub fn get_interval(&self) -> Option<u64> {
90        let mode = self.mode.read();
91        if let MiningMode::FixedBlockTime(ref mm) = *mode {
92            return Some(mm.interval.period().as_secs());
93        }
94        None
95    }
96
97    /// Sets the mining mode to operate in
98    pub fn set_mining_mode(&self, mode: MiningMode) {
99        let new_mode = format!("{mode:?}");
100        let mode = std::mem::replace(&mut *self.mode_write(), mode);
101        trace!(target: "miner", "updated mining mode from {:?} to {}", mode, new_mode);
102        self.inner.wake();
103    }
104
105    /// polls the [Pool] and returns those transactions that should be put in a block according to
106    /// the current mode.
107    ///
108    /// May return an empty list, if no transactions are ready.
109    pub fn poll(
110        &mut self,
111        pool: &Arc<Pool<T>>,
112        cx: &mut Context<'_>,
113    ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
114        self.inner.register(cx);
115        if let Some(mut transactions) = self.force_transactions.take() {
116            if let Poll::Ready(next) = self.mode.write().poll(pool, cx) {
117                transactions.extend(next);
118            }
119            return Poll::Ready(transactions);
120        }
121        self.mode.write().poll(pool, cx)
122    }
123}
124
125/// A Mining mode that does nothing
126#[derive(Debug)]
127pub struct MinerInner {
128    waker: AtomicWaker,
129}
130
131impl MinerInner {
132    /// Call the waker again
133    fn wake(&self) {
134        self.waker.wake();
135    }
136
137    fn register(&self, cx: &Context<'_>) {
138        self.waker.register(cx.waker());
139    }
140}
141
142impl Default for MinerInner {
143    fn default() -> Self {
144        Self { waker: AtomicWaker::new() }
145    }
146}
147
/// Mode of operations for the `Miner`
#[derive(Debug)]
pub enum MiningMode {
    /// A miner that does nothing: polling it always yields `Poll::Pending`
    None,
    /// A miner that listens for new transactions that are ready.
    ///
    /// Either one transaction will be mined per block, or any number of transactions will be
    /// allowed, depending on the `max_transactions` limit it was created with.
    Auto(ReadyTransactionMiner),
    /// A miner that constructs a new block every `interval` tick
    FixedBlockTime(FixedBlockTimeMiner),

    /// A miner that uses both Auto and FixedBlockTime.
    ///
    /// Both are polled on every call; a result is returned as soon as either one is ready.
    Mixed(ReadyTransactionMiner, FixedBlockTimeMiner),
}
164
165impl MiningMode {
166    pub fn instant(max_transactions: usize, listener: Receiver<TxHash>) -> Self {
167        Self::Auto(ReadyTransactionMiner {
168            max_transactions,
169            has_pending_txs: None,
170            rx: listener.fuse(),
171            coalesce: None,
172        })
173    }
174
175    pub fn interval(duration: Duration) -> Self {
176        Self::FixedBlockTime(FixedBlockTimeMiner::new(duration))
177    }
178
179    pub fn mixed(max_transactions: usize, listener: Receiver<TxHash>, duration: Duration) -> Self {
180        Self::Mixed(
181            ReadyTransactionMiner {
182                max_transactions,
183                has_pending_txs: None,
184                rx: listener.fuse(),
185                coalesce: None,
186            },
187            FixedBlockTimeMiner::new(duration),
188        )
189    }
190
191    /// polls the [Pool] and returns those transactions that should be put in a block, if any.
192    pub fn poll<T>(
193        &mut self,
194        pool: &Arc<Pool<T>>,
195        cx: &mut Context<'_>,
196    ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
197        match self {
198            Self::None => Poll::Pending,
199            Self::Auto(miner) => miner.poll(pool, cx),
200            Self::FixedBlockTime(miner) => miner.poll(pool, cx),
201            Self::Mixed(auto, fixed) => {
202                let auto_txs = auto.poll(pool, cx);
203                let fixed_txs = fixed.poll(pool, cx);
204
205                match (auto_txs, fixed_txs) {
206                    // Both auto and fixed transactions are ready, combine them
207                    (Poll::Ready(mut auto_txs), Poll::Ready(fixed_txs)) => {
208                        for tx in fixed_txs {
209                            // filter unique transactions
210                            if auto_txs.iter().any(|auto_tx| auto_tx.hash() == tx.hash()) {
211                                continue;
212                            }
213                            auto_txs.push(tx);
214                        }
215                        Poll::Ready(auto_txs)
216                    }
217                    // Only auto transactions are ready, return them
218                    (Poll::Ready(auto_txs), Poll::Pending) => Poll::Ready(auto_txs),
219                    // Only fixed transactions are ready or both are pending,
220                    // return fixed transactions or pending status
221                    (Poll::Pending, fixed_txs) => fixed_txs,
222                }
223            }
224        }
225    }
226}
227
/// A miner that's supposed to create a new block every `interval`, mining all transactions that are
/// ready at that time.
///
/// The default blocktime is set to 6 seconds
#[derive(Debug)]
pub struct FixedBlockTimeMiner {
    /// The interval this fixed block time miner operates with; every completed tick makes `poll`
    /// return all ready transactions of the pool
    interval: Interval,
}
237
238impl FixedBlockTimeMiner {
239    /// Creates a new instance with an interval of `duration`
240    pub fn new(duration: Duration) -> Self {
241        let start = tokio::time::Instant::now() + duration;
242        let mut interval = tokio::time::interval_at(start, duration);
243        // we use delay here, to ensure ticks are not shortened and to tick at multiples of interval
244        // from when tick was called rather than from start
245        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
246        Self { interval }
247    }
248
249    fn poll<T>(
250        &mut self,
251        pool: &Arc<Pool<T>>,
252        cx: &mut Context<'_>,
253    ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
254        if self.interval.poll_tick(cx).is_ready() {
255            // drain the pool
256            return Poll::Ready(pool.ready_transactions().collect());
257        }
258        Poll::Pending
259    }
260}
261
262impl Default for FixedBlockTimeMiner {
263    fn default() -> Self {
264        Self::new(Duration::from_secs(6))
265    }
266}
267
/// A miner that listens for new ready transactions
pub struct ReadyTransactionMiner {
    /// how many transactions to mine per block
    max_transactions: usize,
    /// stores whether there are pending transactions (if known)
    ///
    /// `None` until the first poll; `Some(true)` after a new notification arrived or after a
    /// poll returned a full batch of `max_transactions`; `Some(false)` once a poll found fewer
    /// ready transactions than that (including none at all).
    has_pending_txs: Option<bool>,
    /// Receives hashes of transactions that are ready
    rx: Fuse<Receiver<TxHash>>,
    /// Active [`INSTANT_COALESCE_WINDOW`] timer; while pending, ready txs are accumulated.
    coalesce: Option<Pin<Box<Sleep>>>,
}
279
280impl ReadyTransactionMiner {
281    fn poll<T>(
282        &mut self,
283        pool: &Arc<Pool<T>>,
284        cx: &mut Context<'_>,
285    ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
286        // always drain the notification stream so that we're woken up as soon as there's a new tx
287        let mut saw_new_ready = false;
288        while let Poll::Ready(Some(_hash)) = self.rx.poll_next_unpin(cx) {
289            saw_new_ready = true;
290        }
291
292        // Arm the coalescing window only on fresh notifications to avoid delaying
293        // consecutive chunks when draining a backlog larger than `max_transactions`.
294        if saw_new_ready {
295            self.has_pending_txs = Some(true);
296            if self.coalesce.is_none() {
297                self.coalesce = Some(Box::pin(tokio::time::sleep(INSTANT_COALESCE_WINDOW)));
298            }
299        }
300
301        if self.has_pending_txs == Some(false) {
302            return Poll::Pending;
303        }
304
305        if let Some(sleep) = self.coalesce.as_mut()
306            && sleep.as_mut().poll(cx).is_pending()
307        {
308            return Poll::Pending;
309        }
310        self.coalesce = None;
311
312        let transactions =
313            pool.ready_transactions().take(self.max_transactions).collect::<Vec<_>>();
314
315        // there are pending transactions if we didn't drain the pool
316        self.has_pending_txs = Some(transactions.len() >= self.max_transactions);
317
318        if transactions.is_empty() {
319            self.has_pending_txs = Some(false);
320            return Poll::Pending;
321        }
322
323        Poll::Ready(transactions)
324    }
325}
326
327impl fmt::Debug for ReadyTransactionMiner {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329        f.debug_struct("ReadyTransactionMiner")
330            .field("max_transactions", &self.max_transactions)
331            .finish_non_exhaustive()
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{Address, hex};
    use alloy_rlp::Decodable;
    use anvil_core::eth::transaction::PendingTransaction;
    use foundry_primitives::FoundryTxEnvelope;
    use futures::task::noop_waker;

    /// Decodes a fixed raw transaction and wraps it as an impersonated pool transaction.
    fn forced_tx() -> PoolTransaction<FoundryTxEnvelope> {
        let sender: Address = "0x95222290DD7278Aa3Ddd389Cc1E1d165CC4BAfe5".parse().unwrap();
        let raw = hex::decode("f86b02843b9aca00830186a094d3e8763675e4c425df46cc3b5c0f6cbdac39604687038d7ea4c68000802ba00eb96ca19e8a77102767a41fc85a36afd5c61ccb09911cec5d3e86e193d9c5aea03a456401896b1b6055311536bf00a718568c744d8c1f9df59879e8350220ca18").unwrap();
        let envelope = FoundryTxEnvelope::decode(&mut raw.as_slice()).unwrap();
        PoolTransaction::new(PendingTransaction::with_impersonated(envelope, sender))
    }

    /// Forced transactions must come out of the very first poll even though `MiningMode::None`
    /// is never ready, and must not be returned a second time.
    #[test]
    fn poll_consumes_forced_transactions_before_mode_is_ready() {
        let forced = forced_tx();
        let forced_hash = forced.hash();

        let mut miner = Miner::new(MiningMode::None).with_forced_transactions(Some(vec![forced]));
        let pool = Arc::new(Pool::default());

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let Poll::Ready(txs) = miner.poll(&pool, &mut cx) else {
            panic!("expected forced transactions to be returned immediately");
        };
        let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
        assert_eq!(hashes, vec![forced_hash]);

        // Forced transactions are consumed exactly once.
        assert!(miner.poll(&pool, &mut cx).is_pending());
    }
}