foundry_evm_core/fork/
multi.rs

1//! Support for running multiple fork backends.
2//!
3//! The design is similar to the single `SharedBackend`, `BackendHandler` but supports multiple
4//! concurrently active pairs at once.
5
6use super::CreateFork;
7use alloy_consensus::BlockHeader;
8use alloy_primitives::{map::HashMap, U256};
9use alloy_provider::network::BlockResponse;
10use foundry_common::provider::{ProviderBuilder, RetryProvider};
11use foundry_config::Config;
12use foundry_fork_db::{cache::BlockchainDbMeta, BackendHandler, BlockchainDb, SharedBackend};
13use futures::{
14    channel::mpsc::{channel, Receiver, Sender},
15    stream::{Fuse, Stream},
16    task::{Context, Poll},
17    Future, FutureExt, StreamExt,
18};
19use revm::primitives::Env;
20use std::{
21    fmt::{self, Write},
22    pin::Pin,
23    sync::{
24        atomic::AtomicUsize,
25        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
26        Arc,
27    },
28    time::Duration,
29};
30
31/// The _unique_ identifier for a specific fork, this could be the name of the network a custom
32/// descriptive name.
33#[derive(Clone, Debug, PartialEq, Eq, Hash)]
34pub struct ForkId(pub String);
35
36impl ForkId {
37    /// Returns the identifier for a Fork from a URL and block number.
38    pub fn new(url: &str, num: Option<u64>) -> Self {
39        let mut id = url.to_string();
40        id.push('@');
41        match num {
42            Some(n) => write!(id, "{n:#x}").unwrap(),
43            None => id.push_str("latest"),
44        }
45        Self(id)
46    }
47
48    /// Returns the identifier of the fork.
49    pub fn as_str(&self) -> &str {
50        &self.0
51    }
52}
53
54impl fmt::Display for ForkId {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        self.0.fmt(f)
57    }
58}
59
60impl<T: Into<String>> From<T> for ForkId {
61    fn from(id: T) -> Self {
62        Self(id.into())
63    }
64}
65
66/// The Sender half of multi fork pair.
67/// Can send requests to the `MultiForkHandler` to create forks.
68#[derive(Clone, Debug)]
69#[must_use]
70pub struct MultiFork {
71    /// Channel to send `Request`s to the handler.
72    handler: Sender<Request>,
73    /// Ensures that all rpc resources get flushed properly.
74    _shutdown: Arc<ShutDownMultiFork>,
75}
76
77impl MultiFork {
78    /// Creates a new pair and spawns the `MultiForkHandler` on a background thread.
79    pub fn spawn() -> Self {
80        trace!(target: "fork::multi", "spawning multifork");
81
82        let (fork, mut handler) = Self::new();
83        // Spawn a light-weight thread with a thread-local async runtime just for
84        // sending and receiving data from the remote client(s).
85        std::thread::Builder::new()
86            .name("multi-fork-backend".into())
87            .spawn(move || {
88                let rt = tokio::runtime::Builder::new_current_thread()
89                    .enable_all()
90                    .build()
91                    .expect("failed to build tokio runtime");
92
93                rt.block_on(async move {
94                    // Flush cache every 60s, this ensures that long-running fork tests get their
95                    // cache flushed from time to time.
96                    // NOTE: we install the interval here because the `tokio::timer::Interval`
97                    // requires a rt.
98                    handler.set_flush_cache_interval(Duration::from_secs(60));
99                    handler.await
100                });
101            })
102            .expect("failed to spawn thread");
103        trace!(target: "fork::multi", "spawned MultiForkHandler thread");
104        fork
105    }
106
107    /// Creates a new pair multi fork pair.
108    ///
109    /// Use [`spawn`](Self::spawn) instead.
110    #[doc(hidden)]
111    pub fn new() -> (Self, MultiForkHandler) {
112        let (handler, handler_rx) = channel(1);
113        let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
114        (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
115    }
116
117    /// Returns a fork backend.
118    ///
119    /// If no matching fork backend exists it will be created.
120    pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
121        trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
122        let (sender, rx) = oneshot_channel();
123        let req = Request::CreateFork(Box::new(fork), sender);
124        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
125        rx.recv()?
126    }
127
128    /// Rolls the block of the fork.
129    ///
130    /// If no matching fork backend exists it will be created.
131    pub fn roll_fork(
132        &self,
133        fork: ForkId,
134        block: u64,
135    ) -> eyre::Result<(ForkId, SharedBackend, Env)> {
136        trace!(?fork, ?block, "rolling fork");
137        let (sender, rx) = oneshot_channel();
138        let req = Request::RollFork(fork, block, sender);
139        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
140        rx.recv()?
141    }
142
143    /// Returns the `Env` of the given fork, if any.
144    pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
145        trace!(?fork, "getting env config");
146        let (sender, rx) = oneshot_channel();
147        let req = Request::GetEnv(fork, sender);
148        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
149        Ok(rx.recv()?)
150    }
151
152    /// Updates block number and timestamp of given fork with new values.
153    pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
154        trace!(?fork, ?number, ?timestamp, "update fork block");
155        self.handler
156            .clone()
157            .try_send(Request::UpdateBlock(fork, number, timestamp))
158            .map_err(|e| eyre::eyre!("{:?}", e))
159    }
160
161    /// Returns the corresponding fork if it exists.
162    ///
163    /// Returns `None` if no matching fork backend is available.
164    pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
165        let id = id.into();
166        trace!(?id, "get fork backend");
167        let (sender, rx) = oneshot_channel();
168        let req = Request::GetFork(id, sender);
169        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
170        Ok(rx.recv()?)
171    }
172
173    /// Returns the corresponding fork url if it exists.
174    ///
175    /// Returns `None` if no matching fork is available.
176    pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
177        let (sender, rx) = oneshot_channel();
178        let req = Request::GetForkUrl(id.into(), sender);
179        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
180        Ok(rx.recv()?)
181    }
182}
183
184type Handler = BackendHandler<Arc<RetryProvider>>;
185
186type CreateFuture =
187    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;
188type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
189type GetEnvSender = OneshotSender<Option<Env>>;
190
191/// Request that's send to the handler.
192#[derive(Debug)]
193enum Request {
194    /// Creates a new ForkBackend.
195    CreateFork(Box<CreateFork>, CreateSender),
196    /// Returns the Fork backend for the `ForkId` if it exists.
197    GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
198    /// Adjusts the block that's being forked, by creating a new fork at the new block.
199    RollFork(ForkId, u64, CreateSender),
200    /// Returns the environment of the fork.
201    GetEnv(ForkId, GetEnvSender),
202    /// Updates the block number and timestamp of the fork.
203    UpdateBlock(ForkId, U256, U256),
204    /// Shutdowns the entire `MultiForkHandler`, see `ShutDownMultiFork`
205    ShutDown(OneshotSender<()>),
206    /// Returns the Fork Url for the `ForkId` if it exists.
207    GetForkUrl(ForkId, OneshotSender<Option<String>>),
208}
209
210enum ForkTask {
211    /// Contains the future that will establish a new fork.
212    Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
213}
214
215/// The type that manages connections in the background.
216#[must_use = "futures do nothing unless polled"]
217pub struct MultiForkHandler {
218    /// Incoming requests from the `MultiFork`.
219    incoming: Fuse<Receiver<Request>>,
220
221    /// All active handlers.
222    ///
223    /// It's expected that this list will be rather small (<10).
224    handlers: Vec<(ForkId, Handler)>,
225
226    // tasks currently in progress
227    pending_tasks: Vec<ForkTask>,
228
229    /// All _unique_ forkids mapped to their corresponding backend.
230    ///
231    /// Note: The backend can be shared by multiple ForkIds if the target the same provider and
232    /// block number.
233    forks: HashMap<ForkId, CreatedFork>,
234
235    /// Optional periodic interval to flush rpc cache.
236    flush_cache_interval: Option<tokio::time::Interval>,
237}
238
239impl MultiForkHandler {
240    fn new(incoming: Receiver<Request>) -> Self {
241        Self {
242            incoming: incoming.fuse(),
243            handlers: Default::default(),
244            pending_tasks: Default::default(),
245            forks: Default::default(),
246            flush_cache_interval: None,
247        }
248    }
249
250    /// Sets the interval after which all rpc caches should be flushed periodically.
251    pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
252        self.flush_cache_interval =
253            Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
254        self
255    }
256
257    /// Returns the list of additional senders of a matching task for the given id, if any.
258    #[expect(irrefutable_let_patterns)]
259    fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
260        for task in &mut self.pending_tasks {
261            if let ForkTask::Create(_, in_progress, _, additional) = task {
262                if in_progress == id {
263                    return Some(additional);
264                }
265            }
266        }
267        None
268    }
269
270    fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
271        let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
272        trace!(?fork_id, "created new forkId");
273
274        // There could already be a task for the requested fork in progress.
275        if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
276            in_progress.push(sender);
277            return;
278        }
279
280        // Need to create a new fork.
281        let task = Box::pin(create_fork(fork));
282        self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
283    }
284
285    fn insert_new_fork(
286        &mut self,
287        fork_id: ForkId,
288        fork: CreatedFork,
289        sender: CreateSender,
290        additional_senders: Vec<CreateSender>,
291    ) {
292        self.forks.insert(fork_id.clone(), fork.clone());
293        let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
294
295        // Notify all additional senders and track unique forkIds.
296        for sender in additional_senders {
297            let next_fork_id = fork.inc_senders(fork_id.clone());
298            self.forks.insert(next_fork_id.clone(), fork.clone());
299            let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
300        }
301    }
302
303    /// Update fork block number and timestamp. Used to preserve values set by `roll` and `warp`
304    /// cheatcodes when new fork selected.
305    fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
306        if let Some(fork) = self.forks.get_mut(&fork_id) {
307            fork.opts.env.block.number = block_number;
308            fork.opts.env.block.timestamp = block_timestamp;
309        }
310    }
311
312    fn on_request(&mut self, req: Request) {
313        match req {
314            Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
315            Request::GetFork(fork_id, sender) => {
316                let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
317                let _ = sender.send(fork);
318            }
319            Request::RollFork(fork_id, block, sender) => {
320                if let Some(fork) = self.forks.get(&fork_id) {
321                    trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
322                    let mut opts = fork.opts.clone();
323                    opts.evm_opts.fork_block_number = Some(block);
324                    self.create_fork(opts, sender)
325                } else {
326                    let _ = sender.send(Err(eyre::eyre!("No matching fork exits for {}", fork_id)));
327                }
328            }
329            Request::GetEnv(fork_id, sender) => {
330                let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
331            }
332            Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
333                self.update_block(fork_id, block_number, block_timestamp);
334            }
335            Request::ShutDown(sender) => {
336                trace!(target: "fork::multi", "received shutdown signal");
337                // We're emptying all fork backends, this way we ensure all caches get flushed.
338                self.forks.clear();
339                self.handlers.clear();
340                let _ = sender.send(());
341            }
342            Request::GetForkUrl(fork_id, sender) => {
343                let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
344                let _ = sender.send(fork);
345            }
346        }
347    }
348}
349
350// Drives all handler to completion.
351// This future will finish once all underlying BackendHandler are completed.
352impl Future for MultiForkHandler {
353    type Output = ();
354
355    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
356        let pin = self.get_mut();
357
358        // Receive new requests.
359        loop {
360            match Pin::new(&mut pin.incoming).poll_next(cx) {
361                Poll::Ready(Some(req)) => {
362                    pin.on_request(req);
363                }
364                Poll::Ready(None) => {
365                    // Channel closed, but we still need to drive the fork handlers to completion.
366                    trace!(target: "fork::multi", "request channel closed");
367                    break;
368                }
369                Poll::Pending => break,
370            }
371        }
372
373        // Advance all tasks.
374        for n in (0..pin.pending_tasks.len()).rev() {
375            let task = pin.pending_tasks.swap_remove(n);
376            match task {
377                ForkTask::Create(mut fut, id, sender, additional_senders) => {
378                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
379                        match resp {
380                            Ok((fork_id, fork, handler)) => {
381                                if let Some(fork) = pin.forks.get(&fork_id).cloned() {
382                                    pin.insert_new_fork(
383                                        fork.inc_senders(fork_id),
384                                        fork,
385                                        sender,
386                                        additional_senders,
387                                    );
388                                } else {
389                                    pin.handlers.push((fork_id.clone(), handler));
390                                    pin.insert_new_fork(fork_id, fork, sender, additional_senders);
391                                }
392                            }
393                            Err(err) => {
394                                let _ = sender.send(Err(eyre::eyre!("{err}")));
395                                for sender in additional_senders {
396                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
397                                }
398                            }
399                        }
400                    } else {
401                        pin.pending_tasks.push(ForkTask::Create(
402                            fut,
403                            id,
404                            sender,
405                            additional_senders,
406                        ));
407                    }
408                }
409            }
410        }
411
412        // Advance all handlers.
413        for n in (0..pin.handlers.len()).rev() {
414            let (id, mut handler) = pin.handlers.swap_remove(n);
415            match handler.poll_unpin(cx) {
416                Poll::Ready(_) => {
417                    trace!(target: "fork::multi", "fork {:?} completed", id);
418                }
419                Poll::Pending => {
420                    pin.handlers.push((id, handler));
421                }
422            }
423        }
424
425        if pin.handlers.is_empty() && pin.incoming.is_done() {
426            trace!(target: "fork::multi", "completed");
427            return Poll::Ready(());
428        }
429
430        // Periodically flush cached RPC state.
431        if pin
432            .flush_cache_interval
433            .as_mut()
434            .map(|interval| interval.poll_tick(cx).is_ready())
435            .unwrap_or_default() &&
436            !pin.forks.is_empty()
437        {
438            trace!(target: "fork::multi", "tick flushing caches");
439            let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
440            // Flush this on new thread to not block here.
441            std::thread::Builder::new()
442                .name("flusher".into())
443                .spawn(move || {
444                    forks.into_iter().for_each(|fork| fork.flush_cache());
445                })
446                .expect("failed to spawn thread");
447        }
448
449        Poll::Pending
450    }
451}
452
453/// Tracks the created Fork
454#[derive(Debug, Clone)]
455struct CreatedFork {
456    /// How the fork was initially created.
457    opts: CreateFork,
458    /// Copy of the sender.
459    backend: SharedBackend,
460    /// How many consumers there are, since a `SharedBacked` can be used by multiple
461    /// consumers.
462    num_senders: Arc<AtomicUsize>,
463}
464
465impl CreatedFork {
466    pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
467        Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
468    }
469
470    /// Increment senders and return unique identifier of the fork.
471    fn inc_senders(&self, fork_id: ForkId) -> ForkId {
472        format!(
473            "{}-{}",
474            fork_id.as_str(),
475            self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
476        )
477        .into()
478    }
479}
480
481/// A type that's used to signaling the `MultiForkHandler` when it's time to shut down.
482///
483/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc cashes.
484///
485/// This type intentionally does not implement `Clone` since it's intended that there's only once
486/// instance.
487#[derive(Debug)]
488struct ShutDownMultiFork {
489    handler: Option<Sender<Request>>,
490}
491
492impl Drop for ShutDownMultiFork {
493    fn drop(&mut self) {
494        trace!(target: "fork::multi", "initiating shutdown");
495        let (sender, rx) = oneshot_channel();
496        let req = Request::ShutDown(sender);
497        if let Some(mut handler) = self.handler.take() {
498            if handler.try_send(req).is_ok() {
499                let _ = rx.recv();
500                trace!(target: "fork::cache", "multifork backend shutdown");
501            }
502        }
503    }
504}
505
506/// Creates a new fork.
507///
508/// This will establish a new `Provider` to the endpoint and return the Fork Backend.
509async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
510    let provider = Arc::new(
511        ProviderBuilder::new(fork.url.as_str())
512            .maybe_max_retry(fork.evm_opts.fork_retries)
513            .maybe_initial_backoff(fork.evm_opts.fork_retry_backoff)
514            .maybe_headers(fork.evm_opts.fork_headers.clone())
515            .compute_units_per_second(fork.evm_opts.get_compute_units_per_second())
516            .build()?,
517    );
518
519    // Initialise the fork environment.
520    let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
521    fork.env = env;
522    let meta = BlockchainDbMeta::new(fork.env.clone(), fork.url.clone());
523
524    // We need to use the block number from the block because the env's number can be different on
525    // some L2s (e.g. Arbitrum).
526    let number = block.header().number();
527
528    // Determine the cache path if caching is enabled.
529    let cache_path = if fork.enable_caching {
530        Config::foundry_block_cache_dir(meta.cfg_env.chain_id, number)
531    } else {
532        None
533    };
534
535    let db = BlockchainDb::new(meta, cache_path);
536    let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
537    let fork = CreatedFork::new(fork, backend);
538    let fork_id = ForkId::new(&fork.opts.url, number.into());
539
540    Ok((fork_id, fork, handler))
541}