Skip to main content

anvil/
service.rs

//! Background service that drives block production, fee history upkeep and filter eviction.
2
3use crate::{
4    NodeResult,
5    eth::{
6        backend::validate::TransactionValidator,
7        fees::FeeHistoryService,
8        miner::Miner,
9        pool::{Pool, transactions::PoolTransaction},
10    },
11    filter::Filters,
12    mem::{Backend, storage::MinedBlockOutcome},
13};
14use alloy_consensus::TxReceipt;
15use alloy_network::Network;
16use foundry_primitives::{FoundryReceiptEnvelope, FoundryTxEnvelope};
17use futures::{FutureExt, Stream, StreamExt};
18use std::{
19    collections::VecDeque,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23};
24use tokio::{task::JoinHandle, time::Interval};
25
26/// The type that drives the blockchain's state
27///
28/// This service is basically an endless future that continuously polls the miner which returns
29/// transactions for the next block, then those transactions are handed off to the backend to
30/// construct a new block, if all transactions were successfully included in a new block they get
31/// purged from the `Pool`.
32pub struct NodeService<N: Network>
33where
34    N::ReceiptEnvelope: TxReceipt<Log = alloy_primitives::Log>,
35{
36    /// The pool that holds all transactions.
37    pool: Arc<Pool<N::TxEnvelope>>,
38    /// Creates new blocks.
39    block_producer: BlockProducer<N>,
40    /// The miner responsible to select transactions from the `pool`.
41    miner: Miner<N::TxEnvelope>,
42    /// Maintenance task for fee history related tasks.
43    fee_history: FeeHistoryService<N>,
44    /// Tracks all active filters
45    filters: Filters<N>,
46    /// The interval at which to check for filters that need to be evicted
47    filter_eviction_interval: Interval,
48}
49
50impl<N: Network> NodeService<N>
51where
52    Backend<N>: TransactionValidator<N::TxEnvelope>,
53    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
54{
55    pub fn new(
56        pool: Arc<Pool<N::TxEnvelope>>,
57        backend: Arc<Backend<N>>,
58        miner: Miner<N::TxEnvelope>,
59        fee_history: FeeHistoryService<N>,
60        filters: Filters<N>,
61    ) -> Self {
62        let start = tokio::time::Instant::now() + filters.keep_alive();
63        let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
64        Self {
65            pool,
66            block_producer: BlockProducer::new(backend),
67            miner,
68            fee_history,
69            filter_eviction_interval,
70            filters,
71        }
72    }
73}
74
75impl<N: Network> Future for NodeService<N>
76where
77    Backend<N>: TransactionValidator<N::TxEnvelope>,
78    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
79{
80    type Output = NodeResult<()>;
81
82    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let pin = self.get_mut();
84
85        // this drives block production and feeds new sets of ready transactions to the block
86        // producer
87        loop {
88            // advance block production until pending
89            while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
90                trace!(target: "node", "mined block {}", outcome.block_number);
91                // prune the transactions from the pool
92                pin.pool.on_mined_block(outcome);
93            }
94
95            if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
96                // miner returned a set of transaction that we feed to the producer
97                pin.block_producer.queued.push_back(transactions);
98            } else {
99                // no progress made
100                break;
101            }
102        }
103
104        // poll the fee history task
105        let _ = pin.fee_history.poll_unpin(cx);
106
107        if pin.filter_eviction_interval.poll_tick(cx).is_ready() {
108            let filters = pin.filters.clone();
109
110            // evict filters that timed out
111            tokio::task::spawn(async move { filters.evict().await });
112        }
113
114        Poll::Pending
115    }
116}
117
118type MiningResult<N> = (MinedBlockOutcome<<N as Network>::TxEnvelope>, Arc<Backend<N>>);
119
120/// A type that exclusively mines one block at a time
121#[must_use = "streams do nothing unless polled"]
122struct BlockProducer<N: Network> {
123    /// Holds the backend if no block is being mined
124    idle_backend: Option<Arc<Backend<N>>>,
125    /// Single active future that mines a new block
126    block_mining: Option<JoinHandle<MiningResult<N>>>,
127    /// backlog of sets of transactions ready to be mined
128    queued: VecDeque<Vec<Arc<PoolTransaction<N::TxEnvelope>>>>,
129}
130
131impl<N: Network> BlockProducer<N>
132where
133    Backend<N>: TransactionValidator<N::TxEnvelope>,
134    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope>,
135{
136    fn new(backend: Arc<Backend<N>>) -> Self {
137        Self { idle_backend: Some(backend), block_mining: None, queued: Default::default() }
138    }
139}
140
141impl<N: Network> Stream for BlockProducer<N>
142where
143    Backend<N>: TransactionValidator<N::TxEnvelope> + Send + Sync + 'static,
144    N: Network<TxEnvelope = FoundryTxEnvelope, ReceiptEnvelope = FoundryReceiptEnvelope> + 'static,
145{
146    type Item = MinedBlockOutcome<N::TxEnvelope>;
147
148    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149        let pin = self.get_mut();
150
151        if !pin.queued.is_empty() {
152            // only spawn a building task if there's none in progress already
153            if let Some(backend) = pin.idle_backend.take() {
154                let transactions = pin.queued.pop_front().expect("not empty; qed");
155
156                // we spawn this on as blocking task because this can be blocking for a while in
157                // forking mode, because of all the rpc calls to fetch the required state
158                let handle = tokio::runtime::Handle::current();
159                let mining = tokio::task::spawn_blocking(move || {
160                    handle.block_on(async move {
161                        trace!(target: "miner", "creating new block");
162                        let block = backend.mine_block(transactions).await;
163                        trace!(target: "miner", "created new block: {}", block.block_number);
164                        (block, backend)
165                    })
166                });
167                pin.block_mining = Some(mining);
168            }
169        }
170
171        if let Some(mut mining) = pin.block_mining.take() {
172            if let Poll::Ready(res) = mining.poll_unpin(cx) {
173                return match res {
174                    Ok((outcome, backend)) => {
175                        pin.idle_backend = Some(backend);
176                        Poll::Ready(Some(outcome))
177                    }
178                    Err(err) => {
179                        panic!("miner task failed: {err}");
180                    }
181                };
182            }
183            pin.block_mining = Some(mining)
184        }
185
186        Poll::Pending
187    }
188}