1use super::CreateFork;
7use crate::FoundryBlock;
8use alloy_evm::EvmEnv;
9use alloy_network::{AnyNetwork, Network};
10use alloy_primitives::{U256, map::HashMap};
11use foundry_config::Config;
12use foundry_fork_db::{
13 BackendHandler, BlockchainDb, ForkBlockEnv, SharedBackend, cache::BlockchainDbMeta,
14};
15use futures::{
16 FutureExt, StreamExt,
17 channel::mpsc::{Receiver, Sender, channel},
18 stream::Fuse,
19 task::{Context, Poll},
20};
21use revm::primitives::hardfork::SpecId;
22use std::{
23 fmt::{self, Write},
24 pin::Pin,
25 sync::{
26 Arc,
27 atomic::AtomicUsize,
28 mpsc::{Sender as OneshotSender, channel as oneshot_channel},
29 },
30 time::Duration,
31};
32
/// Identifier of a fork, stored as a plain string.
///
/// [`ForkId::new`] produces ids of the form `<url>@<block>`, where `<block>` is either the
/// `0x`-prefixed lowercase hexadecimal block number or the literal `latest`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the id for the endpoint `url` at block `num`.
    ///
    /// `Some(n)` renders as `{url}@{n:#x}`, `None` renders as `{url}@latest`.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut buf = String::from(url);
        buf.push('@');
        if let Some(block) = num {
            // Formatting into a `String` never returns an error, so this cannot panic.
            write!(buf, "{block:#x}").unwrap();
        } else {
            buf.push_str("latest");
        }
        Self(buf)
    }

    /// Returns the id as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ForkId {
    /// Formats the id exactly like the wrapped string, honouring width/padding flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    /// Wraps anything convertible into a `String` without any validation.
    fn from(id: T) -> Self {
        let raw: String = id.into();
        Self(raw)
    }
}
67
68#[derive(Clone, Debug)]
71#[must_use]
72pub struct MultiFork<N: Network, SPEC, BLOCK: ForkBlockEnv> {
73 handler: Sender<Request<N, SPEC, BLOCK>>,
75 _shutdown: Arc<ShutDownMultiFork<N, SPEC, BLOCK>>,
77}
78
79impl<
80 N: Network,
81 SPEC: Into<SpecId> + Default + Copy + Unpin + Send + 'static,
82 BLOCK: FoundryBlock + ForkBlockEnv + Default + Unpin,
83> MultiFork<N, SPEC, BLOCK>
84{
85 pub fn spawn() -> Self {
87 trace!(target: "fork::multi", "spawning multifork");
88
89 let (fork, mut handler) = Self::new();
90
91 let fut = async move {
94 handler.set_flush_cache_interval(Duration::from_secs(60));
99 handler.await
100 };
101 match tokio::runtime::Handle::try_current() {
102 Ok(rt) => _ = rt.spawn(fut),
103 Err(_) => {
104 trace!(target: "fork::multi", "spawning multifork backend thread");
105 _ = std::thread::Builder::new()
106 .name("multi-fork-backend".into())
107 .spawn(move || {
108 tokio::runtime::Builder::new_current_thread()
109 .enable_all()
110 .build()
111 .expect("failed to build tokio runtime")
112 .block_on(fut)
113 })
114 .expect("failed to spawn thread")
115 }
116 }
117
118 trace!(target: "fork::multi", "spawned MultiForkHandler thread");
119 fork
120 }
121
122 #[doc(hidden)]
126 pub fn new() -> (Self, MultiForkHandler<N, SPEC, BLOCK>) {
127 let (handler, handler_rx) = channel(1);
128 let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
129 (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
130 }
131
132 #[allow(clippy::type_complexity)]
136 pub fn create_fork(
137 &self,
138 fork: CreateFork,
139 ) -> eyre::Result<(ForkId, SharedBackend<N, BLOCK>, EvmEnv<SPEC, BLOCK>)> {
140 trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
141 let (sender, rx) = oneshot_channel();
142 let req = Request::CreateFork(Box::new(fork), sender);
143 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
144 rx.recv()?
145 }
146
147 #[allow(clippy::type_complexity)]
151 pub fn roll_fork(
152 &self,
153 fork: ForkId,
154 block: u64,
155 ) -> eyre::Result<(ForkId, SharedBackend<N, BLOCK>, EvmEnv<SPEC, BLOCK>)> {
156 trace!(?fork, ?block, "rolling fork");
157 let (sender, rx) = oneshot_channel();
158 let req = Request::RollFork(fork, block, sender);
159 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
160 rx.recv()?
161 }
162
163 pub fn get_evm_env(&self, fork: ForkId) -> eyre::Result<Option<EvmEnv<SPEC, BLOCK>>> {
165 trace!(?fork, "getting env config");
166 let (sender, rx) = oneshot_channel();
167 let req = Request::GetEvmEnv(fork, sender);
168 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
169 Ok(rx.recv()?)
170 }
171
172 pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
174 trace!(?fork, ?number, ?timestamp, "update fork block");
175 self.handler
176 .clone()
177 .try_send(Request::UpdateBlock(fork, number, timestamp))
178 .map_err(|e| eyre::eyre!("{:?}", e))
179 }
180
181 pub fn update_block_env(&self, fork: ForkId, env: BLOCK) -> eyre::Result<()>
186 where
187 BLOCK: fmt::Debug,
188 {
189 trace!(?fork, ?env, "update fork block");
190 self.handler
191 .clone()
192 .try_send(Request::UpdateEnv(fork, env))
193 .map_err(|e| eyre::eyre!("{:?}", e))
194 }
195
196 pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend<N, BLOCK>>> {
200 let id = id.into();
201 trace!(?id, "get fork backend");
202 let (sender, rx) = oneshot_channel();
203 let req = Request::GetFork(id, sender);
204 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
205 Ok(rx.recv()?)
206 }
207
208 pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
212 let (sender, rx) = oneshot_channel();
213 let req = Request::GetForkUrl(id.into(), sender);
214 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
215 Ok(rx.recv()?)
216 }
217}
218
/// Boxed, sendable future resolving to the outcome of creating a fork: on success the fork's
/// id, its `CreatedFork` record and the `BackendHandler` returned alongside its backend.
type CreateFuture<N, SPEC, BLOCK> = Pin<
    Box<
        dyn Future<
                Output = eyre::Result<(
                    ForkId,
                    CreatedFork<N, SPEC, BLOCK>,
                    BackendHandler<N, BLOCK>,
                )>,
            > + Send,
    >,
