Skip to main content

anvil/
service.rs

1//! background service
2
3use crate::{
4    NodeResult,
5    eth::{
6        backend::validate::TransactionValidator,
7        fees::FeeHistoryService,
8        miner::Miner,
9        pool::{Pool, transactions::PoolTransaction},
10    },
11    filter::Filters,
12    mem::{Backend, storage::MinedBlockOutcome},
13};
14use alloy_network::Network;
15use foundry_primitives::{FoundryReceiptEnvelope, FoundryTxEnvelope};
16use futures::{FutureExt, Stream, StreamExt};
17use std::{
18    collections::VecDeque,
19    pin::Pin,
20    sync::Arc,
21    task::{Context, Poll},
22};
23use tokio::{task::JoinHandle, time::Interval};
24
25/// The type that drives the blockchain's state
26///
27/// This service is basically an endless future that continuously polls the miner which returns
28/// transactions for the next block, then those transactions are handed off to the backend to
29/// construct a new block, if all transactions were successfully included in a new block they get
30/// purged from the `Pool`.
31pub struct NodeService<N: Network> {
32    /// The pool that holds all transactions.
33    pool: Arc<Pool<N::TxEnvelope>>,
34    /// Creates new blocks.
35    block_producer: BlockProducer<N>,
36    /// The miner responsible to select transactions from the `pool`.
37    miner: Miner<N::TxEnvelope>,
38    /// Maintenance task for fee history related tasks.
39    fee_history: FeeHistoryService,
40    /// Tracks all active filters
41    filters: Filters<N>,
42    /// The interval at which to check for filters that need to be evicted
43    filter_eviction_interval: Interval,
44}
45
46impl<N: Network> NodeService<N>
47where
48    Backend<N>: TransactionValidator<N::TxEnvelope>,
49    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
50{
51    pub fn new(
52        pool: Arc<Pool<N::TxEnvelope>>,
53        backend: Arc<Backend<N>>,
54        miner: Miner<N::TxEnvelope>,
55        fee_history: FeeHistoryService,
56        filters: Filters<N>,
57    ) -> Self {
58        let start = tokio::time::Instant::now() + filters.keep_alive();
59        let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
60        Self {
61            pool,
62            block_producer: BlockProducer::new(backend),
63            miner,
64            fee_history,
65            filter_eviction_interval,
66            filters,
67        }
68    }
69}
70
71impl<N: Network> Future for NodeService<N>
72where
73    Backend<N>: TransactionValidator<N::TxEnvelope>,
74    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
75{
76    type Output = NodeResult<()>;
77
78    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        let pin = self.get_mut();
80
81        // this drives block production and feeds new sets of ready transactions to the block
82        // producer
83        loop {
84            // advance block production until pending
85            while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
86                trace!(target: "node", "mined block {}", outcome.block_number);
87                // prune the transactions from the pool
88                pin.pool.on_mined_block(outcome);
89            }
90
91            if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
92                // miner returned a set of transaction that we feed to the producer
93                pin.block_producer.queued.push_back(transactions);
94            } else {
95                // no progress made
96                break;
97            }
98        }
99
100        // poll the fee history task
101        let _ = pin.fee_history.poll_unpin(cx);
102
103        if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
104            let filters = pin.filters.clone();
105
106            // evict filters that timed out
107            tokio::task::spawn(async move { filters.evict().await });
108        }
109
110        Poll::Pending
111    }
112}
113
114type MiningResult<N> = (MinedBlockOutcome<<N as Network>::TxEnvelope>, Arc<Backend<N>>);
115
116/// A type that exclusively mines one block at a time
117#[must_use = "streams do nothing unless polled"]
118struct BlockProducer<N: Network> {
119    /// Holds the backend if no block is being mined
120    idle_backend: Option<Arc<Backend<N>>>,
121    /// Single active future that mines a new block
122    block_mining: Option<JoinHandle<MiningResult<N>>>,
123    /// backlog of sets of transactions ready to be mined
124    queued: VecDeque<Vec<Arc<PoolTransaction<N::TxEnvelope>>>>,
125}
126
127impl<N: Network> BlockProducer<N>
128where
129    Backend<N>: TransactionValidator<N::TxEnvelope>,
130    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
131{
132    fn new(backend: Arc<Backend<N>>) -> Self {
133        Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
134    }
135}
136
137impl<N: Network> Stream for BlockProducer<N>
138where
139    Backend<N>: TransactionValidator<N::TxEnvelope> + Send + Sync + 'static,
140    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope> + 'static,
141{
142    type Item = MinedBlockOutcome<N::TxEnvelope>;
143
144    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145        let pin = self.get_mut();
146
147        if !pin.queued.is_empty() {
148            // only spawn a building task if there's none in progress already
149            if let Some(backend) = pin.idle_backend.take() {
150                let transactions = pin.queued.pop_front().expect("not empty; qed");
151
152                // we spawn this on as blocking task because this can be blocking for a while in
153                // forking mode, because of all the rpc calls to fetch the required state
154                let handle = tokio::runtime::Handle::current();
155                let mining = tokio::task::spawn_blocking(move || {
156                    handle.block_on(async move {
157                        trace!(target: "miner", "creating new block");
158                        let block = backend.mine_block(transactions).await;
159                        trace!(target: "miner", "created new block: {}", block.block_number);
160                        (block, backend)
161                    })
162                });
163                pin.block_mining = Some(mining);
164            }
165        }
166
167        if let Some(mut mining) = pin.block_mining.take() {
168            if let Poll::Ready(res) = mining.poll_unpin(cx) {
169                return match res {
170                    Ok((outcome, backend)) => {
171                        pin.idle_backend = Some(backend);
172                        Poll::Ready(Some(outcome))
173                    }
174                    Err(err) => {
175                        panic!("miner task failed: {err}");
176                    }
177                };
178            } else {
179                pin.block_mining = Some(mining)
180            }
181        }
182
183        Poll::Pending
184    }
185}