1use crate::{
4 NodeResult,
5 eth::{
6 backend::validate::TransactionValidator,
7 fees::FeeHistoryService,
8 miner::Miner,
9 pool::{Pool, transactions::PoolTransaction},
10 },
11 filter::Filters,
12 mem::{Backend, storage::MinedBlockOutcome},
13};
14use alloy_consensus::TxReceipt;
15use alloy_network::Network;
16use foundry_primitives::{FoundryReceiptEnvelope, FoundryTxEnvelope};
17use futures::{FutureExt, Stream, StreamExt};
18use std::{
19 collections::VecDeque,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23};
24use tokio::{task::JoinHandle, time::Interval};
25
26pub struct NodeService<N: Network>
33where
34 N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log>,
35{
36 pool: Arc<Pool<N::TxEnvelope>>,
38 block_producer: BlockProducer<N>,
40 miner: Miner<N::TxEnvelope>,
42 fee_history: FeeHistoryService<N>,
44 filters: Filters<N>,
46 filter_eviction_interval: Interval,
48}
49
50impl<N: Network> NodeService<N>
51where
52 Backend<N>: TransactionValidator<N::TxEnvelope>,
53 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
54{
55 pub fn new(
56 pool: Arc<Pool<N::TxEnvelope>>,
57 backend: Arc<Backend<N>>,
58 miner: Miner<N::TxEnvelope>,
59 fee_history: FeeHistoryService<N>,
60 filters: Filters<N>,
61 ) -> Self {
62 let start = tokio::time::Instant::now() + filters.keep_alive();
63 let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
64 Self {
65 pool,
66 block_producer: BlockProducer::new(backend),
67 miner,
68 fee_history,
69 filter_eviction_interval,
70 filters,
71 }
72 }
73}
74
75impl<N: Network> Future for NodeService<N>
76where
77 Backend<N>: TransactionValidator<N::TxEnvelope>,
78 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
79{
80 type Output = NodeResult<()>;
81
82 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 let pin = self.get_mut();
84
85 loop {
88 while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
90 trace!(target: "node", "mined block {}", outcome.block_number);
91 pin.pool.on_mined_block(outcome);
93 }
94
95 if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
96 pin.block_producer.queued.push_back(transactions);
98 } else {
99 break;
101 }
102 }
103
104 let _ = pin.fee_history.poll_unpin(cx);
106
107 if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
108 let filters = pin.filters.clone();
109
110 tokio::task::spawn(async move { filters.evict().await });
112 }
113
114 Poll::Pending
115 }
116}
117
118type MiningResult<N> = (MinedBlockOutcome<<N as Network>::TxEnvelope>, Arc<Backend<N>>);
119
120#[must_use = "streams do nothing unless polled"]
122struct BlockProducer<N: Network> {
123 idle_backend: Option<Arc<Backend<N>>>,
125 block_mining: Option<JoinHandle<MiningResult<N>>>,
127 queued: VecDeque<Vec<Arc<PoolTransaction<N::TxEnvelope>>>>,
129}
130
131impl<N: Network> BlockProducer<N>
132where
133 Backend<N>: TransactionValidator<N::TxEnvelope>,
134 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
135{
136 fn new(backend: Arc<Backend<N>>) -> Self {
137 Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
138 }
139}
140
141impl<N: Network> Stream for BlockProducer<N>
142where
143 Backend<N>: TransactionValidator<N::TxEnvelope> + Send + Sync + 'static,
144 N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope> + 'static,
145{
146 type Item = MinedBlockOutcome<N::TxEnvelope>;
147
148 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149 let pin = self.get_mut();
150
151 if !pin.queued.is_empty() {
152 if let Some(backend) = pin.idle_backend.take() {
154 let transactions = pin.queued.pop_front().expect("not empty; qed");
155
156 let handle = tokio::runtime::Handle::current();
159 let mining = tokio::task::spawn_blocking(move || {
160 handle.block_on(async move {
161 trace!(target: "miner", "creating new block");
162 let block = backend.mine_block(transactions).await;
163 trace!(target: "miner", "created new block: {}", block.block_number);
164 (block, backend)
165 })
166 });
167 pin.block_mining = Some(mining);
168 }
169 }
170
171 if let Some(mut mining) = pin.block_mining.take() {
172 if let Poll::Ready(res) = mining.poll_unpin(cx) {
173 return match res {
174 Ok((outcome, backend)) => {
175 pin.idle_backend = Some(backend);
176 Poll::Ready(Some(outcome))
177 }
178 Err(err) => {
179 panic!("miner task failed: {err}");
180 }
181 };
182 }
183 pin.block_mining = Some(mining)
184 }
185
186 Poll::Pending
187 }
188}