foundry_evm_core/fork/
multi.rs

1//! Support for running multiple fork backends.
2//!
3//! The design is similar to the single `SharedBackend`, `BackendHandler` but supports multiple
4//! concurrently active pairs at once.
5
6use super::CreateFork;
7use crate::Env;
8use alloy_consensus::BlockHeader;
9use alloy_primitives::{U256, map::HashMap};
10use alloy_provider::network::BlockResponse;
11use foundry_common::provider::{ProviderBuilder, RetryProvider};
12use foundry_config::Config;
13use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
14use futures::{
15    FutureExt, StreamExt,
16    channel::mpsc::{Receiver, Sender, channel},
17    stream::{Fuse, Stream},
18    task::{Context, Poll},
19};
20use revm::context::BlockEnv;
21use std::{
22    fmt::{self, Write},
23    pin::Pin,
24    sync::{
25        Arc,
26        atomic::AtomicUsize,
27        mpsc::{Sender as OneshotSender, channel as oneshot_channel},
28    },
29    time::Duration,
30};
31
/// The _unique_ identifier for a specific fork.
///
/// This is either built from the endpoint URL and block number (see [`ForkId::new`]) or an
/// arbitrary descriptive name, e.g. the name of the network.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the identifier for a fork of `url` at block `num`.
    ///
    /// The result has the form `<url>@<block>`, where `<block>` is the `0x`-prefixed hexadecimal
    /// block number, or the literal `latest` if `num` is `None`.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = String::from(url);
        id.push('@');
        if let Some(n) = num {
            // Formatting into a `String` never fails.
            write!(id, "{n:#x}").unwrap();
        } else {
            id.push_str("latest");
        }
        Self(id)
    }

    /// Returns the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ForkId {
    /// Writes the raw identifier, honouring the formatter's width and alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    /// Wraps anything that converts into a `String` as a `ForkId`, verbatim.
    fn from(id: T) -> Self {
        let raw: String = id.into();
        Self(raw)
    }
}
66
/// The sender half of a multi fork pair.
///
/// Sends requests to the `MultiForkHandler`, e.g. to create, roll or look up forks.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork {
    /// Channel to send `Request`s to the handler.
    handler: Sender<Request>,
    /// Shared guard whose `Drop` asks the handler to shut down, which ensures that all rpc
    /// resources get flushed properly.
    _shutdown: Arc<ShutDownMultiFork>,
}
77
78impl MultiFork {
79    /// Creates a new pair and spawns the `MultiForkHandler` on a background thread.
80    pub fn spawn() -> Self {
81        trace!(target: "fork::multi", "spawning multifork");
82
83        let (fork, mut handler) = Self::new();
84
85        // Spawn a light-weight thread just for sending and receiving data from the remote
86        // client(s).
87        let fut = async move {
88            // Flush cache every 60s, this ensures that long-running fork tests get their
89            // cache flushed from time to time.
90            // NOTE: we install the interval here because the `tokio::timer::Interval`
91            // requires a rt.
92            handler.set_flush_cache_interval(Duration::from_secs(60));
93            handler.await
94        };
95        match tokio::runtime::Handle::try_current() {
96            Ok(rt) => _ = rt.spawn(fut),
97            Err(_) => {
98                _ = std::thread::Builder::new()
99                    .name("multi-fork-backend".into())
100                    .spawn(move || {
101                        tokio::runtime::Builder::new_current_thread()
102                            .enable_all()
103                            .build()
104                            .expect("failed to build tokio runtime")
105                            .block_on(fut)
106                    })
107                    .expect("failed to spawn thread")
108            }
109        }
110
111        trace!(target: "fork::multi", "spawned MultiForkHandler thread");
112        fork
113    }
114
115    /// Creates a new pair multi fork pair.
116    ///
117    /// Use [`spawn`](Self::spawn) instead.
118    #[doc(hidden)]
119    pub fn new() -> (Self, MultiForkHandler) {
120        let (handler, handler_rx) = channel(1);
121        let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
122        (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
123    }
124
125    /// Returns a fork backend.
126    ///
127    /// If no matching fork backend exists it will be created.
128    pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
129        trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
130        let (sender, rx) = oneshot_channel();
131        let req = Request::CreateFork(Box::new(fork), sender);
132        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
133        rx.recv()?
134    }
135
136    /// Rolls the block of the fork.
137    ///
138    /// If no matching fork backend exists it will be created.
139    pub fn roll_fork(
140        &self,
141        fork: ForkId,
142        block: u64,
143    ) -> eyre::Result<(ForkId, SharedBackend, Env)> {
144        trace!(?fork, ?block, "rolling fork");
145        let (sender, rx) = oneshot_channel();
146        let req = Request::RollFork(fork, block, sender);
147        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
148        rx.recv()?
149    }
150
151    /// Returns the `Env` of the given fork, if any.
152    pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
153        trace!(?fork, "getting env config");
154        let (sender, rx) = oneshot_channel();
155        let req = Request::GetEnv(fork, sender);
156        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
157        Ok(rx.recv()?)
158    }
159
160    /// Updates block number and timestamp of given fork with new values.
161    pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
162        trace!(?fork, ?number, ?timestamp, "update fork block");
163        self.handler
164            .clone()
165            .try_send(Request::UpdateBlock(fork, number, timestamp))
166            .map_err(|e| eyre::eyre!("{:?}", e))
167    }
168
169    /// Updates the fork's entire env
170    ///
171    /// This is required for tx level forking where we need to fork off the `block - 1` state but
172    /// still need use env settings for `env`.
173    pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
174        trace!(?fork, ?env, "update fork block");
175        self.handler
176            .clone()
177            .try_send(Request::UpdateEnv(fork, env))
178            .map_err(|e| eyre::eyre!("{:?}", e))
179    }
180
181    /// Returns the corresponding fork if it exists.
182    ///
183    /// Returns `None` if no matching fork backend is available.
184    pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
185        let id = id.into();
186        trace!(?id, "get fork backend");
187        let (sender, rx) = oneshot_channel();
188        let req = Request::GetFork(id, sender);
189        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
190        Ok(rx.recv()?)
191    }
192
193    /// Returns the corresponding fork url if it exists.
194    ///
195    /// Returns `None` if no matching fork is available.
196    pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
197        let (sender, rx) = oneshot_channel();
198        let req = Request::GetForkUrl(id.into(), sender);
199        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
200        Ok(rx.recv()?)
201    }
202}
203
/// The `BackendHandler` of a single fork, backed by a shared `RetryProvider`.
type Handler = BackendHandler<Arc<RetryProvider>>;

