// Source file: foundry_evm_core/fork/multi.rs

1//! Support for running multiple fork backends.
2//!
3//! The design is similar to the single `SharedBackend`, `BackendHandler` but supports multiple
4//! concurrently active pairs at once.
5
6use super::CreateFork;
7use crate::Env;
8use alloy_consensus::BlockHeader;
9use alloy_network::Network;
10use alloy_primitives::{U256, map::HashMap};
11use alloy_provider::network::BlockResponse;
12use foundry_config::Config;
13use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
14use futures::{
15    FutureExt, StreamExt,
16    channel::mpsc::{Receiver, Sender, channel},
17    stream::Fuse,
18    task::{Context, Poll},
19};
20use revm::context::BlockEnv;
21use std::{
22    fmt::{self, Write},
23    pin::Pin,
24    sync::{
25        Arc,
26        atomic::AtomicUsize,
27        mpsc::{Sender as OneshotSender, channel as oneshot_channel},
28    },
29    time::Duration,
30};
31
/// The _unique_ identifier for a specific fork, this could be the name of the network or a
/// custom descriptive name.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the identifier for a fork from its endpoint URL and an optional pinned block.
    ///
    /// The result has the shape `<url>@<block>`, where `<block>` is the hex-formatted block
    /// number (`0x…`) or the literal `latest` when no block was pinned.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let suffix = match num {
            Some(n) => format!("{n:#x}"),
            None => "latest".to_string(),
        };
        Self(format!("{url}@{suffix}"))
    }

    /// Returns the identifier of the fork.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ForkId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    fn from(id: T) -> Self {
        Self(id.into())
    }
}
66
/// The Sender half of multi fork pair.
/// Can send requests to the `MultiForkHandler` to create forks.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork<N: Network> {
    /// Channel to send `Request`s to the handler.
    handler: Sender<Request<N>>,
    /// Ensures that all rpc resources get flushed properly.
    ///
    /// `ShutDownMultiFork` signals the handler on drop; sharing it behind an `Arc` means the
    /// shutdown fires only when the last clone of this `MultiFork` is dropped.
    _shutdown: Arc<ShutDownMultiFork<N>>,
}
77
78impl<N: Network> MultiFork<N> {
79    /// Creates a new pair and spawns the `MultiForkHandler` on a background thread.
80    pub fn spawn() -> Self {
81        trace!(target: "fork::multi", "spawning multifork");
82
83        let (fork, mut handler) = Self::new();
84
85        // Spawn a light-weight thread just for sending and receiving data from the remote
86        // client(s).
87        let fut = async move {
88            // Flush cache every 60s, this ensures that long-running fork tests get their
89            // cache flushed from time to time.
90            // NOTE: we install the interval here because the `tokio::timer::Interval`
91            // requires a rt.
92            handler.set_flush_cache_interval(Duration::from_secs(60));
93            handler.await
94        };
95        match tokio::runtime::Handle::try_current() {
96            Ok(rt) => _ = rt.spawn(fut),
97            Err(_) => {
98                trace!(target: "fork::multi", "spawning multifork backend thread");
99                _ = std::thread::Builder::new()
100                    .name("multi-fork-backend".into())
101                    .spawn(move || {
102                        tokio::runtime::Builder::new_current_thread()
103                            .enable_all()
104                            .build()
105                            .expect("failed to build tokio runtime")
106                            .block_on(fut)
107                    })
108                    .expect("failed to spawn thread")
109            }
110        }
111
112        trace!(target: "fork::multi", "spawned MultiForkHandler thread");
113        fork
114    }
115
116    /// Creates a new pair multi fork pair.
117    ///
118    /// Use [`spawn`](Self::spawn) instead.
119    #[doc(hidden)]
120    pub fn new() -> (Self, MultiForkHandler<N>) {
121        let (handler, handler_rx) = channel(1);
122        let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
123        (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
124    }
125
126    /// Returns a fork backend.
127    ///
128    /// If no matching fork backend exists it will be created.
129    pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend<N>, Env)> {
130        trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
131        let (sender, rx) = oneshot_channel();
132        let req = Request::CreateFork(Box::new(fork), sender);
133        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
134        rx.recv()?
135    }
136
137    /// Rolls the block of the fork.
138    ///
139    /// If no matching fork backend exists it will be created.
140    pub fn roll_fork(
141        &self,
142        fork: ForkId,
143        block: u64,
144    ) -> eyre::Result<(ForkId, SharedBackend<N>, Env)> {
145        trace!(?fork, ?block, "rolling fork");
146        let (sender, rx) = oneshot_channel();
147        let req = Request::RollFork(fork, block, sender);
148        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
149        rx.recv()?
150    }
151
152    /// Returns the `Env` of the given fork, if any.
153    pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
154        trace!(?fork, "getting env config");
155        let (sender, rx) = oneshot_channel();
156        let req = Request::GetEnv(fork, sender);
157        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
158        Ok(rx.recv()?)
159    }
160
161    /// Updates block number and timestamp of given fork with new values.
162    pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
163        trace!(?fork, ?number, ?timestamp, "update fork block");
164        self.handler
165            .clone()
166            .try_send(Request::UpdateBlock(fork, number, timestamp))
167            .map_err(|e| eyre::eyre!("{:?}", e))
168    }
169
170    /// Updates the fork's entire env
171    ///
172    /// This is required for tx level forking where we need to fork off the `block - 1` state but
173    /// still need use env settings for `env`.
174    pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
175        trace!(?fork, ?env, "update fork block");
176        self.handler
177            .clone()
178            .try_send(Request::UpdateEnv(fork, env))
179            .map_err(|e| eyre::eyre!("{:?}", e))
180    }
181
182    /// Returns the corresponding fork if it exists.
183    ///
184    /// Returns `None` if no matching fork backend is available.
185    pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend<N>>> {
186        let id = id.into();
187        trace!(?id, "get fork backend");
188        let (sender, rx) = oneshot_channel();
189        let req = Request::GetFork(id, sender);
190        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
191        Ok(rx.recv()?)
192    }
193
194    /// Returns the corresponding fork url if it exists.
195    ///
196    /// Returns `None` if no matching fork is available.
197    pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
198        let (sender, rx) = oneshot_channel();
199        let req = Request::GetForkUrl(id.into(), sender);
200        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
201        Ok(rx.recv()?)
202    }
203}
204
/// Future that resolves to a newly established fork: its id, the bookkeeping data, and the
/// `BackendHandler` that must then be driven to completion by the `MultiForkHandler`.
type CreateFuture<N> =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)>> + Send>>;
/// One-shot reply channel used to answer fork-creation requests.
type CreateSender<N> = OneshotSender<eyre::Result<(ForkId, SharedBackend<N>, Env)>>;
/// One-shot reply channel used to answer `GetEnv` requests.
type GetEnvSender = OneshotSender<Option<Env>>;

