1use crate::{
4 eth::{
5 fees::FeeHistoryService,
6 miner::Miner,
7 pool::{transactions::PoolTransaction, Pool},
8 },
9 filter::Filters,
10 mem::{storage::MinedBlockOutcome, Backend},
11 NodeResult,
12};
13use futures::{FutureExt, Stream, StreamExt};
14use std::{
15 collections::VecDeque,
16 future::Future,
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20};
21use tokio::{task::JoinHandle, time::Interval};
22
23pub struct NodeService {
30 pool: Arc<Pool>,
32 block_producer: BlockProducer,
34 miner: Miner,
36 fee_history: FeeHistoryService,
38 filters: Filters,
40 filter_eviction_interval: Interval,
42}
43
44impl NodeService {
45 pub fn new(
46 pool: Arc<Pool>,
47 backend: Arc<Backend>,
48 miner: Miner,
49 fee_history: FeeHistoryService,
50 filters: Filters,
51 ) -> Self {
52 let start = tokio::time::Instant::now() + filters.keep_alive();
53 let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
54 Self {
55 pool,
56 block_producer: BlockProducer::new(backend),
57 miner,
58 fee_history,
59 filter_eviction_interval,
60 filters,
61 }
62 }
63}
64
65impl Future for NodeService {
66 type Output = NodeResult<()>;
67
68 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69 let pin = self.get_mut();
70
71 loop {
74 while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
76 trace!(target: "node", "mined block {}", outcome.block_number);
77 pin.pool.on_mined_block(outcome);
79 }
80
81 if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
82 pin.block_producer.queued.push_back(transactions);
84 } else {
85 break
87 }
88 }
89
90 let _ = pin.fee_history.poll_unpin(cx);
92
93 if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
94 let filters = pin.filters.clone();
95
96 tokio::task::spawn(async move { filters.evict().await });
98 }
99
100 Poll::Pending
101 }
102}
103
104#[must_use = "streams do nothing unless polled"]
106struct BlockProducer {
107 idle_backend: Option<Arc<Backend>>,
109 block_mining: Option<JoinHandle<(MinedBlockOutcome, Arc<Backend>)>>,
111 queued: VecDeque<Vec<Arc<PoolTransaction>>>,
113}
114
115impl BlockProducer {
116 fn new(backend: Arc<Backend>) -> Self {
117 Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
118 }
119}
120
121impl Stream for BlockProducer {
122 type Item = MinedBlockOutcome;
123
124 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
125 let pin = self.get_mut();
126
127 if !pin.queued.is_empty() {
128 if let Some(backend) = pin.idle_backend.take() {
130 let transactions = pin.queued.pop_front().expect("not empty; qed");
131
132 let handle = tokio::runtime::Handle::current();
135 let mining = tokio::task::spawn_blocking(move || {
136 handle.block_on(async move {
137 trace!(target: "miner", "creating new block");
138 let block = backend.mine_block(transactions).await;
139 trace!(target: "miner", "created new block: {}", block.block_number);
140 (block, backend)
141 })
142 });
143 pin.block_mining = Some(mining);
144 }
145 }
146
147 if let Some(mut mining) = pin.block_mining.take() {
148 if let Poll::Ready(res) = mining.poll_unpin(cx) {
149 return match res {
150 Ok((outcome, backend)) => {
151 pin.idle_backend = Some(backend);
152 Poll::Ready(Some(outcome))
153 }
154 Err(err) => {
155 panic!("miner task failed: {err}");
156 }
157 }
158 } else {
159 pin.block_mining = Some(mining)
160 }
161 }
162
163 Poll::Pending
164 }
165}