/// Boxed, sendable future that resolves to the id, the bookkeeping entry and the handler of a
/// newly created fork, or to the error that prevented its creation.
type CreateFuture =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;
/// Reply channel for requests that create a fork (`CreateFork` and `RollFork`).
type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
/// Reply channel for `GetEnv` requests.
type GetEnvSender = OneshotSender<Option<Env>>;
210
/// Request that's sent to the handler.
///
/// Variants that carry a sender are answered through it; the others are fire-and-forget.
#[derive(Debug)]
enum Request {
    /// Creates a new fork backend and replies with its id, backend and env.
    CreateFork(Box<CreateFork>, CreateSender),
    /// Returns the fork backend for the `ForkId` if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
    /// Adjusts the block that's being forked, by creating a new fork at the new block.
    RollFork(ForkId, u64, CreateSender),
    /// Returns the environment of the fork.
    GetEnv(ForkId, GetEnvSender),
    /// Updates the block number and timestamp (in that order) of the fork.
    UpdateBlock(ForkId, U256, U256),
    /// Replaces the entire block env of the fork.
    UpdateEnv(ForkId, BlockEnv),
    /// Shuts down the entire `MultiForkHandler`, see `ShutDownMultiFork`.
    ShutDown(OneshotSender<()>),
    /// Returns the fork url for the `ForkId` if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
231
/// A unit of work that the `MultiForkHandler` polls until it completes.
enum ForkTask {
    /// A fork that is currently being established.
    ///
    /// Holds, in order: the future that will establish the new fork, the `ForkId` that was
    /// requested, the reply sender of the request that started the task, and the reply senders
    /// of all requests for the same `ForkId` that arrived while the task was in progress.
    Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
}
236
/// The type that manages connections in the background.
///
/// It is a future: requests are only served while it is being polled.
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler {
    /// Incoming requests from the `MultiFork`.
    incoming: Fuse<Receiver<Request>>,

    /// All active handlers.
    ///
    /// It's expected that this list will be rather small (<10).
    handlers: Vec<(ForkId, Handler)>,

    /// Tasks currently in progress.
    pending_tasks: Vec<ForkTask>,

    /// All _unique_ fork ids mapped to their corresponding backend.
    ///
    /// Note: The backend can be shared by multiple `ForkId`s if they target the same provider and
    /// block number.
    forks: HashMap<ForkId, CreatedFork>,

    /// Optional periodic interval to flush rpc cache.
    flush_cache_interval: Option<tokio::time::Interval>,
}
260
261impl MultiForkHandler {
262    fn new(incoming: Receiver<Request>) -> Self {
263        Self {
264            incoming: incoming.fuse(),
265            handlers: Default::default(),
266            pending_tasks: Default::default(),
267            forks: Default::default(),
268            flush_cache_interval: None,
269        }
270    }
271
272    /// Sets the interval after which all rpc caches should be flushed periodically.
273    pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
274        self.flush_cache_interval =
275            Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
276        self
277    }
278
279    /// Returns the list of additional senders of a matching task for the given id, if any.
280    #[expect(irrefutable_let_patterns)]
281    fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
282        for task in &mut self.pending_tasks {
283            if let ForkTask::Create(_, in_progress, _, additional) = task
284                && in_progress == id
285            {
286                return Some(additional);
287            }
288        }
289        None
290    }
291
292    fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
293        let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
294        trace!(?fork_id, "created new forkId");
295
296        // There could already be a task for the requested fork in progress.
297        if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
298            in_progress.push(sender);
299            return;
300        }
301
302        // Need to create a new fork.
303        let task = Box::pin(create_fork(fork));
304        self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
305    }
306
307    fn insert_new_fork(
308        &mut self,
309        fork_id: ForkId,
310        fork: CreatedFork,
311        sender: CreateSender,
312        additional_senders: Vec<CreateSender>,
313    ) {
314        self.forks.insert(fork_id.clone(), fork.clone());
315        let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
316
317        // Notify all additional senders and track unique forkIds.
318        for sender in additional_senders {
319            let next_fork_id = fork.inc_senders(fork_id.clone());
320            self.forks.insert(next_fork_id.clone(), fork.clone());
321            let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
322        }
323    }
324
325    /// Update the fork's block entire env
326    fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
327        if let Some(fork) = self.forks.get_mut(&fork_id) {
328            fork.opts.env.evm_env.block_env = env;
329        }
330    }
331    /// Update fork block number and timestamp. Used to preserve values set by `roll` and `warp`
332    /// cheatcodes when new fork selected.
333    fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
334        if let Some(fork) = self.forks.get_mut(&fork_id) {
335            fork.opts.env.evm_env.block_env.number = block_number;
336            fork.opts.env.evm_env.block_env.timestamp = block_timestamp;
337        }
338    }
339
340    fn on_request(&mut self, req: Request) {
341        match req {
342            Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
343            Request::GetFork(fork_id, sender) => {
344                let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
345                let _ = sender.send(fork);
346            }
347            Request::RollFork(fork_id, block, sender) => {
348                if let Some(fork) = self.forks.get(&fork_id) {
349                    trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
350                    let mut opts = fork.opts.clone();
351                    opts.evm_opts.fork_block_number = Some(block);
352                    self.create_fork(opts, sender)
353                } else {
354                    let _ = sender.send(Err(eyre::eyre!("No matching fork exits for {}", fork_id)));
355                }
356            }
357            Request::GetEnv(fork_id, sender) => {
358                let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
359            }
360            Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
361                self.update_block(fork_id, block_number, block_timestamp);
362            }
363            Request::UpdateEnv(fork_id, block_env) => {
364                self.update_env(fork_id, block_env);
365            }
366            Request::ShutDown(sender) => {
367                trace!(target: "fork::multi", "received shutdown signal");
368                // We're emptying all fork backends, this way we ensure all caches get flushed.
369                self.forks.clear();
370                self.handlers.clear();
371                let _ = sender.send(());
372            }
373            Request::GetForkUrl(fork_id, sender) => {
374                let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
375                let _ = sender.send(fork);
376            }
377        }
378    }
379}
380
381// Drives all handler to completion.
382// This future will finish once all underlying BackendHandler are completed.
383impl Future for MultiForkHandler {
384    type Output = ();
385
386    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
387        let pin = self.get_mut();
388
389        // Receive new requests.
390        loop {
391            match Pin::new(&mut pin.incoming).poll_next(cx) {
392                Poll::Ready(Some(req)) => {
393                    pin.on_request(req);
394                }
395                Poll::Ready(None) => {
396                    // Channel closed, but we still need to drive the fork handlers to completion.
397                    trace!(target: "fork::multi", "request channel closed");
398                    break;
399                }
400                Poll::Pending => break,
401            }
402        }
403
404        // Advance all tasks.
405        for n in (0..pin.pending_tasks.len()).rev() {
406            let task = pin.pending_tasks.swap_remove(n);
407            match task {
408                ForkTask::Create(mut fut, id, sender, additional_senders) => {
409                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
410                        match resp {
411                            Ok((fork_id, fork, handler)) => {
412                                if let Some(fork) = pin.forks.get(&fork_id).cloned() {
413                                    pin.insert_new_fork(
414                                        fork.inc_senders(fork_id),
415                                        fork,
416                                        sender,
417                                        additional_senders,
418                                    );
419                                } else {
420                                    pin.handlers.push((fork_id.clone(), handler));
421                                    pin.insert_new_fork(fork_id, fork, sender, additional_senders);
422                                }
423                            }
424                            Err(err) => {
425                                let _ = sender.send(Err(eyre::eyre!("{err}")));
426                                for sender in additional_senders {
427                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
428                                }
429                            }
430                        }
431                    } else {
432                        pin.pending_tasks.push(ForkTask::Create(
433                            fut,
434                            id,
435                            sender,
436                            additional_senders,
437                        ));
438                    }
439                }
440            }
441        }
442
443        // Advance all handlers.
444        for n in (0..pin.handlers.len()).rev() {
445            let (id, mut handler) = pin.handlers.swap_remove(n);
446            match handler.poll_unpin(cx) {
447                Poll::Ready(_) => {
448                    trace!(target: "fork::multi", "fork {:?} completed", id);
449                }
450                Poll::Pending => {
451                    pin.handlers.push((id, handler));
452                }
453            }
454        }
455
456        if pin.handlers.is_empty() && pin.incoming.is_done() {
457            trace!(target: "fork::multi", "completed");
458            return Poll::Ready(());
459        }
460
461        // Periodically flush cached RPC state.
462        if pin
463            .flush_cache_interval
464            .as_mut()
465            .map(|interval| interval.poll_tick(cx).is_ready())
466            .unwrap_or_default()
467            && !pin.forks.is_empty()
468        {
469            trace!(target: "fork::multi", "tick flushing caches");
470            let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
471            // Flush this on new thread to not block here.
472            std::thread::Builder::new()
473                .name("flusher".into())
474                .spawn(move || {
475                    forks.into_iter().for_each(|fork| fork.flush_cache());
476                })
477                .expect("failed to spawn thread");
478        }
479
480        Poll::Pending
481    }
482}
483
/// Tracks a created fork.
#[derive(Debug, Clone)]
struct CreatedFork {
    /// How the fork was initially created.
    ///
    /// The contained env is modified afterwards by `UpdateBlock` and `UpdateEnv` requests.
    opts: CreateFork,
    /// The `SharedBackend` that is handed out to the consumers of this fork.
    backend: SharedBackend,
    /// How many consumers there are, since a `SharedBackend` can be used by multiple
    /// consumers.
    ///
    /// The counter is shared between all clones of this entry.
    num_senders: Arc<AtomicUsize>,
}
495
496impl CreatedFork {
497    pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
498        Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
499    }
500
501    /// Increment senders and return unique identifier of the fork.
502    fn inc_senders(&self, fork_id: ForkId) -> ForkId {
503        format!(
504            "{}-{}",
505            fork_id.as_str(),
506            self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
507        )
508        .into()
509    }
510}
511
/// A type that's used to signal the `MultiForkHandler` when it's time to shut down.
///
/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc caches.
///
/// This type intentionally does not implement `Clone` since it's intended that there's only one
/// instance.
#[derive(Debug)]
struct ShutDownMultiFork {
    /// Sender used to deliver the shutdown request; it is taken out (left as `None`) on drop.
    handler: Option<Sender<Request>>,
}
522
523impl Drop for ShutDownMultiFork {
524    fn drop(&mut self) {
525        trace!(target: "fork::multi", "initiating shutdown");
526        let (sender, rx) = oneshot_channel();
527        let req = Request::ShutDown(sender);
528        if let Some(mut handler) = self.handler.take()
529            && handler.try_send(req).is_ok()
530        {
531            let _ = rx.recv();
532            trace!(target: "fork::cache", "multifork backend shutdown");
533        }
534    }
535}
536
537/// Creates a new fork.
538///
539/// This will establish a new `Provider` to the endpoint and return the Fork Backend.
540async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
541    let provider = Arc::new(
542        ProviderBuilder::new(fork.url.as_str())
543            .maybe_max_retry(fork.evm_opts.fork_retries)
544            .maybe_initial_backoff(fork.evm_opts.fork_retry_backoff)
545            .maybe_headers(fork.evm_opts.fork_headers.clone())
546            .compute_units_per_second(fork.evm_opts.get_compute_units_per_second())
547            .build()?,
548    );
549
550    // Initialise the fork environment.
551    let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
552    fork.env = env;
553    let meta = BlockchainDbMeta::new(fork.env.evm_env.block_env.clone(), fork.url.clone());
554
555    // We need to use the block number from the block because the env's number can be different on
556    // some L2s (e.g. Arbitrum).
557    let number = block.header().number();
558
559    // Determine the cache path if caching is enabled.
560    let cache_path = if fork.enable_caching {
561        Config::foundry_block_cache_dir(fork.env.evm_env.cfg_env.chain_id, number)
562    } else {
563        None
564    };
565
566    let db = BlockchainDb::new(meta, cache_path);
567    let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
568    let fork = CreatedFork::new(fork, backend);
569    let fork_id = ForkId::new(&fork.opts.url, Some(number));
570
571    Ok((fork_id, fork, handler))
572}