1use crate::eth::pool::{Pool, transactions::PoolTransaction};
4use alloy_primitives::TxHash;
5use futures::{
6 channel::mpsc::Receiver,
7 stream::{Fuse, StreamExt},
8 task::AtomicWaker,
9};
10use parking_lot::{RawRwLock, RwLock, lock_api::RwLockWriteGuard};
11use std::{
12 fmt,
13 pin::Pin,
14 sync::Arc,
15 task::{Context, Poll},
16 time::Duration,
17};
18use tokio::time::{Interval, MissedTickBehavior, Sleep};
19
/// Length of the delay the instant miner arms after it observes a new ready-transaction
/// notification, so that notifications arriving within the window end up in the same batch.
const INSTANT_COALESCE_WINDOW: Duration = Duration::from_millis(5);
23
24pub struct Miner<T> {
25 mode: Arc<RwLock<MiningMode>>,
27 inner: Arc<MinerInner>,
31 force_transactions: Option<Vec<Arc<PoolTransaction<T>>>>,
34}
35
36impl<T> Clone for Miner<T> {
37 fn clone(&self) -> Self {
38 Self {
39 mode: self.mode.clone(),
40 inner: self.inner.clone(),
41 force_transactions: self.force_transactions.clone(),
42 }
43 }
44}
45
46impl<T> fmt::Debug for Miner<T> {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 f.debug_struct("Miner")
49 .field("mode", &self.mode)
50 .field("force_transactions", &self.force_transactions.as_ref().map(|txs| txs.len()))
51 .finish_non_exhaustive()
52 }
53}
54
55impl<T> Miner<T> {
56 pub fn new(mode: MiningMode) -> Self {
58 Self {
59 mode: Arc::new(RwLock::new(mode)),
60 inner: Default::default(),
61 force_transactions: None,
62 }
63 }
64
65 pub fn with_forced_transactions(
70 mut self,
71 force_transactions: Option<Vec<PoolTransaction<T>>>,
72 ) -> Self {
73 self.force_transactions =
74 force_transactions.map(|tx| tx.into_iter().map(Arc::new).collect());
75 self
76 }
77
78 pub fn mode_write(&self) -> RwLockWriteGuard<'_, RawRwLock, MiningMode> {
80 self.mode.write()
81 }
82
83 pub fn is_auto_mine(&self) -> bool {
85 let mode = self.mode.read();
86 matches!(*mode, MiningMode::Auto(_))
87 }
88
89 pub fn get_interval(&self) -> Option<u64> {
90 let mode = self.mode.read();
91 if let MiningMode::FixedBlockTime(ref mm) = *mode {
92 return Some(mm.interval.period().as_secs());
93 }
94 None
95 }
96
97 pub fn set_mining_mode(&self, mode: MiningMode) {
99 let new_mode = format!("{mode:?}");
100 let mode = std::mem::replace(&mut *self.mode_write(), mode);
101 trace!(target: "miner", "updated mining mode from {:?} to {}", mode, new_mode);
102 self.inner.wake();
103 }
104
105 pub fn poll(
110 &mut self,
111 pool: &Arc<Pool<T>>,
112 cx: &mut Context<'_>,
113 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
114 self.inner.register(cx);
115 if let Some(mut transactions) = self.force_transactions.take() {
116 if let Poll::Ready(next) = self.mode.write().poll(pool, cx) {
117 transactions.extend(next);
118 }
119 return Poll::Ready(transactions);
120 }
121 self.mode.write().poll(pool, cx)
122 }
123}
124
125#[derive(Debug)]
127pub struct MinerInner {
128 waker: AtomicWaker,
129}
130
131impl MinerInner {
132 fn wake(&self) {
134 self.waker.wake();
135 }
136
137 fn register(&self, cx: &Context<'_>) {
138 self.waker.register(cx.waker());
139 }
140}
141
142impl Default for MinerInner {
143 fn default() -> Self {
144 Self { waker: AtomicWaker::new() }
145 }
146}
147
148#[derive(Debug)]
150pub enum MiningMode {
151 None,
153 Auto(ReadyTransactionMiner),
158 FixedBlockTime(FixedBlockTimeMiner),
160
161 Mixed(ReadyTransactionMiner, FixedBlockTimeMiner),
163}
164
165impl MiningMode {
166 pub fn instant(max_transactions: usize, listener: Receiver<TxHash>) -> Self {
167 Self::Auto(ReadyTransactionMiner {
168 max_transactions,
169 has_pending_txs: None,
170 rx: listener.fuse(),
171 coalesce: None,
172 })
173 }
174
175 pub fn interval(duration: Duration) -> Self {
176 Self::FixedBlockTime(FixedBlockTimeMiner::new(duration))
177 }
178
179 pub fn mixed(max_transactions: usize, listener: Receiver<TxHash>, duration: Duration) -> Self {
180 Self::Mixed(
181 ReadyTransactionMiner {
182 max_transactions,
183 has_pending_txs: None,
184 rx: listener.fuse(),
185 coalesce: None,
186 },
187 FixedBlockTimeMiner::new(duration),
188 )
189 }
190
191 pub fn poll<T>(
193 &mut self,
194 pool: &Arc<Pool<T>>,
195 cx: &mut Context<'_>,
196 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
197 match self {
198 Self::None => Poll::Pending,
199 Self::Auto(miner) => miner.poll(pool, cx),
200 Self::FixedBlockTime(miner) => miner.poll(pool, cx),
201 Self::Mixed(auto, fixed) => {
202 let auto_txs = auto.poll(pool, cx);
203 let fixed_txs = fixed.poll(pool, cx);
204
205 match (auto_txs, fixed_txs) {
206 (Poll::Ready(mut auto_txs), Poll::Ready(fixed_txs)) => {
208 for tx in fixed_txs {
209 if auto_txs.iter().any(|auto_tx| auto_tx.hash() == tx.hash()) {
211 continue;
212 }
213 auto_txs.push(tx);
214 }
215 Poll::Ready(auto_txs)
216 }
217 (Poll::Ready(auto_txs), Poll::Pending) => Poll::Ready(auto_txs),
219 (Poll::Pending, fixed_txs) => fixed_txs,
222 }
223 }
224 }
225 }
226}
227
228#[derive(Debug)]
233pub struct FixedBlockTimeMiner {
234 interval: Interval,
236}
237
238impl FixedBlockTimeMiner {
239 pub fn new(duration: Duration) -> Self {
241 let start = tokio::time::Instant::now() + duration;
242 let mut interval = tokio::time::interval_at(start, duration);
243 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
246 Self { interval }
247 }
248
249 fn poll<T>(
250 &mut self,
251 pool: &Arc<Pool<T>>,
252 cx: &mut Context<'_>,
253 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
254 if self.interval.poll_tick(cx).is_ready() {
255 return Poll::Ready(pool.ready_transactions().collect());
257 }
258 Poll::Pending
259 }
260}
261
262impl Default for FixedBlockTimeMiner {
263 fn default() -> Self {
264 Self::new(Duration::from_secs(6))
265 }
266}
267
268pub struct ReadyTransactionMiner {
270 max_transactions: usize,
272 has_pending_txs: Option<bool>,
274 rx: Fuse<Receiver<TxHash>>,
276 coalesce: Option<Pin<Box<Sleep>>>,
278}
279
280impl ReadyTransactionMiner {
281 fn poll<T>(
282 &mut self,
283 pool: &Arc<Pool<T>>,
284 cx: &mut Context<'_>,
285 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
286 let mut saw_new_ready = false;
288 while let Poll::Ready(Some(_hash)) = self.rx.poll_next_unpin(cx) {
289 saw_new_ready = true;
290 }
291
292 if saw_new_ready {
295 self.has_pending_txs = Some(true);
296 if self.coalesce.is_none() {
297 self.coalesce = Some(Box::pin(tokio::time::sleep(INSTANT_COALESCE_WINDOW)));
298 }
299 }
300
301 if self.has_pending_txs == Some(false) {
302 return Poll::Pending;
303 }
304
305 if let Some(sleep) = self.coalesce.as_mut()
306 && sleep.as_mut().poll(cx).is_pending()
307 {
308 return Poll::Pending;
309 }
310 self.coalesce = None;
311
312 let transactions =
313 pool.ready_transactions().take(self.max_transactions).collect::<Vec<_>>();
314
315 self.has_pending_txs = Some(transactions.len() >= self.max_transactions);
317
318 if transactions.is_empty() {
319 self.has_pending_txs = Some(false);
320 return Poll::Pending;
321 }
322
323 Poll::Ready(transactions)
324 }
325}
326
327impl fmt::Debug for ReadyTransactionMiner {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329 f.debug_struct("ReadyTransactionMiner")
330 .field("max_transactions", &self.max_transactions)
331 .finish_non_exhaustive()
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{Address, hex};
    use alloy_rlp::Decodable;
    use anvil_core::eth::transaction::PendingTransaction;
    use foundry_primitives::FoundryTxEnvelope;
    use futures::task::noop_waker;

    /// Builds a pool transaction from a fixed raw transaction and an impersonated sender.
    fn forced_tx() -> PoolTransaction<FoundryTxEnvelope> {
        let raw = hex::decode("f86b02843b9aca00830186a094d3e8763675e4c425df46cc3b5c0f6cbdac39604687038d7ea4c68000802ba00eb96ca19e8a77102767a41fc85a36afd5c61ccb09911cec5d3e86e193d9c5aea03a456401896b1b6055311536bf00a718568c744d8c1f9df59879e8350220ca18").unwrap();
        let envelope = FoundryTxEnvelope::decode(&mut &raw[..]).unwrap();
        let sender: Address = "0x95222290DD7278Aa3Ddd389Cc1E1d165CC4BAfe5".parse().unwrap();
        PoolTransaction::new(PendingTransaction::with_impersonated(envelope, sender))
    }

    /// Forced transactions must come out of the first poll even though `MiningMode::None`
    /// never becomes ready, and must not be returned a second time.
    #[test]
    fn poll_consumes_forced_transactions_before_mode_is_ready() {
        let forced = forced_tx();
        let forced_hash = forced.hash();

        let pool = Arc::new(Pool::default());
        let mut miner = Miner::new(MiningMode::None).with_forced_transactions(Some(vec![forced]));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let Poll::Ready(txs) = miner.poll(&pool, &mut cx) else {
            panic!("expected forced transactions to be returned immediately");
        };
        assert_eq!(txs.len(), 1);
        assert_eq!(txs[0].hash(), forced_hash);

        // The forced transactions are gone, so only the always-pending mode is left.
        assert!(miner.poll(&pool, &mut cx).is_pending());
    }
}