1use super::CreateFork;
7use alloy_evm::EvmEnv;
8use alloy_network::Network;
9use alloy_primitives::{U256, map::HashMap};
10use foundry_config::Config;
11use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
12use futures::{
13 FutureExt, StreamExt,
14 channel::mpsc::{Receiver, Sender, channel},
15 stream::Fuse,
16 task::{Context, Poll},
17};
18use revm::context::BlockEnv;
19use std::{
20 fmt::{self, Write},
21 pin::Pin,
22 sync::{
23 Arc,
24 atomic::AtomicUsize,
25 mpsc::{Sender as OneshotSender, channel as oneshot_channel},
26 },
27 time::Duration,
28};
29
/// Identifier of a fork: the endpoint URL combined with the block it is pinned to.
///
/// The wrapped string has the shape `<url>@<block>`, where `<block>` is either the block number
/// as `0x`-prefixed lowercase hex or the literal `latest`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the identifier for `url` pinned at block `num`.
    ///
    /// `None` produces the `latest` suffix instead of a hex block number.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = format!("{url}@");
        if let Some(n) = num {
            // Appending to a `String` through `fmt::Write` does not fail.
            write!(id, "{n:#x}").unwrap();
        } else {
            id.push_str("latest");
        }
        Self(id)
    }

    /// Returns the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
