foundry_evm_core/fork/multi.rs

//! Support for running multiple fork backends.
//!
//! The design is similar to the single `SharedBackend`, `BackendHandler` pair, but supports
//! multiple concurrently active pairs at once.

use super::CreateFork;
use crate::Env;
use alloy_consensus::BlockHeader;
use alloy_primitives::{U256, map::HashMap};
use alloy_provider::network::BlockResponse;
use foundry_common::provider::{ProviderBuilder, RetryProvider};
use foundry_config::Config;
use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
use futures::{
    FutureExt, StreamExt,
    channel::mpsc::{Receiver, Sender, channel},
    stream::{Fuse, Stream},
    task::{Context, Poll},
};
use revm::context::BlockEnv;
use std::{
    fmt::{self, Write},
    pin::Pin,
    sync::{
        Arc,
        atomic::AtomicUsize,
        mpsc::{Sender as OneshotSender, channel as oneshot_channel},
    },
    time::Duration,
};

/// The _unique_ identifier for a specific fork, this could be the name of the network or a
/// custom descriptive name.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Returns the identifier for a Fork from a URL and block number.
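    ///
    /// A small sketch of the resulting format (the URL is illustrative): the block number is
    /// rendered as hex, and `None` maps to `latest`.
    ///
    /// ```ignore
    /// let pinned = ForkId::new("https://eth.example", Some(1_000_000));
    /// assert_eq!(pinned.as_str(), "https://eth.example@0xf4240");
    ///
    /// let latest = ForkId::new("https://eth.example", None);
    /// assert_eq!(latest.as_str(), "https://eth.example@latest");
    /// ```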
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = url.to_string();
        id.push('@');
        match num {
            Some(n) => write!(id, "{n:#x}").unwrap(),
            None => id.push_str("latest"),
        }
        Self(id)
    }

    /// Returns the identifier of the fork.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ForkId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    fn from(id: T) -> Self {
        Self(id.into())
    }
}

/// The sender half of the multi fork pair.
/// Can send requests to the `MultiForkHandler` to create forks.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork {
    /// Channel to send `Request`s to the handler.
    handler: Sender<Request>,
    /// Ensures that all rpc resources get flushed properly.
    _shutdown: Arc<ShutDownMultiFork>,
}

impl MultiFork {
    /// Creates a new pair and spawns the `MultiForkHandler` on a background thread.
    pub fn spawn() -> Self {
        trace!(target: "fork::multi", "spawning multifork");

        let (fork, mut handler) = Self::new();
        // Spawn a light-weight thread with a thread-local async runtime just for
        // sending and receiving data from the remote client(s).
        std::thread::Builder::new()
            .name("multi-fork-backend".into())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build tokio runtime");

                rt.block_on(async move {
                    // Flush cache every 60s, this ensures that long-running fork tests get their
                    // cache flushed from time to time.
                    // NOTE: we install the interval here because the `tokio::timer::Interval`
                    // requires a rt.
                    handler.set_flush_cache_interval(Duration::from_secs(60));
                    handler.await
                });
            })
            .expect("failed to spawn thread");
        trace!(target: "fork::multi", "spawned MultiForkHandler thread");
        fork
    }

    /// Creates a new multi fork pair.
    ///
    /// Use [`spawn`](Self::spawn) instead.
    #[doc(hidden)]
    pub fn new() -> (Self, MultiForkHandler) {
        let (handler, handler_rx) = channel(1);
        let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
        (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
    }

    /// Returns a fork backend.
    ///
    /// If no matching fork backend exists, it will be created.
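    ///
    /// A minimal usage sketch; `fork_opts` stands in for a `CreateFork` built elsewhere from the
    /// RPC URL and evm opts:
    ///
    /// ```ignore
    /// let multi_fork = MultiFork::spawn();
    /// let (fork_id, backend, env) = multi_fork.create_fork(fork_opts)?;
    /// // The backend can later be looked up again by its id.
    /// assert!(multi_fork.get_fork(fork_id)?.is_some());
    /// ```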
    pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
        trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
        let (sender, rx) = oneshot_channel();
        let req = Request::CreateFork(Box::new(fork), sender);
        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
        rx.recv()?
    }

    /// Rolls the block of the fork.
    ///
    /// If no matching fork backend exists, it will be created.
    pub fn roll_fork(
        &self,
        fork: ForkId,
        block: u64,
    ) -> eyre::Result<(ForkId, SharedBackend, Env)> {
        trace!(?fork, ?block, "rolling fork");
        let (sender, rx) = oneshot_channel();
        let req = Request::RollFork(fork, block, sender);
        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
        rx.recv()?
    }

    /// Returns the `Env` of the given fork, if any.
    pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
        trace!(?fork, "getting env config");
        let (sender, rx) = oneshot_channel();
        let req = Request::GetEnv(fork, sender);
        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
        Ok(rx.recv()?)
    }

    /// Updates the block number and timestamp of the given fork with new values.
    pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
        trace!(?fork, ?number, ?timestamp, "update fork block");
        self.handler
            .clone()
            .try_send(Request::UpdateBlock(fork, number, timestamp))
            .map_err(|e| eyre::eyre!("{:?}", e))
    }

    /// Updates the fork's entire block env.
    ///
    /// This is required for tx-level forking, where we need to fork off the `block - 1` state but
    /// still need to use the env settings of `block`.
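    ///
    /// A minimal sketch, assuming `parent_fork_opts` targets `block - 1` and `target_block_env`
    /// carries the env values of `block` (both names are illustrative):
    ///
    /// ```ignore
    /// let (fork_id, _backend, _env) = multi_fork.create_fork(parent_fork_opts)?;
    /// multi_fork.update_block_env(fork_id, target_block_env)?;
    /// ```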
    pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
        trace!(?fork, ?env, "update fork block");
        self.handler
            .clone()
            .try_send(Request::UpdateEnv(fork, env))
            .map_err(|e| eyre::eyre!("{:?}", e))
    }

    /// Returns the corresponding fork if it exists.
    ///
    /// Returns `None` if no matching fork backend is available.
    pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
        let id = id.into();
        trace!(?id, "get fork backend");
        let (sender, rx) = oneshot_channel();
        let req = Request::GetFork(id, sender);
        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
        Ok(rx.recv()?)
    }

    /// Returns the corresponding fork url if it exists.
    ///
    /// Returns `None` if no matching fork is available.
    pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
        let (sender, rx) = oneshot_channel();
        let req = Request::GetForkUrl(id.into(), sender);
        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
        Ok(rx.recv()?)
    }
}

type Handler = BackendHandler<Arc<RetryProvider>>;

type CreateFuture =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;
type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
type GetEnvSender = OneshotSender<Option<Env>>;

/// Request that's sent to the handler.
#[derive(Debug)]
enum Request {
    /// Creates a new ForkBackend.
    CreateFork(Box<CreateFork>, CreateSender),
    /// Returns the Fork backend for the `ForkId` if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
    /// Adjusts the block that's being forked by creating a new fork at the new block.
    RollFork(ForkId, u64, CreateSender),
    /// Returns the environment of the fork.
    GetEnv(ForkId, GetEnvSender),
    /// Updates the block number and timestamp of the fork.
    UpdateBlock(ForkId, U256, U256),
    /// Updates the entire block env of the fork.
    UpdateEnv(ForkId, BlockEnv),
    /// Shuts down the entire `MultiForkHandler`, see `ShutDownMultiFork`.
    ShutDown(OneshotSender<()>),
    /// Returns the Fork Url for the `ForkId` if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}

enum ForkTask {
    /// Contains the future that will establish a new fork.
    Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
}

/// The type that manages connections in the background.
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler {
    /// Incoming requests from the `MultiFork`.
    incoming: Fuse<Receiver<Request>>,

    /// All active handlers.
    ///
    /// It's expected that this list will be rather small (<10).
    handlers: Vec<(ForkId, Handler)>,

    /// Tasks currently in progress.
    pending_tasks: Vec<ForkTask>,

    /// All _unique_ fork ids mapped to their corresponding backend.
    ///
    /// Note: The backend can be shared by multiple `ForkId`s if they target the same provider and
    /// block number.
    forks: HashMap<ForkId, CreatedFork>,

    /// Optional periodic interval to flush rpc cache.
    flush_cache_interval: Option<tokio::time::Interval>,
}

impl MultiForkHandler {
    fn new(incoming: Receiver<Request>) -> Self {
        Self {
            incoming: incoming.fuse(),
            handlers: Default::default(),
            pending_tasks: Default::default(),
            forks: Default::default(),
            flush_cache_interval: None,
        }
    }

    /// Sets the interval after which all rpc caches should be flushed periodically.
    pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
        self.flush_cache_interval =
            Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
        self
    }

    /// Returns the list of additional senders of a matching task for the given id, if any.
    #[expect(irrefutable_let_patterns)]
    fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
        for task in &mut self.pending_tasks {
            if let ForkTask::Create(_, in_progress, _, additional) = task
                && in_progress == id
            {
                return Some(additional);
            }
        }
        None
    }

    fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
        let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
        trace!(?fork_id, "created new forkId");

        // There could already be a task for the requested fork in progress.
        if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
            in_progress.push(sender);
            return;
        }

        // Need to create a new fork.
        let task = Box::pin(create_fork(fork));
        self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
    }

    fn insert_new_fork(
        &mut self,
        fork_id: ForkId,
        fork: CreatedFork,
        sender: CreateSender,
        additional_senders: Vec<CreateSender>,
    ) {
        self.forks.insert(fork_id.clone(), fork.clone());
        let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));

        // Notify all additional senders and track unique forkIds.
        for sender in additional_senders {
            let next_fork_id = fork.inc_senders(fork_id.clone());
            self.forks.insert(next_fork_id.clone(), fork.clone());
            let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
        }
    }

    /// Updates the fork's entire block env.
    fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
        if let Some(fork) = self.forks.get_mut(&fork_id) {
            fork.opts.env.evm_env.block_env = env;
        }
    }

    /// Updates the fork's block number and timestamp. Used to preserve values set by the `roll`
    /// and `warp` cheatcodes when a new fork is selected.
    fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
        if let Some(fork) = self.forks.get_mut(&fork_id) {
            fork.opts.env.evm_env.block_env.number = block_number;
            fork.opts.env.evm_env.block_env.timestamp = block_timestamp;
        }
    }

    fn on_request(&mut self, req: Request) {
        match req {
            Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
            Request::GetFork(fork_id, sender) => {
                let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
                let _ = sender.send(fork);
            }
            Request::RollFork(fork_id, block, sender) => {
                if let Some(fork) = self.forks.get(&fork_id) {
                    trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
                    let mut opts = fork.opts.clone();
                    opts.evm_opts.fork_block_number = Some(block);
                    self.create_fork(opts, sender)
                } else {
                    let _ = sender.send(Err(eyre::eyre!("No matching fork exists for {fork_id}")));
                }
            }
            Request::GetEnv(fork_id, sender) => {
                let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
            }
            Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
                self.update_block(fork_id, block_number, block_timestamp);
            }
            Request::UpdateEnv(fork_id, block_env) => {
                self.update_env(fork_id, block_env);
            }
            Request::ShutDown(sender) => {
                trace!(target: "fork::multi", "received shutdown signal");
                // We're emptying all fork backends; this way we ensure all caches get flushed.
                self.forks.clear();
                self.handlers.clear();
                let _ = sender.send(());
            }
            Request::GetForkUrl(fork_id, sender) => {
                let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
                let _ = sender.send(fork);
            }
        }
    }
}

// Drives all handlers to completion.
// This future will finish once all underlying `BackendHandler`s have completed.
impl Future for MultiForkHandler {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();

        // Receive new requests.
        loop {
            match Pin::new(&mut pin.incoming).poll_next(cx) {
                Poll::Ready(Some(req)) => {
                    pin.on_request(req);
                }
                Poll::Ready(None) => {
                    // Channel closed, but we still need to drive the fork handlers to completion.
                    trace!(target: "fork::multi", "request channel closed");
                    break;
                }
                Poll::Pending => break,
            }
        }

        // Advance all tasks.
        for n in (0..pin.pending_tasks.len()).rev() {
            let task = pin.pending_tasks.swap_remove(n);
            match task {
                ForkTask::Create(mut fut, id, sender, additional_senders) => {
                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
                        match resp {
                            Ok((fork_id, fork, handler)) => {
                                if let Some(fork) = pin.forks.get(&fork_id).cloned() {
                                    pin.insert_new_fork(
                                        fork.inc_senders(fork_id),
                                        fork,
                                        sender,
                                        additional_senders,
                                    );
                                } else {
                                    pin.handlers.push((fork_id.clone(), handler));
                                    pin.insert_new_fork(fork_id, fork, sender, additional_senders);
                                }
                            }
                            Err(err) => {
                                let _ = sender.send(Err(eyre::eyre!("{err}")));
                                for sender in additional_senders {
                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
                                }
                            }
                        }
                    } else {
                        pin.pending_tasks.push(ForkTask::Create(
                            fut,
                            id,
                            sender,
                            additional_senders,
                        ));
                    }
                }
            }
        }

        // Advance all handlers.
        for n in (0..pin.handlers.len()).rev() {
            let (id, mut handler) = pin.handlers.swap_remove(n);
            match handler.poll_unpin(cx) {
                Poll::Ready(_) => {
                    trace!(target: "fork::multi", "fork {:?} completed", id);
                }
                Poll::Pending => {
                    pin.handlers.push((id, handler));
                }
            }
        }

        if pin.handlers.is_empty() && pin.incoming.is_done() {
            trace!(target: "fork::multi", "completed");
            return Poll::Ready(());
        }

        // Periodically flush cached RPC state.
        if pin
            .flush_cache_interval
            .as_mut()
            .map(|interval| interval.poll_tick(cx).is_ready())
            .unwrap_or_default()
            && !pin.forks.is_empty()
        {
            trace!(target: "fork::multi", "tick flushing caches");
            let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
            // Flush this on a new thread to avoid blocking here.
            std::thread::Builder::new()
                .name("flusher".into())
                .spawn(move || {
                    forks.into_iter().for_each(|fork| fork.flush_cache());
                })
                .expect("failed to spawn thread");
        }

        Poll::Pending
    }
}

/// Tracks the created fork.
#[derive(Debug, Clone)]
struct CreatedFork {
    /// How the fork was initially created.
    opts: CreateFork,
    /// Copy of the sender.
    backend: SharedBackend,
    /// How many consumers there are, since a `SharedBackend` can be used by multiple
    /// consumers.
    num_senders: Arc<AtomicUsize>,
}

impl CreatedFork {
    pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
        Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
    }

    /// Increments senders and returns the unique identifier of the fork.
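    ///
    /// For example, a base id of `"mainnet"` yields `"mainnet-1"`, then `"mainnet-2"`, and so on
    /// for each additional consumer (the base id is illustrative).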
    fn inc_senders(&self, fork_id: ForkId) -> ForkId {
        format!(
            "{}-{}",
            fork_id.as_str(),
            self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
        )
        .into()
    }
}

/// A type that's used to signal the `MultiForkHandler` when it's time to shut down.
///
/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc caches.
///
/// This type intentionally does not implement `Clone` since it's intended that there's only one
/// instance.
#[derive(Debug)]
struct ShutDownMultiFork {
    handler: Option<Sender<Request>>,
}

impl Drop for ShutDownMultiFork {
    fn drop(&mut self) {
        trace!(target: "fork::multi", "initiating shutdown");
        let (sender, rx) = oneshot_channel();
        let req = Request::ShutDown(sender);
        if let Some(mut handler) = self.handler.take()
            && handler.try_send(req).is_ok()
        {
            let _ = rx.recv();
            trace!(target: "fork::cache", "multifork backend shutdown");
        }
    }
}

/// Creates a new fork.
///
/// This will establish a new `Provider` to the endpoint and return the Fork Backend.
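///
/// Note that the returned `ForkId` is derived from the block number reported by the provider, so
/// a fork requested without an explicit block number is still pinned to a concrete block.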
async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
    let provider = Arc::new(
        ProviderBuilder::new(fork.url.as_str())
            .maybe_max_retry(fork.evm_opts.fork_retries)
            .maybe_initial_backoff(fork.evm_opts.fork_retry_backoff)
            .maybe_headers(fork.evm_opts.fork_headers.clone())
            .compute_units_per_second(fork.evm_opts.get_compute_units_per_second())
            .build()?,
    );

    // Initialise the fork environment.
    let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
    fork.env = env;
    let meta = BlockchainDbMeta::new(fork.env.evm_env.block_env.clone(), fork.url.clone());

    // We need to use the block number from the block because the env's number can be different on
    // some L2s (e.g. Arbitrum).
    let number = block.header().number();

    // Determine the cache path if caching is enabled.
    let cache_path = if fork.enable_caching {
        Config::foundry_block_cache_dir(fork.env.evm_env.cfg_env.chain_id, number)
    } else {
        None
    };

    let db = BlockchainDb::new(meta, cache_path);
    let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
    let fork = CreatedFork::new(fork, backend);
    let fork_id = ForkId::new(&fork.opts.url, Some(number));

    Ok((fork_id, fork, handler))
}