anvil/
service.rs

1//! background service
2
3use crate::{
4    NodeResult,
5    eth::{
6        fees::FeeHistoryService,
7        miner::Miner,
8        pool::{Pool, transactions::PoolTransaction},
9    },
10    filter::Filters,
11    mem::{Backend, storage::MinedBlockOutcome},
12};
13use futures::{FutureExt, Stream, StreamExt};
14use std::{
15    collections::VecDeque,
16    pin::Pin,
17    sync::Arc,
18    task::{Context, Poll},
19};
20use tokio::{task::JoinHandle, time::Interval};
21
22/// The type that drives the blockchain's state
23///
24/// This service is basically an endless future that continuously polls the miner which returns
25/// transactions for the next block, then those transactions are handed off to the backend to
26/// construct a new block, if all transactions were successfully included in a new block they get
27/// purged from the `Pool`.
28pub struct NodeService {
29    /// The pool that holds all transactions.
30    pool: Arc<Pool>,
31    /// Creates new blocks.
32    block_producer: BlockProducer,
33    /// The miner responsible to select transactions from the `pool`.
34    miner: Miner,
35    /// Maintenance task for fee history related tasks.
36    fee_history: FeeHistoryService,
37    /// Tracks all active filters
38    filters: Filters,
39    /// The interval at which to check for filters that need to be evicted
40    filter_eviction_interval: Interval,
41}
42
43impl NodeService {
44    pub fn new(
45        pool: Arc<Pool>,
46        backend: Arc<Backend>,
47        miner: Miner,
48        fee_history: FeeHistoryService,
49        filters: Filters,
50    ) -> Self {
51        let start = tokio::time::Instant::now() + filters.keep_alive();
52        let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
53        Self {
54            pool,
55            block_producer: BlockProducer::new(backend),
56            miner,
57            fee_history,
58            filter_eviction_interval,
59            filters,
60        }
61    }
62}
63
64impl Future for NodeService {
65    type Output = NodeResult<()>;
66
67    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68        let pin = self.get_mut();
69
70        // this drives block production and feeds new sets of ready transactions to the block
71        // producer
72        loop {
73            // advance block production until pending
74            while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
75                trace!(target: "node", "mined block {}", outcome.block_number);
76                // prune the transactions from the pool
77                pin.pool.on_mined_block(outcome);
78            }
79
80            if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
81                // miner returned a set of transaction that we feed to the producer
82                pin.block_producer.queued.push_back(transactions);
83            } else {
84                // no progress made
85                break;
86            }
87        }
88
89        // poll the fee history task
90        let _ = pin.fee_history.poll_unpin(cx);
91
92        if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
93            let filters = pin.filters.clone();
94
95            // evict filters that timed out
96            tokio::task::spawn(async move { filters.evict().await });
97        }
98
99        Poll::Pending
100    }
101}
102
103/// A type that exclusively mines one block at a time
104#[must_use = "streams do nothing unless polled"]
105struct BlockProducer {
106    /// Holds the backend if no block is being mined
107    idle_backend: Option<Arc<Backend>>,
108    /// Single active future that mines a new block
109    block_mining: Option<JoinHandle<(MinedBlockOutcome, Arc<Backend>)>>,
110    /// backlog of sets of transactions ready to be mined
111    queued: VecDeque<Vec<Arc<PoolTransaction>>>,
112}
113
114impl BlockProducer {
115    fn new(backend: Arc<Backend>) -> Self {
116        Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
117    }
118}
119
120impl Stream for BlockProducer {
121    type Item = MinedBlockOutcome;
122
123    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124        let pin = self.get_mut();
125
126        if !pin.queued.is_empty() {
127            // only spawn a building task if there's none in progress already
128            if let Some(backend) = pin.idle_backend.take() {
129                let transactions = pin.queued.pop_front().expect("not empty; qed");
130
131                // we spawn this on as blocking task because this can be blocking for a while in
132                // forking mode, because of all the rpc calls to fetch the required state
133                let handle = tokio::runtime::Handle::current();
134                let mining = tokio::task::spawn_blocking(move || {
135                    handle.block_on(async move {
136                        trace!(target: "miner", "creating new block");
137                        let block = backend.mine_block(transactions).await;
138                        trace!(target: "miner", "created new block: {}", block.block_number);
139                        (block, backend)
140                    })
141                });
142                pin.block_mining = Some(mining);
143            }
144        }
145
146        if let Some(mut mining) = pin.block_mining.take() {
147            if let Poll::Ready(res) = mining.poll_unpin(cx) {
148                return match res {
149                    Ok((outcome, backend)) => {
150                        pin.idle_backend = Some(backend);
151                        Poll::Ready(Some(outcome))
152                    }
153                    Err(err) => {
154                        panic!("miner task failed: {err}");
155                    }
156                };
157            } else {
158                pin.block_mining = Some(mining)
159            }
160        }
161
162        Poll::Pending
163    }
164}