52
/// Formats a [`ForkId`] by delegating to the wrapped `String`.
impl fmt::Display for ForkId {
    /// Writes the inner string to `f`, forwarding whatever formatter options were supplied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}
58
59impl<T: Into<String>> From<T> for ForkId {
60 fn from(id: T) -> Self {
61 Self(id.into())
62 }
63}
64
/// Cloneable front-end for a [`MultiForkHandler`].
///
/// Every method packs its arguments into a `Request` and queues it on the request channel; the
/// handler on the other end owns all fork state.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork<N: Network> {
    /// Sending half of the request channel read by the [`MultiForkHandler`].
    handler: Sender<Request<N>>,
    /// Shared guard whose `Drop` impl sends a `ShutDown` request once the last `Arc` reference
    /// goes away.
    _shutdown: Arc<ShutDownMultiFork<N>>,
}
75
76impl<N: Network> MultiFork<N> {
77 pub fn spawn() -> Self {
79 trace!(target: "fork::multi", "spawning multifork");
80
81 let (fork, mut handler) = Self::new();
82
83 let fut = async move {
86 handler.set_flush_cache_interval(Duration::from_secs(60));
91 handler.await
92 };
93 match tokio::runtime::Handle::try_current() {
94 Ok(rt) => _ = rt.spawn(fut),
95 Err(_) => {
96 trace!(target: "fork::multi", "spawning multifork backend thread");
97 _ = std::thread::Builder::new()
98 .name("multi-fork-backend".into())
99 .spawn(move || {
100 tokio::runtime::Builder::new_current_thread()
101 .enable_all()
102 .build()
103 .expect("failed to build tokio runtime")
104 .block_on(fut)
105 })
106 .expect("failed to spawn thread")
107 }
108 }
109
110 trace!(target: "fork::multi", "spawned MultiForkHandler thread");
111 fork
112 }
113
114 #[doc(hidden)]
118 pub fn new() -> (Self, MultiForkHandler<N>) {
119 let (handler, handler_rx) = channel(1);
120 let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
121 (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
122 }
123
124 pub fn create_fork(
128 &self,
129 fork: CreateFork,
130 ) -> eyre::Result<(ForkId, SharedBackend<N>, EvmEnv)> {
131 trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
132 let (sender, rx) = oneshot_channel();
133 let req = Request::CreateFork(Box::new(fork), sender);
134 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
135 rx.recv()?
136 }
137
138 pub fn roll_fork(
142 &self,
143 fork: ForkId,
144 block: u64,
145 ) -> eyre::Result<(ForkId, SharedBackend<N>, EvmEnv)> {
146 trace!(?fork, ?block, "rolling fork");
147 let (sender, rx) = oneshot_channel();
148 let req = Request::RollFork(fork, block, sender);
149 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
150 rx.recv()?
151 }
152
153 pub fn get_evm_env(&self, fork: ForkId) -> eyre::Result<Option<EvmEnv>> {
155 trace!(?fork, "getting env config");
156 let (sender, rx) = oneshot_channel();
157 let req = Request::GetEvmEnv(fork, sender);
158 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
159 Ok(rx.recv()?)
160 }
161
162 pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
164 trace!(?fork, ?number, ?timestamp, "update fork block");
165 self.handler
166 .clone()
167 .try_send(Request::UpdateBlock(fork, number, timestamp))
168 .map_err(|e| eyre::eyre!("{:?}", e))
169 }
170
171 pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
176 trace!(?fork, ?env, "update fork block");
177 self.handler
178 .clone()
179 .try_send(Request::UpdateEnv(fork, env))
180 .map_err(|e| eyre::eyre!("{:?}", e))
181 }
182
183 pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend<N>>> {
187 let id = id.into();
188 trace!(?id, "get fork backend");
189 let (sender, rx) = oneshot_channel();
190 let req = Request::GetFork(id, sender);
191 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
192 Ok(rx.recv()?)
193 }
194
195 pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
199 let (sender, rx) = oneshot_channel();
200 let req = Request::GetForkUrl(id.into(), sender);
201 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
202 Ok(rx.recv()?)
203 }
204}
205
/// Boxed, sendable future that resolves to the outcome of a fork creation: the fork's id, the
/// created fork and the backend handler that goes with it.
type CreateFuture<N> =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)>> + Send>>;
/// Reply channel for requests that create a fork; carries the id, backend and env on success.
type CreateSender<N> = OneshotSender<eyre::Result<(ForkId, SharedBackend<N>, EvmEnv)>>;
/// Reply channel for env lookups; carries `None` when the fork is unknown.
type GetEvmEnvSender = OneshotSender<Option<EvmEnv>>;
210
/// Messages sent from a [`MultiFork`] handle to the [`MultiForkHandler`].
#[derive(Debug)]
enum Request<N: Network> {
    /// Create a fork from the given options and reply with its id, backend and env.
    CreateFork(Box<CreateFork>, CreateSender<N>),
    /// Reply with the backend of the given fork, or `None` if it is unknown.
    GetFork(ForkId, OneshotSender<Option<SharedBackend<N>>>),
    /// Create a fork from the options of an existing fork, pinned at the given block number.
    RollFork(ForkId, u64, CreateSender<N>),
    /// Reply with a copy of the env stored for the given fork, or `None` if it is unknown.
    GetEvmEnv(ForkId, GetEvmEnvSender),
    /// Overwrite the stored block number and block timestamp (in that order) of the given fork.
    UpdateBlock(ForkId, U256, U256),
    /// Replace the stored block env of the given fork.
    UpdateEnv(ForkId, BlockEnv),
    /// Drop all stored forks and backend handlers, then acknowledge on the given channel.
    ShutDown(OneshotSender<()>),
    /// Reply with the URL of the given fork, or `None` if it is unknown.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
231
/// Work the handler is still driving to completion.
enum ForkTask<N: Network> {
    /// An in-flight fork creation: the future itself, the id it was requested under, the reply
    /// channel of the first requester, and the reply channels of requesters that asked for the
    /// same id while the creation was still pending.
    Create(CreateFuture<N>, ForkId, CreateSender<N>, Vec<CreateSender<N>>),
}
236
/// Future that owns all fork state and serves the requests sent by [`MultiFork`] handles.
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler<N: Network> {
    /// Fused stream of incoming requests.
    incoming: Fuse<Receiver<Request<N>>>,

    /// Backend handlers, each paired with the id of the fork it was created for; they are polled
    /// on every `poll` and removed once they complete.
    handlers: Vec<(ForkId, BackendHandler<N>)>,

    /// Fork creations that have not resolved yet.
    pending_tasks: Vec<ForkTask<N>>,

    /// All created forks by id. Several ids may map to clones of the same fork, since additional
    /// requesters receive ids derived via `CreatedFork::inc_senders`.
    forks: HashMap<ForkId, CreatedFork<N>>,

    /// Optional timer; when it ticks and at least one fork exists, the fork caches are flushed.
    flush_cache_interval: Option<tokio::time::Interval>,
}
260
261impl<N: Network> MultiForkHandler<N> {
262 fn new(incoming: Receiver<Request<N>>) -> Self {
263 Self {
264 incoming: incoming.fuse(),
265 handlers: Default::default(),
266 pending_tasks: Default::default(),
267 forks: Default::default(),
268 flush_cache_interval: None,
269 }
270 }
271
272 pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
274 self.flush_cache_interval =
275 Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
276 self
277 }
278
279 fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender<N>>> {
281 for ForkTask::Create(_, in_progress, _, additional) in &mut self.pending_tasks {
282 if in_progress == id {
283 return Some(additional);
284 }
285 }
286 None
287 }
288
289 fn create_fork(&mut self, fork: CreateFork, sender: CreateSender<N>) {
290 let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
291 trace!(?fork_id, "created new forkId");
292
293 if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
295 in_progress.push(sender);
296 return;
297 }
298
299 let task = Box::pin(create_fork(fork));
301 self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
302 }
303
304 fn insert_new_fork(
305 &mut self,
306 fork_id: ForkId,
307 fork: CreatedFork<N>,
308 sender: CreateSender<N>,
309 additional_senders: Vec<CreateSender<N>>,
310 ) {
311 self.forks.insert(fork_id.clone(), fork.clone());
312 let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.evm_env.clone())));
313
314 for sender in additional_senders {
316 let next_fork_id = fork.inc_senders(fork_id.clone());
317 self.forks.insert(next_fork_id.clone(), fork.clone());
318 let _ =
319 sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.evm_env.clone())));
320 }
321 }
322
323 fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
325 if let Some(fork) = self.forks.get_mut(&fork_id) {
326 fork.opts.evm_env.block_env = env;
327 }
328 }
329 fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
332 if let Some(fork) = self.forks.get_mut(&fork_id) {
333 fork.opts.evm_env.block_env.number = block_number;
334 fork.opts.evm_env.block_env.timestamp = block_timestamp;
335 }
336 }
337
338 fn on_request(&mut self, req: Request<N>) {
339 match req {
340 Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
341 Request::GetFork(fork_id, sender) => {
342 let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
343 let _ = sender.send(fork);
344 }
345 Request::RollFork(fork_id, block, sender) => {
346 if let Some(fork) = self.forks.get(&fork_id) {
347 trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
348 let mut opts = fork.opts.clone();
349 opts.evm_opts.fork_block_number = Some(block);
350 self.create_fork(opts, sender)
351 } else {
352 let _ =
353 sender.send(Err(eyre::eyre!("No matching fork exists for {}", fork_id)));
354 }
355 }
356 Request::GetEvmEnv(fork_id, sender) => {
357 let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.evm_env.clone()));
358 }
359 Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
360 self.update_block(fork_id, block_number, block_timestamp);
361 }
362 Request::UpdateEnv(fork_id, block_env) => {
363 self.update_env(fork_id, block_env);
364 }
365 Request::ShutDown(sender) => {
366 trace!(target: "fork::multi", "received shutdown signal");
367 self.forks.clear();
369 self.handlers.clear();
370 let _ = sender.send(());
371 }
372 Request::GetForkUrl(fork_id, sender) => {
373 let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
374 let _ = sender.send(fork);
375 }
376 }
377 }
378}
379
/// Drives the handler: serves requests, advances pending fork creations, polls the backend
/// handlers and periodically flushes caches.
impl<N: Network> Future for MultiForkHandler<N> {
    type Output = ();