/// Request that's sent to the handler.
#[derive(Debug)]
enum Request<N: Network> {
    /// Creates a new ForkBackend.
    CreateFork(Box<CreateFork>, CreateSender<N>),
    /// Returns the Fork backend for the `ForkId` if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend<N>>>),
    /// Adjusts the block that's being forked, by creating a new fork at the new block.
    RollFork(ForkId, u64, CreateSender<N>),
    /// Returns the environment of the fork.
    GetEnv(ForkId, GetEnvSender),
    /// Updates the block number and timestamp of the fork.
    UpdateBlock(ForkId, U256, U256),
    /// Updates the fork's entire block env.
    UpdateEnv(ForkId, BlockEnv),
    /// Shuts down the entire `MultiForkHandler`, see `ShutDownMultiFork`.
    ShutDown(OneshotSender<()>),
    /// Returns the Fork Url for the `ForkId` if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
230
/// In-flight background work owned by the `MultiForkHandler`.
enum ForkTask<N: Network> {
    /// Contains the future that will establish a new fork, the id it will be registered under,
    /// the original requester to answer, and any additional requesters that asked for the same
    /// fork while creation was still in progress.
    Create(CreateFuture<N>, ForkId, CreateSender<N>, Vec<CreateSender<N>>),
}
235
/// The type that manages connections in the background.
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler<N: Network> {
    /// Incoming requests from the `MultiFork`.
    incoming: Fuse<Receiver<Request<N>>>,

    /// All active handlers.
    ///
    /// It's expected that this list will be rather small (<10).
    handlers: Vec<(ForkId, BackendHandler<N>)>,

    // Fork-creation tasks currently in progress.
    pending_tasks: Vec<ForkTask<N>>,

    /// All _unique_ fork ids mapped to their corresponding backend.
    ///
    /// Note: The backend can be shared by multiple `ForkId`s if they target the same provider
    /// and block number.
    forks: HashMap<ForkId, CreatedFork<N>>,

    /// Optional periodic interval to flush rpc cache.
    flush_cache_interval: Option<tokio::time::Interval>,
}
259
260impl<N: Network> MultiForkHandler<N> {
261    fn new(incoming: Receiver<Request<N>>) -> Self {
262        Self {
263            incoming: incoming.fuse(),
264            handlers: Default::default(),
265            pending_tasks: Default::default(),
266            forks: Default::default(),
267            flush_cache_interval: None,
268        }
269    }
270
271    /// Sets the interval after which all rpc caches should be flushed periodically.
272    pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
273        self.flush_cache_interval =
274            Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
275        self
276    }
277
278    /// Returns the list of additional senders of a matching task for the given id, if any.
279    fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender<N>>> {
280        for ForkTask::Create(_, in_progress, _, additional) in &mut self.pending_tasks {
281            if in_progress == id {
282                return Some(additional);
283            }
284        }
285        None
286    }
287
288    fn create_fork(&mut self, fork: CreateFork, sender: CreateSender<N>) {
289        let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
290        trace!(?fork_id, "created new forkId");
291
292        // There could already be a task for the requested fork in progress.
293        if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
294            in_progress.push(sender);
295            return;
296        }
297
298        // Need to create a new fork.
299        let task = Box::pin(create_fork(fork));
300        self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
301    }
302
303    fn insert_new_fork(
304        &mut self,
305        fork_id: ForkId,
306        fork: CreatedFork<N>,
307        sender: CreateSender<N>,
308        additional_senders: Vec<CreateSender<N>>,
309    ) {
310        self.forks.insert(fork_id.clone(), fork.clone());
311        let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
312
313        // Notify all additional senders and track unique forkIds.
314        for sender in additional_senders {
315            let next_fork_id = fork.inc_senders(fork_id.clone());
316            self.forks.insert(next_fork_id.clone(), fork.clone());
317            let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
318        }
319    }
320
321    /// Update the fork's block entire env
322    fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
323        if let Some(fork) = self.forks.get_mut(&fork_id) {
324            fork.opts.env.evm_env.block_env = env;
325        }
326    }
327    /// Update fork block number and timestamp. Used to preserve values set by `roll` and `warp`
328    /// cheatcodes when new fork selected.
329    fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
330        if let Some(fork) = self.forks.get_mut(&fork_id) {
331            fork.opts.env.evm_env.block_env.number = block_number;
332            fork.opts.env.evm_env.block_env.timestamp = block_timestamp;
333        }
334    }
335
336    fn on_request(&mut self, req: Request<N>) {
337        match req {
338            Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
339            Request::GetFork(fork_id, sender) => {
340                let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
341                let _ = sender.send(fork);
342            }
343            Request::RollFork(fork_id, block, sender) => {
344                if let Some(fork) = self.forks.get(&fork_id) {
345                    trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
346                    let mut opts = fork.opts.clone();
347                    opts.evm_opts.fork_block_number = Some(block);
348                    self.create_fork(opts, sender)
349                } else {
350                    let _ =
351                        sender.send(Err(eyre::eyre!("No matching fork exists for {}", fork_id)));
352                }
353            }
354            Request::GetEnv(fork_id, sender) => {
355                let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
356            }
357            Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
358                self.update_block(fork_id, block_number, block_timestamp);
359            }
360            Request::UpdateEnv(fork_id, block_env) => {
361                self.update_env(fork_id, block_env);
362            }
363            Request::ShutDown(sender) => {
364                trace!(target: "fork::multi", "received shutdown signal");
365                // We're emptying all fork backends, this way we ensure all caches get flushed.
366                self.forks.clear();
367                self.handlers.clear();
368                let _ = sender.send(());
369            }
370            Request::GetForkUrl(fork_id, sender) => {
371                let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
372                let _ = sender.send(fork);
373            }
374        }
375    }
376}
377
// Drives all handlers to completion.
// This future will finish once all underlying BackendHandler are completed.
impl<N: Network> Future for MultiForkHandler<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Receive new requests.
        // Drain the channel first so any tasks created by a request get polled below within
        // the same tick.
        loop {
            match this.incoming.poll_next_unpin(cx) {
                Poll::Ready(Some(req)) => this.on_request(req),
                Poll::Ready(None) => {
                    // Channel closed, but we still need to drive the fork handlers to completion.
                    trace!(target: "fork::multi", "request channel closed");
                    break;
                }
                Poll::Pending => break,
            }
        }

        // Advance all tasks.
        // Iterating in reverse with `swap_remove` lets still-pending tasks be pushed back
        // without shifting elements or visiting any entry twice.
        for n in (0..this.pending_tasks.len()).rev() {
            let task = this.pending_tasks.swap_remove(n);
            match task {
                ForkTask::Create(mut fut, id, sender, additional_senders) => {
                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
                        match resp {
                            Ok((fork_id, fork, handler)) => {
                                if let Some(fork) = this.forks.get(&fork_id).cloned() {
                                    // An identical fork was registered in the meantime: reuse
                                    // its backend under a fresh unique id and drop the extra
                                    // handler instead of driving a duplicate.
                                    this.insert_new_fork(
                                        fork.inc_senders(fork_id),
                                        fork,
                                        sender,
                                        additional_senders,
                                    );
                                } else {
                                    this.handlers.push((fork_id.clone(), handler));
                                    this.insert_new_fork(fork_id, fork, sender, additional_senders);
                                }
                            }
                            Err(err) => {
                                // Fan the error out to the original requester and to everyone
                                // who piggybacked on this creation.
                                let _ = sender.send(Err(eyre::eyre!("{err}")));
                                for sender in additional_senders {
                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
                                }
                            }
                        }
                    } else {
                        // Not ready yet: put the task back so it's polled again next tick.
                        this.pending_tasks.push(ForkTask::Create(
                            fut,
                            id,
                            sender,
                            additional_senders,
                        ));
                    }
                }
            }
        }

        // Advance all handlers.
        for n in (0..this.handlers.len()).rev() {
            let (id, mut handler) = this.handlers.swap_remove(n);
            match handler.poll_unpin(cx) {
                Poll::Ready(_) => {
                    trace!(target: "fork::multi", "fork {:?} completed", id);
                }
                Poll::Pending => {
                    this.handlers.push((id, handler));
                }
            }
        }

        // Terminate only when there is nothing left to drive AND no new requests can arrive.
        if this.handlers.is_empty() && this.incoming.is_done() {
            trace!(target: "fork::multi", "completed");
            return Poll::Ready(());
        }

        // Periodically flush cached RPC state.
        if this
            .flush_cache_interval
            .as_mut()
            .map(|interval| interval.poll_tick(cx).is_ready())
            .unwrap_or_default()
            && !this.forks.is_empty()
        {
            trace!(target: "fork::multi", "tick flushing caches");
            let forks = this.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
            // Flush this on new thread to not block here.
            std::thread::Builder::new()
                .name("flusher".into())
                .spawn(move || {
                    forks.into_iter().for_each(|fork| fork.flush_cache());
                })
                .expect("failed to spawn thread");
        }

        Poll::Pending
    }
}
478
/// Tracks the created Fork
#[derive(Debug, Clone)]
struct CreatedFork<N: Network> {
    /// How the fork was initially created.
    opts: CreateFork,
    /// Copy of the sender.
    backend: SharedBackend<N>,
    /// How many consumers there are, since a `SharedBackend` can be used by multiple
    /// consumers.
    num_senders: Arc<AtomicUsize>,
}
490
491impl<N: Network> CreatedFork<N> {
492    pub fn new(opts: CreateFork, backend: SharedBackend<N>) -> Self {
493        Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
494    }
495
496    /// Increment senders and return unique identifier of the fork.
497    fn inc_senders(&self, fork_id: ForkId) -> ForkId {
498        format!(
499            "{}-{}",
500            fork_id.as_str(),
501            self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
502        )
503        .into()
504    }
505}
506
/// A type that's used to signal the `MultiForkHandler` when it's time to shut down.
///
/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc caches.
///
/// This type intentionally does not implement `Clone` since it's intended that there's only one
/// instance.
#[derive(Debug)]
struct ShutDownMultiFork<N: Network> {
    /// Sender used to deliver the shutdown request; taken (set to `None`) on drop.
    handler: Option<Sender<Request<N>>>,
}
517
518impl<N: Network> Drop for ShutDownMultiFork<N> {
519    fn drop(&mut self) {
520        trace!(target: "fork::multi", "initiating shutdown");
521        let (sender, rx) = oneshot_channel();
522        let req = Request::ShutDown(sender);
523        if let Some(mut handler) = self.handler.take()
524            && handler.try_send(req).is_ok()
525        {
526            let _ = rx.recv();
527            trace!(target: "fork::cache", "multifork backend shutdown");
528        }
529    }
530}
531
/// Creates a new fork.
///
/// This will establish a new `Provider` to the endpoint and return the Fork Backend.
///
/// Returns the fork's id (derived from the *resolved* block number), the bookkeeping
/// `CreatedFork`, and the `BackendHandler` that the caller must drive to completion.
async fn create_fork<N: Network>(
    mut fork: CreateFork,
) -> eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)> {
    let provider = fork.evm_opts.fork_provider_with_url(&fork.url)?;

    // Initialise the fork environment.
    let (env, block) =
        fork.evm_opts.fork_evm_env_with_provider::<_, N>(&fork.url, &provider).await?;
    fork.env = env;
    let meta = BlockchainDbMeta::new(fork.env.evm_env.block_env.clone(), fork.url.clone());

    // We need to use the block number from the block because the env's number can be different on
    // some L2s (e.g. Arbitrum).
    let number = block.header().number();

    // Determine the cache path if caching is enabled.
    let cache_path = if fork.enable_caching {
        Config::foundry_block_cache_dir(fork.env.evm_env.cfg_env.chain_id, number)
    } else {
        None
    };

    let db = BlockchainDb::new(meta, cache_path);
    let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
    let fork = CreatedFork::new(fork, backend);
    // Deriving the id from the resolved number means "latest" requests dedupe with explicit
    // pins of the same block.
    let fork_id = ForkId::new(&fork.opts.url, Some(number));

    Ok((fork_id, fork, handler))
}
563}