Skip to main content

foundry_evm_core/fork/
multi.rs

1//! Support for running multiple fork backends.
2//!
3//! The design is similar to the single `SharedBackend`, `BackendHandler` but supports multiple
4//! concurrently active pairs at once.
5
6use super::CreateFork;
7use alloy_evm::EvmEnv;
8use alloy_network::Network;
9use alloy_primitives::{U256, map::HashMap};
10use foundry_config::Config;
11use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
12use futures::{
13    FutureExt, StreamExt,
14    channel::mpsc::{Receiver, Sender, channel},
15    stream::Fuse,
16    task::{Context, Poll},
17};
18use revm::context::BlockEnv;
19use std::{
20    fmt::{self, Write},
21    pin::Pin,
22    sync::{
23        Arc,
24        atomic::AtomicUsize,
25        mpsc::{Sender as OneshotSender, channel as oneshot_channel},
26    },
27    time::Duration,
28};
29
/// The _unique_ identifier for a specific fork. This can be the name of the network or a custom
/// descriptive name.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the identifier of a fork from its endpoint URL and an optional block number.
    ///
    /// The result has the shape `<url>@<block>`, where `<block>` is the `0x`-prefixed
    /// hexadecimal block number, or the literal `latest` when no block number is given.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = format!("{url}@");
        if let Some(block) = num {
            // Formatting into a `String` cannot fail.
            write!(id, "{block:#x}").unwrap();
        } else {
            id.push_str("latest");
        }
        Self(id)
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ForkId {
    /// Formats the identifier exactly like the wrapped string, honoring formatter options.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    /// Wraps anything convertible into a `String` as a fork identifier, without validation.
    fn from(id: T) -> Self {
        let raw: String = id.into();
        Self(raw)
    }
}
64
/// The sender half of a multi fork pair.
///
/// Can send requests to the `MultiForkHandler` to create forks, roll them, and query or update
/// their environment.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork<N: Network> {
    /// Channel to send `Request`s to the handler.
    handler: Sender<Request<N>>,
    /// Ensures that all rpc resources get flushed properly.
    ///
    /// The guard is shared by all clones through the `Arc`; when the last clone is dropped the
    /// guard's `Drop` impl sends `Request::ShutDown` to the handler.
    _shutdown: Arc<ShutDownMultiFork<N>>,
}
75
76impl<N: Network> MultiFork<N> {
77    /// Creates a new pair and spawns the `MultiForkHandler` on a background thread.
78    pub fn spawn() -> Self {
79        trace!(target: "fork::multi", "spawning multifork");
80
81        let (fork, mut handler) = Self::new();
82
83        // Spawn a light-weight thread just for sending and receiving data from the remote
84        // client(s).
85        let fut = async move {
86            // Flush cache every 60s, this ensures that long-running fork tests get their
87            // cache flushed from time to time.
88            // NOTE: we install the interval here because the `tokio::timer::Interval`
89            // requires a rt.
90            handler.set_flush_cache_interval(Duration::from_secs(60));
91            handler.await
92        };
93        match tokio::runtime::Handle::try_current() {
94            Ok(rt) => _ = rt.spawn(fut),
95            Err(_) => {
96                trace!(target: "fork::multi", "spawning multifork backend thread");
97                _ = std::thread::Builder::new()
98                    .name("multi-fork-backend".into())
99                    .spawn(move || {
100                        tokio::runtime::Builder::new_current_thread()
101                            .enable_all()
102                            .build()
103                            .expect("failed to build tokio runtime")
104                            .block_on(fut)
105                    })
106                    .expect("failed to spawn thread")
107            }
108        }
109
110        trace!(target: "fork::multi", "spawned MultiForkHandler thread");
111        fork
112    }
113
114    /// Creates a new pair multi fork pair.
115    ///
116    /// Use [`spawn`](Self::spawn) instead.
117    #[doc(hidden)]
118    pub fn new() -> (Self, MultiForkHandler<N>) {
119        let (handler, handler_rx) = channel(1);
120        let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
121        (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
122    }
123
124    /// Returns a fork backend.
125    ///
126    /// If no matching fork backend exists it will be created.
127    pub fn create_fork(
128        &self,
129        fork: CreateFork,
130    ) -> eyre::Result<(ForkId, SharedBackend<N>, EvmEnv)> {
131        trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
132        let (sender, rx) = oneshot_channel();
133        let req = Request::CreateFork(Box::new(fork), sender);
134        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
135        rx.recv()?
136    }
137
138    /// Rolls the block of the fork.
139    ///
140    /// If no matching fork backend exists it will be created.
141    pub fn roll_fork(
142        &self,
143        fork: ForkId,
144        block: u64,
145    ) -> eyre::Result<(ForkId, SharedBackend<N>, EvmEnv)> {
146        trace!(?fork, ?block, "rolling fork");
147        let (sender, rx) = oneshot_channel();
148        let req = Request::RollFork(fork, block, sender);
149        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
150        rx.recv()?
151    }
152
153    /// Returns the `EvmEnv` of the given fork, if any.
154    pub fn get_evm_env(&self, fork: ForkId) -> eyre::Result<Option<EvmEnv>> {
155        trace!(?fork, "getting env config");
156        let (sender, rx) = oneshot_channel();
157        let req = Request::GetEvmEnv(fork, sender);
158        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
159        Ok(rx.recv()?)
160    }
161
162    /// Updates block number and timestamp of given fork with new values.
163    pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
164        trace!(?fork, ?number, ?timestamp, "update fork block");
165        self.handler
166            .clone()
167            .try_send(Request::UpdateBlock(fork, number, timestamp))
168            .map_err(|e| eyre::eyre!("{:?}", e))
169    }
170
171    /// Updates the fork's entire env
172    ///
173    /// This is required for tx level forking where we need to fork off the `block - 1` state but
174    /// still need use env settings for `env`.
175    pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
176        trace!(?fork, ?env, "update fork block");
177        self.handler
178            .clone()
179            .try_send(Request::UpdateEnv(fork, env))
180            .map_err(|e| eyre::eyre!("{:?}", e))
181    }
182
183    /// Returns the corresponding fork if it exists.
184    ///
185    /// Returns `None` if no matching fork backend is available.
186    pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend<N>>> {
187        let id = id.into();
188        trace!(?id, "get fork backend");
189        let (sender, rx) = oneshot_channel();
190        let req = Request::GetFork(id, sender);
191        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
192        Ok(rx.recv()?)
193    }
194
195    /// Returns the corresponding fork url if it exists.
196    ///
197    /// Returns `None` if no matching fork is available.
198    pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
199        let (sender, rx) = oneshot_channel();
200        let req = Request::GetForkUrl(id.into(), sender);
201        self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
202        Ok(rx.recv()?)
203    }
204}
205
/// Boxed future that resolves to a newly created fork: its id, the tracked fork state and the
/// `BackendHandler` that has to be driven for it.
type CreateFuture<N> =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)>> + Send>>;
/// Reply channel for requests that yield a fork: its id, its backend and its `EvmEnv`.
///
/// Note that `OneshotSender` is an alias for the std `mpsc` sender, see the imports.
type CreateSender<N> = OneshotSender<eyre::Result<(ForkId, SharedBackend<N>, EvmEnv)>>;
/// Reply channel for `Request::GetEvmEnv`.
type GetEvmEnvSender = OneshotSender<Option<EvmEnv>>;
210
/// Request that's sent to the handler.
///
/// Variants that carry a sender expect the handler to reply through it.
#[derive(Debug)]
enum Request<N: Network> {
    /// Creates a new ForkBackend.
    CreateFork(Box<CreateFork>, CreateSender<N>),
    /// Returns the Fork backend for the `ForkId` if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend<N>>>),
    /// Adjusts the block that's being forked, by creating a new fork at the new block.
    RollFork(ForkId, u64, CreateSender<N>),
    /// Returns the environment of the fork.
    GetEvmEnv(ForkId, GetEvmEnvSender),
    /// Updates the block number and timestamp of the fork.
    UpdateBlock(ForkId, U256, U256),
    /// Replaces the entire block env of the fork.
    UpdateEnv(ForkId, BlockEnv),
    /// Shuts down the entire `MultiForkHandler`, see `ShutDownMultiFork`.
    ShutDown(OneshotSender<()>),
    /// Returns the Fork Url for the `ForkId` if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
231
/// A unit of work the `MultiForkHandler` drives to completion while being polled.
enum ForkTask<N: Network> {
    /// Contains the future that will establish a new fork.
    ///
    /// The fields are, in order: the future creating the fork, the `ForkId` that was requested,
    /// the sender of the request that started the task, and the senders of further requests
    /// for the same `ForkId` that arrived while the task was still in progress.
    Create(CreateFuture<N>, ForkId, CreateSender<N>, Vec<CreateSender<N>>),
}
236
/// The type that manages connections in the background.
///
/// This is a future; it processes incoming `Request`s and drives all fork backends only while
/// it is being polled.
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler<N: Network> {
    /// Incoming requests from the `MultiFork`.
    incoming: Fuse<Receiver<Request<N>>>,

    /// All active handlers.
    ///
    /// It's expected that this list will be rather small (<10).
    handlers: Vec<(ForkId, BackendHandler<N>)>,

    /// Fork creation tasks currently in progress.
    pending_tasks: Vec<ForkTask<N>>,

    /// All _unique_ forkids mapped to their corresponding backend.
    ///
    /// Note: The backend can be shared by multiple ForkIds if they target the same provider and
    /// block number.
    forks: HashMap<ForkId, CreatedFork<N>>,

    /// Optional periodic interval to flush rpc cache.
    flush_cache_interval: Option<tokio::time::Interval>,
}
260
261impl<N: Network> MultiForkHandler<N> {
262    fn new(incoming: Receiver<Request<N>>) -> Self {
263        Self {
264            incoming: incoming.fuse(),
265            handlers: Default::default(),
266            pending_tasks: Default::default(),
267            forks: Default::default(),
268            flush_cache_interval: None,
269        }
270    }
271
272    /// Sets the interval after which all rpc caches should be flushed periodically.
273    pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
274        self.flush_cache_interval =
275            Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
276        self
277    }
278
279    /// Returns the list of additional senders of a matching task for the given id, if any.
280    fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender<N>>> {
281        for ForkTask::Create(_, in_progress, _, additional) in &mut self.pending_tasks {
282            if in_progress == id {
283                return Some(additional);
284            }
285        }
286        None
287    }
288
289    fn create_fork(&mut self, fork: CreateFork, sender: CreateSender<N>) {
290        let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
291        trace!(?fork_id, "created new forkId");
292
293        // There could already be a task for the requested fork in progress.
294        if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
295            in_progress.push(sender);
296            return;
297        }
298
299        // Need to create a new fork.
300        let task = Box::pin(create_fork(fork));
301        self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
302    }
303
304    fn insert_new_fork(
305        &mut self,
306        fork_id: ForkId,
307        fork: CreatedFork<N>,
308        sender: CreateSender<N>,
309        additional_senders: Vec<CreateSender<N>>,
310    ) {
311        self.forks.insert(fork_id.clone(), fork.clone());
312        let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.evm_env.clone())));
313
314        // Notify all additional senders and track unique forkIds.
315        for sender in additional_senders {
316            let next_fork_id = fork.inc_senders(fork_id.clone());
317            self.forks.insert(next_fork_id.clone(), fork.clone());
318            let _ =
319                sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.evm_env.clone())));
320        }
321    }
322
323    /// Update the fork's block entire env
324    fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
325        if let Some(fork) = self.forks.get_mut(&fork_id) {
326            fork.opts.evm_env.block_env = env;
327        }
328    }
329    /// Update fork block number and timestamp. Used to preserve values set by `roll` and `warp`
330    /// cheatcodes when new fork selected.
331    fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
332        if let Some(fork) = self.forks.get_mut(&fork_id) {
333            fork.opts.evm_env.block_env.number = block_number;
334            fork.opts.evm_env.block_env.timestamp = block_timestamp;
335        }
336    }
337
338    fn on_request(&mut self, req: Request<N>) {
339        match req {
340            Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
341            Request::GetFork(fork_id, sender) => {
342                let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
343                let _ = sender.send(fork);
344            }
345            Request::RollFork(fork_id, block, sender) => {
346                if let Some(fork) = self.forks.get(&fork_id) {
347                    trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
348                    let mut opts = fork.opts.clone();
349                    opts.evm_opts.fork_block_number = Some(block);
350                    self.create_fork(opts, sender)
351                } else {
352                    let _ =
353                        sender.send(Err(eyre::eyre!("No matching fork exists for {}", fork_id)));
354                }
355            }
356            Request::GetEvmEnv(fork_id, sender) => {
357                let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.evm_env.clone()));
358            }
359            Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
360                self.update_block(fork_id, block_number, block_timestamp);
361            }
362            Request::UpdateEnv(fork_id, block_env) => {
363                self.update_env(fork_id, block_env);
364            }
365            Request::ShutDown(sender) => {
366                trace!(target: "fork::multi", "received shutdown signal");
367                // We're emptying all fork backends, this way we ensure all caches get flushed.
368                self.forks.clear();
369                self.handlers.clear();
370                let _ = sender.send(());
371            }
372            Request::GetForkUrl(fork_id, sender) => {
373                let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
374                let _ = sender.send(fork);
375            }
376        }
377    }
378}
379
380// Drives all handler to completion.
381// This future will finish once all underlying BackendHandler are completed.
382impl<N: Network> Future for MultiForkHandler<N> {
383    type Output = ();
384
385    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
386        let this = self.get_mut();
387
388        // Receive new requests.
389        loop {
390            match this.incoming.poll_next_unpin(cx) {
391                Poll::Ready(Some(req)) => this.on_request(req),
392                Poll::Ready(None) => {
393                    // Channel closed, but we still need to drive the fork handlers to completion.
394                    trace!(target: "fork::multi", "request channel closed");
395                    break;
396                }
397                Poll::Pending => break,
398            }
399        }
400
401        // Advance all tasks.
402        for n in (0..this.pending_tasks.len()).rev() {
403            let task = this.pending_tasks.swap_remove(n);
404            match task {
405                ForkTask::Create(mut fut, id, sender, additional_senders) => {
406                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
407                        match resp {
408                            Ok((fork_id, fork, handler)) => {
409                                if let Some(fork) = this.forks.get(&fork_id).cloned() {
410                                    this.insert_new_fork(
411                                        fork.inc_senders(fork_id),
412                                        fork,
413                                        sender,
414                                        additional_senders,
415                                    );
416                                } else {
417                                    this.handlers.push((fork_id.clone(), handler));
418                                    this.insert_new_fork(fork_id, fork, sender, additional_senders);
419                                }
420                            }
421                            Err(err) => {
422                                let _ = sender.send(Err(eyre::eyre!("{err}")));
423                                for sender in additional_senders {
424                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
425                                }
426                            }
427                        }
428                    } else {
429                        this.pending_tasks.push(ForkTask::Create(
430                            fut,
431                            id,
432                            sender,
433                            additional_senders,
434                        ));
435                    }
436                }
437            }
438        }
439
440        // Advance all handlers.
441        for n in (0..this.handlers.len()).rev() {
442            let (id, mut handler) = this.handlers.swap_remove(n);
443            match handler.poll_unpin(cx) {
444                Poll::Ready(_) => {
445                    trace!(target: "fork::multi", "fork {:?} completed", id);
446                }
447                Poll::Pending => {
448                    this.handlers.push((id, handler));
449                }
450            }
451        }
452
453        if this.handlers.is_empty() && this.incoming.is_done() {
454            trace!(target: "fork::multi", "completed");
455            return Poll::Ready(());
456        }
457
458        // Periodically flush cached RPC state.
459        if this
460            .flush_cache_interval
461            .as_mut()
462            .map(|interval| interval.poll_tick(cx).is_ready())
463            .unwrap_or_default()
464            && !this.forks.is_empty()
465        {
466            trace!(target: "fork::multi", "tick flushing caches");
467            let forks = this.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
468            // Flush this on new thread to not block here.
469            std::thread::Builder::new()
470                .name("flusher".into())
471                .spawn(move || {
472                    forks.into_iter().for_each(|fork| fork.flush_cache());
473                })
474                .expect("failed to spawn thread");
475        }
476
477        Poll::Pending
478    }
479}
480
/// Tracks the created Fork
///
/// Clones share the same `num_senders` counter through the `Arc`.
#[derive(Debug, Clone)]
struct CreatedFork<N: Network> {
    /// How the fork was initially created.
    opts: CreateFork,
    /// Copy of the sender.
    backend: SharedBackend<N>,
    /// How many consumers there are, since a `SharedBackend` can be used by multiple
    /// consumers.
    num_senders: Arc<AtomicUsize>,
}
492
493impl<N: Network> CreatedFork<N> {
494    pub fn new(opts: CreateFork, backend: SharedBackend<N>) -> Self {
495        Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
496    }
497
498    /// Increment senders and return unique identifier of the fork.
499    fn inc_senders(&self, fork_id: ForkId) -> ForkId {
500        format!(
501            "{}-{}",
502            fork_id.as_str(),
503            self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
504        )
505        .into()
506    }
507}
508
/// A type that's used to signal the `MultiForkHandler` when it's time to shut down.
///
/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc caches.
///
/// This type intentionally does not implement `Clone` since it's intended that there's only one
/// instance.
#[derive(Debug)]
struct ShutDownMultiFork<N: Network> {
    /// Sender used to deliver the shutdown request; taken (left as `None`) on drop.
    handler: Option<Sender<Request<N>>>,
}
519
520impl<N: Network> Drop for ShutDownMultiFork<N> {
521    fn drop(&mut self) {
522        trace!(target: "fork::multi", "initiating shutdown");
523        let (sender, rx) = oneshot_channel();
524        let req = Request::ShutDown(sender);
525        if let Some(mut handler) = self.handler.take()
526            && handler.try_send(req).is_ok()
527        {
528            let _ = rx.recv();
529            trace!(target: "fork::cache", "multifork backend shutdown");
530        }
531    }
532}
533
534/// Creates a new fork.
535///
536/// This will establish a new `Provider` to the endpoint and return the Fork Backend.
537async fn create_fork<N: Network>(
538    mut fork: CreateFork,
539) -> eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)> {
540    // Ensure evm_opts reflects the fork URL (may differ from the resolved CreateFork url when
541    // created via cheatcodes, where evm_opts is cloned from the base config).
542    fork.evm_opts.fork_url = Some(fork.url.clone());
543
544    let provider = fork.evm_opts.fork_provider_with_url::<N>(&fork.url)?;
545
546    // Initialise the fork environment.
547    let (evm_env, number) = fork.evm_opts.fork_evm_env(&provider).await?;
548    fork.evm_env = evm_env;
549    let meta = BlockchainDbMeta::new(fork.evm_env.block_env.clone(), fork.url.clone());
550
551    // Determine the cache path if caching is enabled.
552    let cache_path = if fork.enable_caching {
553        Config::foundry_block_cache_dir(fork.evm_env.cfg_env.chain_id, number)
554    } else {
555        None
556    };
557
558    let db = BlockchainDb::new(meta, cache_path);
559    let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
560    let fork = CreatedFork::new(fork, backend);
561    let fork_id = ForkId::new(&fork.opts.url, Some(number));
562
563    Ok((fork_id, fork, handler))
564}