>;
/// Reply channel for fork creation and rolling: carries either the new fork's id, shared
/// backend and EVM environment, or the error that prevented its creation.
type CreateSender<N, SPEC, BLOCK> =
    OneshotSender<eyre::Result<(ForkId, SharedBackend<N, BLOCK>, EvmEnv<SPEC, BLOCK>)>>;
/// Reply channel for EVM environment lookups; `None` means the fork id is unknown.
type GetEvmEnvSender<SPEC, BLOCK> = OneshotSender<Option<EvmEnv<SPEC, BLOCK>>>;
233
/// Requests a `MultiFork` handle sends to the `MultiForkHandler`.
#[derive(Debug)]
enum Request<N: Network, SPEC, BLOCK: ForkBlockEnv> {
    /// Create a fork from the given options and send the outcome back.
    CreateFork(Box<CreateFork>, CreateSender<N, SPEC, BLOCK>),
    /// Send back the backend of the given fork, if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend<N, BLOCK>>>),
    /// Create a fork with the options of the given fork but at the given block number, and
    /// send the outcome back.
    RollFork(ForkId, u64, CreateSender<N, SPEC, BLOCK>),
    /// Send back a copy of the EVM environment of the given fork, if it exists.
    GetEvmEnv(ForkId, GetEvmEnvSender<SPEC, BLOCK>),
    /// Set the block number (first `U256`) and timestamp (second `U256`) of the given fork.
    UpdateBlock(ForkId, U256, U256),
    /// Replace the block environment of the given fork.
    UpdateEnv(ForkId, BLOCK),
    /// Drop all forks and backend handlers, then acknowledge over the channel.
    ShutDown(OneshotSender<()>),
    /// Send back the URL the given fork was created with, if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
254
/// Work the handler is still driving to completion.
enum ForkTask<N: Network, SPEC, BLOCK: ForkBlockEnv> {
    /// A fork creation in flight, made of:
    /// - the future performing the creation,
    /// - the id the fork was requested under (used to detect duplicate requests),
    /// - the reply channel of the request that started the task,
    /// - the reply channels of requests for the same id that arrived while it was running.
    Create(
        CreateFuture<N, SPEC, BLOCK>,
        ForkId,
        CreateSender<N, SPEC, BLOCK>,
        Vec<CreateSender<N, SPEC, BLOCK>>,
    ),
}
264
265#[must_use = "futures do nothing unless polled"]
267pub struct MultiForkHandler<N: Network, SPEC, BLOCK: ForkBlockEnv> {
268 incoming: Fuse<Receiver<Request<N, SPEC, BLOCK>>>,
270
271 handlers: Vec<(ForkId, BackendHandler<N, BLOCK>)>,
275
276 pending_tasks: Vec<ForkTask<N, SPEC, BLOCK>>,
278
279 forks: HashMap<ForkId, CreatedFork<N, SPEC, BLOCK>>,
284
285 flush_cache_interval: Option<tokio::time::Interval>,
287}
288
289impl<
290 N: Network,
291 SPEC: Into<SpecId> + Default + Copy + 'static,
292 BLOCK: FoundryBlock + ForkBlockEnv + Default,
293> MultiForkHandler<N, SPEC, BLOCK>
294{
295 fn new(incoming: Receiver<Request<N, SPEC, BLOCK>>) -> Self {
296 Self {
297 incoming: incoming.fuse(),
298 handlers: Default::default(),
299 pending_tasks: Default::default(),
300 forks: Default::default(),
301 flush_cache_interval: None,
302 }
303 }
304
305 pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
307 self.flush_cache_interval =
308 Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
309 self
310 }
311
312 fn find_in_progress_task(
314 &mut self,
315 id: &ForkId,
316 ) -> Option<&mut Vec<CreateSender<N, SPEC, BLOCK>>> {
317 for ForkTask::Create(_, in_progress, _, additional) in &mut self.pending_tasks {
318 if in_progress == id {
319 return Some(additional);
320 }
321 }
322 None
323 }
324
325 fn create_fork(&mut self, fork: CreateFork, sender: CreateSender<N, SPEC, BLOCK>) {
326 let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
327 trace!(?fork_id, "created new forkId");
328
329 if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
331 in_progress.push(sender);
332 return;
333 }
334
335 let task = Box::pin(create_fork(fork));
337 self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
338 }
339
340 fn insert_new_fork(
341 &mut self,
342 fork_id: ForkId,
343 fork: CreatedFork<N, SPEC, BLOCK>,
344 sender: CreateSender<N, SPEC, BLOCK>,
345 additional_senders: Vec<CreateSender<N, SPEC, BLOCK>>,
346 ) {
347 self.forks.insert(fork_id.clone(), fork.clone());
348 let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.evm_env.clone())));
349
350 for sender in additional_senders {
352 let next_fork_id = fork.inc_senders(fork_id.clone());
353 self.forks.insert(next_fork_id.clone(), fork.clone());
354 let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.evm_env.clone())));
355 }
356 }
357
358 fn update_env(&mut self, fork_id: ForkId, env: BLOCK) {
360 if let Some(fork) = self.forks.get_mut(&fork_id) {
361 fork.evm_env.block_env = env;
362 }
363 }
364 fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
367 if let Some(fork) = self.forks.get_mut(&fork_id) {
368 fork.evm_env.block_env.set_number(block_number);
369 fork.evm_env.block_env.set_timestamp(block_timestamp);
370 }
371 }
372
373 fn on_request(&mut self, req: Request<N, SPEC, BLOCK>) {
374 match req {
375 Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
376 Request::GetFork(fork_id, sender) => {
377 let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
378 let _ = sender.send(fork);
379 }
380 Request::RollFork(fork_id, block, sender) => {
381 if let Some(fork) = self.forks.get(&fork_id) {
382 trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
383 let mut opts = fork.opts.clone();
384 opts.evm_opts.fork_block_number = Some(block);
385 self.create_fork(opts, sender)
386 } else {
387 let _ =
388 sender.send(Err(eyre::eyre!("No matching fork exists for {}", fork_id)));
389 }
390 }
391 Request::GetEvmEnv(fork_id, sender) => {
392 let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.evm_env.clone()));
393 }
394 Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
395 self.update_block(fork_id, block_number, block_timestamp);
396 }
397 Request::UpdateEnv(fork_id, block_env) => {
398 self.update_env(fork_id, block_env);
399 }
400 Request::ShutDown(sender) => {
401 trace!(target: "fork::multi", "received shutdown signal");
402 self.forks.clear();
404 self.handlers.clear();
405 let _ = sender.send(());
406 }
407 Request::GetForkUrl(fork_id, sender) => {
408 let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
409 let _ = sender.send(fork);
410 }
411 }
412 }
413}
414
415impl<
418 N: Network,
419 SPEC: Into<SpecId> + Default + Copy + Unpin + 'static,
420 BLOCK: FoundryBlock + ForkBlockEnv + Default + Unpin,
421> Future for MultiForkHandler<N, SPEC, BLOCK>
422{
423 type Output = ();
424
425 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
426 let this = self.get_mut();
427
428 loop {
430 match this.incoming.poll_next_unpin(cx) {
431 Poll::Ready(Some(req)) => this.on_request(req),
432 Poll::Ready(None) => {
433 trace!(target: "fork::multi", "request channel closed");
435 break;
436 }
437 Poll::Pending => break,
438 }
439 }
440
441 for n in (0..this.pending_tasks.len()).rev() {
443 let task = this.pending_tasks.swap_remove(n);
444 match task {
445 ForkTask::Create(mut fut, id, sender, additional_senders) => {
446 if let Poll::Ready(resp) = fut.poll_unpin(cx) {
447 match resp {
448 Ok((fork_id, fork, handler)) => {
449 if let Some(fork) = this.forks.get(&fork_id).cloned() {
450 this.insert_new_fork(
451 fork.inc_senders(fork_id),
452 fork,
453 sender,
454 additional_senders,
455 );
456 } else {
457 this.handlers.push((fork_id.clone(), handler));
458 this.insert_new_fork(fork_id, fork, sender, additional_senders);
459 }
460 }
461 Err(err) => {
462 let _ = sender.send(Err(eyre::eyre!("{err}")));
463 for sender in additional_senders {
464 let _ = sender.send(Err(eyre::eyre!("{err}")));
465 }
466 }
467 }
468 } else {
469 this.pending_tasks.push(ForkTask::Create(
470 fut,
471 id,
472 sender,
473 additional_senders,
474 ));
475 }
476 }
477 }
478 }
479
480 for n in (0..this.handlers.len()).rev() {
482 let (id, mut handler) = this.handlers.swap_remove(n);
483 match handler.poll_unpin(cx) {
484 Poll::Ready(_) => {
485 trace!(target: "fork::multi", "fork {:?} completed", id);
486 }
487 Poll::Pending => {
488 this.handlers.push((id, handler));
489 }
490 }
491 }
492
493 if this.handlers.is_empty() && this.incoming.is_done() {
494 trace!(target: "fork::multi", "completed");
495 return Poll::Ready(());
496 }
497
498 if this
500 .flush_cache_interval
501 .as_mut()
502 .map(|interval| interval.poll_tick(cx).is_ready())
503 .unwrap_or_default()
504 && !this.forks.is_empty()
505 {
506 trace!(target: "fork::multi", "tick flushing caches");
507 let forks = this.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
508 std::thread::Builder::new()
510 .name("flusher".into())
511 .spawn(move || {
512 forks.into_iter().for_each(|fork| fork.flush_cache());
513 })
514 .expect("failed to spawn thread");
515 }
516
517 Poll::Pending
518 }
519}
520
521#[derive(Debug, Clone)]
523struct CreatedFork<N: Network, SPEC, BLOCK: ForkBlockEnv> {
524 opts: CreateFork,
526 evm_env: EvmEnv<SPEC, BLOCK>,
528 backend: SharedBackend<N, BLOCK>,
530 num_senders: Arc<AtomicUsize>,
533}
534
535impl<N: Network, SPEC, BLOCK: ForkBlockEnv> CreatedFork<N, SPEC, BLOCK> {
536 pub fn new(
537 opts: CreateFork,
538 evm_env: EvmEnv<SPEC, BLOCK>,
539 backend: SharedBackend<N, BLOCK>,
540 ) -> Self {
541 Self { opts, evm_env, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
542 }
543
544 fn inc_senders(&self, fork_id: ForkId) -> ForkId {
546 format!(
547 "{}-{}",
548 fork_id.as_str(),
549 self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
550 )
551 .into()
552 }
553}
554
555#[derive(Debug)]
562struct ShutDownMultiFork<N: Network, SPEC, BLOCK: ForkBlockEnv> {
563 handler: Option<Sender<Request<N, SPEC, BLOCK>>>,
564}
565
566impl<N: Network, SPEC, BLOCK: ForkBlockEnv> Drop for ShutDownMultiFork<N, SPEC, BLOCK> {
567 fn drop(&mut self) {
568 trace!(target: "fork::multi", "initiating shutdown");
569 let (sender, rx) = oneshot_channel();
570 let req = Request::ShutDown(sender);
571 if let Some(mut handler) = self.handler.take()
572 && handler.try_send(req).is_ok()
573 {
574 let _ = rx.recv();
575 trace!(target: "fork::cache", "multifork backend shutdown");
576 }
577 }
578}
579
580async fn create_fork<
584 N: Network,
585 SPEC: Into<SpecId> + Default + Copy,
586 BLOCK: FoundryBlock + ForkBlockEnv + Default,
587>(
588 mut fork: CreateFork,
589) -> eyre::Result<(ForkId, CreatedFork<N, SPEC, BLOCK>, BackendHandler<N, BLOCK>)> {
590 fork.evm_opts.fork_url = Some(fork.url.clone());
593
594 let any_provider = fork.evm_opts.fork_provider_with_url::<AnyNetwork>(&fork.url)?;
598 let (evm_env, number) = fork.evm_opts.fork_evm_env::<_, BLOCK, _, _>(&any_provider).await?;
599 let meta = BlockchainDbMeta::new(evm_env.block_env.clone(), fork.url.clone());
600
601 let cache_path = if fork.enable_caching {
603 Config::foundry_block_cache_dir(evm_env.cfg_env.chain_id, number)
604 } else {
605 None
606 };
607
608 let provider = fork.evm_opts.fork_provider_with_url::<N>(&fork.url)?;
609 let db = BlockchainDb::new(meta, cache_path);
610 let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
611 let fork_id = ForkId::new(&fork.url, Some(number));
612 let fork = CreatedFork::new(fork, evm_env, backend);
613
614 Ok((fork_id, fork, handler))
615}