1use crate::eth::pool::{Pool, transactions::PoolTransaction};
4use alloy_primitives::TxHash;
5use futures::{
6 channel::mpsc::Receiver,
7 stream::{Fuse, StreamExt},
8 task::AtomicWaker,
9};
10use parking_lot::{RawRwLock, RwLock, lock_api::RwLockWriteGuard};
11use std::{
12 fmt,
13 sync::Arc,
14 task::{Context, Poll, ready},
15 time::Duration,
16};
17use tokio::time::{Interval, MissedTickBehavior};
18
19pub struct Miner<T> {
20 mode: Arc<RwLock<MiningMode>>,
22 inner: Arc<MinerInner>,
26 force_transactions: Option<Vec<Arc<PoolTransaction<T>>>>,
29}
30
31impl<T> Clone for Miner<T> {
32 fn clone(&self) -> Self {
33 Self {
34 mode: self.mode.clone(),
35 inner: self.inner.clone(),
36 force_transactions: self.force_transactions.clone(),
37 }
38 }
39}
40
41impl<T> fmt::Debug for Miner<T> {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 f.debug_struct("Miner")
44 .field("mode", &self.mode)
45 .field("force_transactions", &self.force_transactions.as_ref().map(|txs| txs.len()))
46 .finish_non_exhaustive()
47 }
48}
49
50impl<T> Miner<T> {
51 pub fn new(mode: MiningMode) -> Self {
53 Self {
54 mode: Arc::new(RwLock::new(mode)),
55 inner: Default::default(),
56 force_transactions: None,
57 }
58 }
59
60 pub fn with_forced_transactions(
65 mut self,
66 force_transactions: Option<Vec<PoolTransaction<T>>>,
67 ) -> Self {
68 self.force_transactions =
69 force_transactions.map(|tx| tx.into_iter().map(Arc::new).collect());
70 self
71 }
72
73 pub fn mode_write(&self) -> RwLockWriteGuard<'_, RawRwLock, MiningMode> {
75 self.mode.write()
76 }
77
78 pub fn is_auto_mine(&self) -> bool {
80 let mode = self.mode.read();
81 matches!(*mode, MiningMode::Auto(_))
82 }
83
84 pub fn get_interval(&self) -> Option<u64> {
85 let mode = self.mode.read();
86 if let MiningMode::FixedBlockTime(ref mm) = *mode {
87 return Some(mm.interval.period().as_secs());
88 }
89 None
90 }
91
92 pub fn set_mining_mode(&self, mode: MiningMode) {
94 let new_mode = format!("{mode:?}");
95 let mode = std::mem::replace(&mut *self.mode_write(), mode);
96 trace!(target: "miner", "updated mining mode from {:?} to {}", mode, new_mode);
97 self.inner.wake();
98 }
99
100 pub fn poll(
105 &mut self,
106 pool: &Arc<Pool<T>>,
107 cx: &mut Context<'_>,
108 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
109 self.inner.register(cx);
110 let next = ready!(self.mode.write().poll(pool, cx));
111 if let Some(mut transactions) = self.force_transactions.take() {
112 transactions.extend(next);
113 Poll::Ready(transactions)
114 } else {
115 Poll::Ready(next)
116 }
117 }
118}
119
120#[derive(Debug)]
122pub struct MinerInner {
123 waker: AtomicWaker,
124}
125
126impl MinerInner {
127 fn wake(&self) {
129 self.waker.wake();
130 }
131
132 fn register(&self, cx: &Context<'_>) {
133 self.waker.register(cx.waker());
134 }
135}
136
137impl Default for MinerInner {
138 fn default() -> Self {
139 Self { waker: AtomicWaker::new() }
140 }
141}
142
143#[derive(Debug)]
145pub enum MiningMode {
146 None,
148 Auto(ReadyTransactionMiner),
153 FixedBlockTime(FixedBlockTimeMiner),
155
156 Mixed(ReadyTransactionMiner, FixedBlockTimeMiner),
158}
159
160impl MiningMode {
161 pub fn instant(max_transactions: usize, listener: Receiver<TxHash>) -> Self {
162 Self::Auto(ReadyTransactionMiner {
163 max_transactions,
164 has_pending_txs: None,
165 rx: listener.fuse(),
166 })
167 }
168
169 pub fn interval(duration: Duration) -> Self {
170 Self::FixedBlockTime(FixedBlockTimeMiner::new(duration))
171 }
172
173 pub fn mixed(max_transactions: usize, listener: Receiver<TxHash>, duration: Duration) -> Self {
174 Self::Mixed(
175 ReadyTransactionMiner { max_transactions, has_pending_txs: None, rx: listener.fuse() },
176 FixedBlockTimeMiner::new(duration),
177 )
178 }
179
180 pub fn poll<T>(
182 &mut self,
183 pool: &Arc<Pool<T>>,
184 cx: &mut Context<'_>,
185 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
186 match self {
187 Self::None => Poll::Pending,
188 Self::Auto(miner) => miner.poll(pool, cx),
189 Self::FixedBlockTime(miner) => miner.poll(pool, cx),
190 Self::Mixed(auto, fixed) => {
191 let auto_txs = auto.poll(pool, cx);
192 let fixed_txs = fixed.poll(pool, cx);
193
194 match (auto_txs, fixed_txs) {
195 (Poll::Ready(mut auto_txs), Poll::Ready(fixed_txs)) => {
197 for tx in fixed_txs {
198 if auto_txs.iter().any(|auto_tx| auto_tx.hash() == tx.hash()) {
200 continue;
201 }
202 auto_txs.push(tx);
203 }
204 Poll::Ready(auto_txs)
205 }
206 (Poll::Ready(auto_txs), Poll::Pending) => Poll::Ready(auto_txs),
208 (Poll::Pending, fixed_txs) => fixed_txs,
211 }
212 }
213 }
214 }
215}
216
217#[derive(Debug)]
222pub struct FixedBlockTimeMiner {
223 interval: Interval,
225}
226
227impl FixedBlockTimeMiner {
228 pub fn new(duration: Duration) -> Self {
230 let start = tokio::time::Instant::now() + duration;
231 let mut interval = tokio::time::interval_at(start, duration);
232 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
235 Self { interval }
236 }
237
238 fn poll<T>(
239 &mut self,
240 pool: &Arc<Pool<T>>,
241 cx: &mut Context<'_>,
242 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
243 if self.interval.poll_tick(cx).is_ready() {
244 return Poll::Ready(pool.ready_transactions().collect());
246 }
247 Poll::Pending
248 }
249}
250
251impl Default for FixedBlockTimeMiner {
252 fn default() -> Self {
253 Self::new(Duration::from_secs(6))
254 }
255}
256
257pub struct ReadyTransactionMiner {
259 max_transactions: usize,
261 has_pending_txs: Option<bool>,
263 rx: Fuse<Receiver<TxHash>>,
265}
266
267impl ReadyTransactionMiner {
268 fn poll<T>(
269 &mut self,
270 pool: &Arc<Pool<T>>,
271 cx: &mut Context<'_>,
272 ) -> Poll<Vec<Arc<PoolTransaction<T>>>> {
273 while let Poll::Ready(Some(_hash)) = self.rx.poll_next_unpin(cx) {
275 self.has_pending_txs = Some(true);
276 }
277
278 if self.has_pending_txs == Some(false) {
279 return Poll::Pending;
280 }
281
282 let transactions =
283 pool.ready_transactions().take(self.max_transactions).collect::<Vec<_>>();
284
285 self.has_pending_txs = Some(transactions.len() >= self.max_transactions);
287
288 if transactions.is_empty() {
289 return Poll::Pending;
290 }
291
292 Poll::Ready(transactions)
293 }
294}
295
296impl fmt::Debug for ReadyTransactionMiner {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 f.debug_struct("ReadyTransactionMiner")
299 .field("max_transactions", &self.max_transactions)
300 .finish_non_exhaustive()
301 }
302}