anvil/
service.rs

1//! background service
2
3use crate::{
4    eth::{
5        fees::FeeHistoryService,
6        miner::Miner,
7        pool::{transactions::PoolTransaction, Pool},
8    },
9    filter::Filters,
10    mem::{storage::MinedBlockOutcome, Backend},
11    NodeResult,
12};
13use futures::{FutureExt, Stream, StreamExt};
14use std::{
15    collections::VecDeque,
16    future::Future,
17    pin::Pin,
18    sync::Arc,
19    task::{Context, Poll},
20};
21use tokio::{task::JoinHandle, time::Interval};
22
23/// The type that drives the blockchain's state
24///
25/// This service is basically an endless future that continuously polls the miner which returns
26/// transactions for the next block, then those transactions are handed off to the backend to
27/// construct a new block, if all transactions were successfully included in a new block they get
28/// purged from the `Pool`.
29pub struct NodeService {
30    /// The pool that holds all transactions.
31    pool: Arc<Pool>,
32    /// Creates new blocks.
33    block_producer: BlockProducer,
34    /// The miner responsible to select transactions from the `pool`.
35    miner: Miner,
36    /// Maintenance task for fee history related tasks.
37    fee_history: FeeHistoryService,
38    /// Tracks all active filters
39    filters: Filters,
40    /// The interval at which to check for filters that need to be evicted
41    filter_eviction_interval: Interval,
42}
43
44impl NodeService {
45    pub fn new(
46        pool: Arc<Pool>,
47        backend: Arc<Backend>,
48        miner: Miner,
49        fee_history: FeeHistoryService,
50        filters: Filters,
51    ) -> Self {
52        let start = tokio::time::Instant::now() + filters.keep_alive();
53        let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
54        Self {
55            pool,
56            block_producer: BlockProducer::new(backend),
57            miner,
58            fee_history,
59            filter_eviction_interval,
60            filters,
61        }
62    }
63}
64
65impl Future for NodeService {
66    type Output = NodeResult<()>;
67
68    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69        let pin = self.get_mut();
70
71        // this drives block production and feeds new sets of ready transactions to the block
72        // producer
73        loop {
74            // advance block production until pending
75            while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
76                trace!(target: "node", "mined block {}", outcome.block_number);
77                // prune the transactions from the pool
78                pin.pool.on_mined_block(outcome);
79            }
80
81            if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
82                // miner returned a set of transaction that we feed to the producer
83                pin.block_producer.queued.push_back(transactions);
84            } else {
85                // no progress made
86                break
87            }
88        }
89
90        // poll the fee history task
91        let _ = pin.fee_history.poll_unpin(cx);
92
93        if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
94            let filters = pin.filters.clone();
95
96            // evict filters that timed out
97            tokio::task::spawn(async move { filters.evict().await });
98        }
99
100        Poll::Pending
101    }
102}
103
104/// A type that exclusively mines one block at a time
105#[must_use = "streams do nothing unless polled"]
106struct BlockProducer {
107    /// Holds the backend if no block is being mined
108    idle_backend: Option<Arc<Backend>>,
109    /// Single active future that mines a new block
110    block_mining: Option<JoinHandle<(MinedBlockOutcome, Arc<Backend>)>>,
111    /// backlog of sets of transactions ready to be mined
112    queued: VecDeque<Vec<Arc<PoolTransaction>>>,
113}
114
115impl BlockProducer {
116    fn new(backend: Arc<Backend>) -> Self {
117        Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
118    }
119}
120
121impl Stream for BlockProducer {
122    type Item = MinedBlockOutcome;
123
124    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
125        let pin = self.get_mut();
126
127        if !pin.queued.is_empty() {
128            // only spawn a building task if there's none in progress already
129            if let Some(backend) = pin.idle_backend.take() {
130                let transactions = pin.queued.pop_front().expect("not empty; qed");
131
132                // we spawn this on as blocking task because this can be blocking for a while in
133                // forking mode, because of all the rpc calls to fetch the required state
134                let handle = tokio::runtime::Handle::current();
135                let mining = tokio::task::spawn_blocking(move || {
136                    handle.block_on(async move {
137                        trace!(target: "miner", "creating new block");
138                        let block = backend.mine_block(transactions).await;
139                        trace!(target: "miner", "created new block: {}", block.block_number);
140                        (block, backend)
141                    })
142                });
143                pin.block_mining = Some(mining);
144            }
145        }
146
147        if let Some(mut mining) = pin.block_mining.take() {
148            if let Poll::Ready(res) = mining.poll_unpin(cx) {
149                return match res {
150                    Ok((outcome, backend)) => {
151                        pin.idle_backend = Some(backend);
152                        Poll::Ready(Some(outcome))
153                    }
154                    Err(err) => {
155                        panic!("miner task failed: {err}");
156                    }
157                }
158            } else {
159                pin.block_mining = Some(mining)
160            }
161        }
162
163        Poll::Pending
164    }
165}