1use crate::{
4 NodeResult,
5 eth::{
6 backend::validate::TransactionValidator,
7 fees::FeeHistoryService,
8 miner::Miner,
9 pool::{Pool, transactions::PoolTransaction},
10 },
11 filter::Filters,
12 mem::{Backend, storage::MinedBlockOutcome},
13};
14use alloy_network::Network;
15use foundry_primitives::{FoundryReceiptEnvelope, FoundryTxEnvelope};
16use futures::{FutureExt, Stream, StreamExt};
17use std::{
18 collections::VecDeque,
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22};
23use tokio::{task::JoinHandle, time::Interval};
24
/// Long-running service that ties the transaction pool, the miner and the block producer
/// together.
///
/// When polled as a future it hands the transaction batches selected by the miner to the block
/// producer, reports every mined block back to the pool, polls the fee-history service and
/// periodically spawns filter eviction.
pub struct NodeService<N: Network> {
    /// Transaction pool; handed to the miner on each poll and notified of every mined block.
    pool: Arc<Pool<N::TxEnvelope>>,
    /// Mines the queued transaction batches.
    block_producer: BlockProducer<N>,
    /// Yields the batch of transactions that should go into the next block.
    miner: Miner<N::TxEnvelope>,
    /// Fee-history future; polled on every service poll, its output is discarded.
    fee_history: FeeHistoryService,
    /// Filter registry; `evict()` is invoked on a clone whenever the eviction interval ticks.
    filters: Filters<N>,
    /// Timer that decides when filter eviction is spawned; its period is `filters.keep_alive()`.
    filter_eviction_interval: Interval,
}
45
46impl<N: Network> NodeService<N>
47where
48 Backend<N>: TransactionValidator<N::TxEnvelope>,
49 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
50{
51 pub fn new(
52 pool: Arc<Pool<N::TxEnvelope>>,
53 backend: Arc<Backend<N>>,
54 miner: Miner<N::TxEnvelope>,
55 fee_history: FeeHistoryService,
56 filters: Filters<N>,
57 ) -> Self {
58 let start = tokio::time::Instant::now() + filters.keep_alive();
59 let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
60 Self {
61 pool,
62 block_producer: BlockProducer::new(backend),
63 miner,
64 fee_history,
65 filter_eviction_interval,
66 filters,
67 }
68 }
69}
70
71impl<N: Network> Future for NodeService<N>
72where
73 Backend<N>: TransactionValidator<N::TxEnvelope>,
74 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
75{
76 type Output = NodeResult<()>;
77
78 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 let pin = self.get_mut();
80
81 loop {
84 while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
86 trace!(target: "node", "mined block {}", outcome.block_number);
87 pin.pool.on_mined_block(outcome);
89 }
90
91 if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
92 pin.block_producer.queued.push_back(transactions);
94 } else {
95 break;
97 }
98 }
99
100 let _ = pin.fee_history.poll_unpin(cx);
102
103 if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
104 let filters = pin.filters.clone();
105
106 tokio::task::spawn(async move { filters.evict().await });
108 }
109
110 Poll::Pending
111 }
112}
113
/// What a finished mining task hands back: the outcome of the mined block together with the
/// backend that produced it.
type MiningResult<N> = (MinedBlockOutcome<<N as Network>::TxEnvelope>, Arc<Backend<N>>);
115
/// Turns queued batches of pool transactions into mined blocks.
///
/// Polled as a stream, it yields one [`MinedBlockOutcome`] per mined batch.
#[must_use = "streams do nothing unless polled"]
struct BlockProducer<N: Network> {
    /// The backend while no mining task is running; `None` while a task holds it.
    idle_backend: Option<Arc<Backend<N>>>,
    /// Handle of the mining task currently in flight, if any.
    block_mining: Option<JoinHandle<MiningResult<N>>>,
    /// Transaction batches waiting to be mined, oldest at the front.
    queued: VecDeque<Vec<Arc<PoolTransaction<N::TxEnvelope>>>>,
}
126
127impl<N: Network> BlockProducer<N>
128where
129 Backend<N>: TransactionValidator<N::TxEnvelope>,
130 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
131{
132 fn new(backend: Arc<Backend<N>>) -> Self {
133 Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
134 }
135}
136
137impl<N: Network> Stream for BlockProducer<N>
138where
139 Backend<N>: TransactionValidator<N::TxEnvelope> + Send + Sync + 'static,
140 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope> + 'static,
141{
142 type Item = MinedBlockOutcome<N::TxEnvelope>;
143
144 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145 let pin = self.get_mut();
146
147 if !pin.queued.is_empty() {
148 if let Some(backend) = pin.idle_backend.take() {
150 let transactions = pin.queued.pop_front().expect("not empty; qed");
151
152 let handle = tokio::runtime::Handle::current();
155 let mining = tokio::task::spawn_blocking(move || {
156 handle.block_on(async move {
157 trace!(target: "miner", "creating new block");
158 let block = backend.mine_block(transactions).await;
159 trace!(target: "miner", "created new block: {}", block.block_number);
160 (block, backend)
161 })
162 });
163 pin.block_mining = Some(mining);
164 }
165 }
166
167 if let Some(mut mining) = pin.block_mining.take() {
168 if let Poll::Ready(res) = mining.poll_unpin(cx) {
169 return match res {
170 Ok((outcome, backend)) => {
171 pin.idle_backend = Some(backend);
172 Poll::Ready(Some(outcome))
173 }
174 Err(err) => {
175 panic!("miner task failed: {err}");
176 }
177 };
178 } else {
179 pin.block_mining = Some(mining)
180 }
181 }
182
183 Poll::Pending
184 }
185}