1use crate::{
4 NodeResult,
5 eth::{
6 fees::FeeHistoryService,
7 miner::Miner,
8 pool::{Pool, transactions::PoolTransaction},
9 },
10 filter::Filters,
11 mem::{Backend, storage::MinedBlockOutcome},
12};
13use futures::{FutureExt, Stream, StreamExt};
14use std::{
15 collections::VecDeque,
16 pin::Pin,
17 sync::Arc,
18 task::{Context, Poll},
19};
20use tokio::{task::JoinHandle, time::Interval};
21
22pub struct NodeService {
29 pool: Arc<Pool>,
31 block_producer: BlockProducer,
33 miner: Miner,
35 fee_history: FeeHistoryService,
37 filters: Filters,
39 filter_eviction_interval: Interval,
41}
42
43impl NodeService {
44 pub fn new(
45 pool: Arc<Pool>,
46 backend: Arc<Backend>,
47 miner: Miner,
48 fee_history: FeeHistoryService,
49 filters: Filters,
50 ) -> Self {
51 let start = tokio::time::Instant::now() + filters.keep_alive();
52 let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
53 Self {
54 pool,
55 block_producer: BlockProducer::new(backend),
56 miner,
57 fee_history,
58 filter_eviction_interval,
59 filters,
60 }
61 }
62}
63
64impl Future for NodeService {
65 type Output = NodeResult<()>;
66
67 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68 let pin = self.get_mut();
69
70 loop {
73 while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
75 trace!(target: "node", "mined block {}", outcome.block_number);
76 pin.pool.on_mined_block(outcome);
78 }
79
80 if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
81 pin.block_producer.queued.push_back(transactions);
83 } else {
84 break;
86 }
87 }
88
89 let _ = pin.fee_history.poll_unpin(cx);
91
92 if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
93 let filters = pin.filters.clone();
94
95 tokio::task::spawn(async move { filters.evict().await });
97 }
98
99 Poll::Pending
100 }
101}
102
103#[must_use = "streams do nothing unless polled"]
105struct BlockProducer {
106 idle_backend: Option<Arc<Backend>>,
108 block_mining: Option<JoinHandle<(MinedBlockOutcome, Arc<Backend>)>>,
110 queued: VecDeque<Vec<Arc<PoolTransaction>>>,
112}
113
114impl BlockProducer {
115 fn new(backend: Arc<Backend>) -> Self {
116 Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
117 }
118}
119
120impl Stream for BlockProducer {
121 type Item = MinedBlockOutcome;
122
123 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124 let pin = self.get_mut();
125
126 if !pin.queued.is_empty() {
127 if let Some(backend) = pin.idle_backend.take() {
129 let transactions = pin.queued.pop_front().expect("not empty; qed");
130
131 let handle = tokio::runtime::Handle::current();
134 let mining = tokio::task::spawn_blocking(move || {
135 handle.block_on(async move {
136 trace!(target: "miner", "creating new block");
137 let block = backend.mine_block(transactions).await;
138 trace!(target: "miner", "created new block: {}", block.block_number);
139 (block, backend)
140 })
141 });
142 pin.block_mining = Some(mining);
143 }
144 }
145
146 if let Some(mut mining) = pin.block_mining.take() {
147 if let Poll::Ready(res) = mining.poll_unpin(cx) {
148 return match res {
149 Ok((outcome, backend)) => {
150 pin.idle_backend = Some(backend);
151 Poll::Ready(Some(outcome))
152 }
153 Err(err) => {
154 panic!("miner task failed: {err}");
155 }
156 };
157 } else {
158 pin.block_mining = Some(mining)
159 }
160 }
161
162 Poll::Pending
163 }
164}