    /// Resolves once the request channel has terminated and no backend handler is left;
    /// otherwise returns `Poll::Pending`.
    ///
    /// # Panics
    ///
    /// Panics if the thread used for flushing caches cannot be spawned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Serve every request that is ready right now.
        loop {
            match this.incoming.poll_next_unpin(cx) {
                Poll::Ready(Some(req)) => this.on_request(req),
                Poll::Ready(None) => {
                    // The stream has ended; the fused stream reports this via `is_done` below.
                    trace!(target: "fork::multi", "request channel closed");
                    break;
                }
                Poll::Pending => break,
            }
        }

        // Advance pending fork creations. Iterating in reverse while using `swap_remove` means
        // each original entry is visited exactly once; unfinished tasks are pushed back.
        for n in (0..this.pending_tasks.len()).rev() {
            let task = this.pending_tasks.swap_remove(n);
            match task {
                ForkTask::Create(mut fut, id, sender, additional_senders) => {
                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
                        match resp {
                            Ok((fork_id, fork, handler)) => {
                                if let Some(fork) = this.forks.get(&fork_id).cloned() {
                                    // A fork with the resolved id already exists: reuse it under
                                    // a derived id. The freshly created fork and handler are
                                    // not used in this branch.
                                    this.insert_new_fork(
                                        fork.inc_senders(fork_id),
                                        fork,
                                        sender,
                                        additional_senders,
                                    );
                                } else {
                                    // First fork with this id: keep its handler so it is polled.
                                    this.handlers.push((fork_id.clone(), handler));
                                    this.insert_new_fork(fork_id, fork, sender, additional_senders);
                                }
                            }
                            Err(err) => {
                                // Every requester gets its own error built from the same message.
                                let _ = sender.send(Err(eyre::eyre!("{err}")));
                                for sender in additional_senders {
                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
                                }
                            }
                        }
                    } else {
                        // Not ready yet: keep the task for the next poll.
                        this.pending_tasks.push(ForkTask::Create(
                            fut,
                            id,
                            sender,
                            additional_senders,
                        ));
                    }
                }
            }
        }

        // Poll all backend handlers, dropping the ones that have completed.
        for n in (0..this.handlers.len()).rev() {
            let (id, mut handler) = this.handlers.swap_remove(n);
            match handler.poll_unpin(cx) {
                Poll::Ready(_) => {
                    trace!(target: "fork::multi", "fork {:?} completed", id);
                }
                Poll::Pending => {
                    this.handlers.push((id, handler));
                }
            }
        }

        // Finished: no handler left and the request stream has terminated.
        if this.handlers.is_empty() && this.incoming.is_done() {
            trace!(target: "fork::multi", "completed");
            return Poll::Ready(());
        }

        // Flush only when an interval is configured, it has ticked, and there are forks.
        if this
            .flush_cache_interval
            .as_mut()
            .map(|interval| interval.poll_tick(cx).is_ready())
            .unwrap_or_default()
            && !this.forks.is_empty()
        {
            trace!(target: "fork::multi", "tick flushing caches");
            let forks = this.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
            // Flushing runs on a separate thread rather than inside `poll`.
            std::thread::Builder::new()
                .name("flusher".into())
                .spawn(move || {
                    forks.into_iter().for_each(|fork| fork.flush_cache());
                })
                .expect("failed to spawn thread");
        }

        Poll::Pending
    }
}
480
/// A fork that has been created, together with the options it was created from.
#[derive(Debug, Clone)]
struct CreatedFork<N: Network> {
    /// Options the fork was created with; the contained env is what env requests read and what
    /// block updates modify.
    opts: CreateFork,
    /// Backend handed out to requesters of this fork.
    backend: SharedBackend<N>,
    /// Counter shared between clones, used by `inc_senders` to derive distinct ids.
    num_senders: Arc<AtomicUsize>,
}
492
493impl<N: Network> CreatedFork<N> {
494 pub fn new(opts: CreateFork, backend: SharedBackend<N>) -> Self {
495 Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
496 }
497
498 fn inc_senders(&self, fork_id: ForkId) -> ForkId {
500 format!(
501 "{}-{}",
502 fork_id.as_str(),
503 self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
504 )
505 .into()
506 }
507}
508
/// Guard that asks the handler to shut down when it is dropped.
#[derive(Debug)]
struct ShutDownMultiFork<N: Network> {
    /// Sender used to deliver the `ShutDown` request; taken (left as `None`) during drop.
    handler: Option<Sender<Request<N>>>,
}
519
520impl<N: Network> Drop for ShutDownMultiFork<N> {
521 fn drop(&mut self) {
522 trace!(target: "fork::multi", "initiating shutdown");
523 let (sender, rx) = oneshot_channel();
524 let req = Request::ShutDown(sender);
525 if let Some(mut handler) = self.handler.take()
526 && handler.try_send(req).is_ok()
527 {
528 let _ = rx.recv();
529 trace!(target: "fork::cache", "multifork backend shutdown");
530 }
531 }
532}
533
534async fn create_fork<N: Network>(
538 mut fork: CreateFork,
539) -> eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)> {
540 fork.evm_opts.fork_url = Some(fork.url.clone());
543
544 let provider = fork.evm_opts.fork_provider_with_url::<N>(&fork.url)?;
545
546 let (evm_env, number) = fork.evm_opts.fork_evm_env(&provider).await?;
548 fork.evm_env = evm_env;
549 let meta = BlockchainDbMeta::new(fork.evm_env.block_env.clone(), fork.url.clone());
550
551 let cache_path = if fork.enable_caching {
553 Config::foundry_block_cache_dir(fork.evm_env.cfg_env.chain_id, number)
554 } else {
555 None
556 };
557
558 let db = BlockchainDb::new(meta, cache_path);
559 let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
560 let fork = CreatedFork::new(fork, backend);
561 let fork_id = ForkId::new(&fork.opts.url, Some(number));
562
563 Ok((fork_id, fork, handler))
564}