1use super::CreateFork;
7use crate::Env;
8use alloy_consensus::BlockHeader;
9use alloy_network::Network;
10use alloy_primitives::{U256, map::HashMap};
11use alloy_provider::network::BlockResponse;
12use foundry_config::Config;
13use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
14use futures::{
15 FutureExt, StreamExt,
16 channel::mpsc::{Receiver, Sender, channel},
17 stream::Fuse,
18 task::{Context, Poll},
19};
20use revm::context::BlockEnv;
21use std::{
22 fmt::{self, Write},
23 pin::Pin,
24 sync::{
25 Arc,
26 atomic::AtomicUsize,
27 mpsc::{Sender as OneshotSender, channel as oneshot_channel},
28 },
29 time::Duration,
30};
31
/// Identifies a fork by its endpoint and pinned block.
///
/// Rendered as `<url>@<block>` where `<block>` is the lowercase `0x`-prefixed
/// hex block number, or the literal `latest` when no block is pinned.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the id for `url`, optionally pinned to block `num`.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let block = match num {
            Some(n) => format!("{n:#x}"),
            None => "latest".to_string(),
        };
        Self(format!("{url}@{block}"))
    }

    /// Returns the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
54
55impl fmt::Display for ForkId {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 self.0.fmt(f)
58 }
59}
60
impl<T: Into<String>> From<T> for ForkId {
    /// Wraps any string-like value verbatim as a [`ForkId`], without applying
    /// the `<url>@<block>` formatting of [`ForkId::new`].
    fn from(id: T) -> Self {
        Self(id.into())
    }
}
66
/// Cheap-to-clone handle for communicating with the background
/// [`MultiForkHandler`] task that owns all fork backends.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork<N: Network> {
    /// Channel used to send requests to the handler task.
    handler: Sender<Request<N>>,
    /// Shared guard that sends a `ShutDown` request to the handler when the
    /// last `MultiFork` handle is dropped (see `ShutDownMultiFork::drop`).
    _shutdown: Arc<ShutDownMultiFork<N>>,
}
77
78impl<N: Network> MultiFork<N> {
79 pub fn spawn() -> Self {
81 trace!(target: "fork::multi", "spawning multifork");
82
83 let (fork, mut handler) = Self::new();
84
85 let fut = async move {
88 handler.set_flush_cache_interval(Duration::from_secs(60));
93 handler.await
94 };
95 match tokio::runtime::Handle::try_current() {
96 Ok(rt) => _ = rt.spawn(fut),
97 Err(_) => {
98 trace!(target: "fork::multi", "spawning multifork backend thread");
99 _ = std::thread::Builder::new()
100 .name("multi-fork-backend".into())
101 .spawn(move || {
102 tokio::runtime::Builder::new_current_thread()
103 .enable_all()
104 .build()
105 .expect("failed to build tokio runtime")
106 .block_on(fut)
107 })
108 .expect("failed to spawn thread")
109 }
110 }
111
112 trace!(target: "fork::multi", "spawned MultiForkHandler thread");
113 fork
114 }
115
116 #[doc(hidden)]
120 pub fn new() -> (Self, MultiForkHandler<N>) {
121 let (handler, handler_rx) = channel(1);
122 let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
123 (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
124 }
125
126 pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend<N>, Env)> {
130 trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
131 let (sender, rx) = oneshot_channel();
132 let req = Request::CreateFork(Box::new(fork), sender);
133 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
134 rx.recv()?
135 }
136
137 pub fn roll_fork(
141 &self,
142 fork: ForkId,
143 block: u64,
144 ) -> eyre::Result<(ForkId, SharedBackend<N>, Env)> {
145 trace!(?fork, ?block, "rolling fork");
146 let (sender, rx) = oneshot_channel();
147 let req = Request::RollFork(fork, block, sender);
148 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
149 rx.recv()?
150 }
151
152 pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
154 trace!(?fork, "getting env config");
155 let (sender, rx) = oneshot_channel();
156 let req = Request::GetEnv(fork, sender);
157 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
158 Ok(rx.recv()?)
159 }
160
161 pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
163 trace!(?fork, ?number, ?timestamp, "update fork block");
164 self.handler
165 .clone()
166 .try_send(Request::UpdateBlock(fork, number, timestamp))
167 .map_err(|e| eyre::eyre!("{:?}", e))
168 }
169
170 pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
175 trace!(?fork, ?env, "update fork block");
176 self.handler
177 .clone()
178 .try_send(Request::UpdateEnv(fork, env))
179 .map_err(|e| eyre::eyre!("{:?}", e))
180 }
181
182 pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend<N>>> {
186 let id = id.into();
187 trace!(?id, "get fork backend");
188 let (sender, rx) = oneshot_channel();
189 let req = Request::GetFork(id, sender);
190 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
191 Ok(rx.recv()?)
192 }
193
194 pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
198 let (sender, rx) = oneshot_channel();
199 let req = Request::GetForkUrl(id.into(), sender);
200 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
201 Ok(rx.recv()?)
202 }
203}
204
/// Future resolving to a fully initialized fork: its id, state, and the
/// backend handler that must be polled to serve its RPC requests.
type CreateFuture<N> =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)>> + Send>>;
/// One-shot sender delivering the outcome of a fork creation/roll request.
type CreateSender<N> = OneshotSender<eyre::Result<(ForkId, SharedBackend<N>, Env)>>;
/// One-shot sender delivering a fork's `Env`, or `None` if the fork is unknown.
type GetEnvSender = OneshotSender<Option<Env>>;
209
/// Requests accepted by the [`MultiForkHandler`] task.
#[derive(Debug)]
enum Request<N: Network> {
    /// Creates a new fork and replies with its id, backend, and `Env`.
    CreateFork(Box<CreateFork>, CreateSender<N>),
    /// Replies with the backend of the given fork, if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend<N>>>),
    /// Rolls an existing fork to the given block number.
    RollFork(ForkId, u64, CreateSender<N>),
    /// Replies with the stored `Env` of the given fork, if it exists.
    GetEnv(ForkId, GetEnvSender),
    /// Updates the stored block number and timestamp of the given fork.
    UpdateBlock(ForkId, U256, U256),
    /// Replaces the stored `BlockEnv` of the given fork.
    UpdateEnv(ForkId, BlockEnv),
    /// Shuts the handler down; acknowledged via the sender.
    ShutDown(OneshotSender<()>),
    /// Replies with the RPC url of the given fork, if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
230
/// An in-flight piece of work owned by the handler.
enum ForkTask<N: Network> {
    /// A pending fork creation: the driving future, the id being created, the
    /// original requester, and any additional requesters that asked for the
    /// same fork while creation was still in progress.
    Create(CreateFuture<N>, ForkId, CreateSender<N>, Vec<CreateSender<N>>),
}
235
/// Background task that owns every fork's state and backend handler.
///
/// Processes incoming [`Request`]s and drives pending fork creations and all
/// [`BackendHandler`]s when polled as a [`Future`].
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler<N: Network> {
    /// Incoming requests from all [`MultiFork`] handles; fused so the closed
    /// channel can be detected via `is_done` in `poll`.
    incoming: Fuse<Receiver<Request<N>>>,
    /// One backend handler per successfully created fork; each is polled in
    /// `poll` to service its backend's requests.
    handlers: Vec<(ForkId, BackendHandler<N>)>,
    /// Fork creations that have not resolved yet.
    pending_tasks: Vec<ForkTask<N>>,
    /// All created forks, keyed by fork id (including the sender-suffixed ids
    /// produced by `CreatedFork::inc_senders`).
    forks: HashMap<ForkId, CreatedFork<N>>,
    /// When set, caches of all forks are flushed to disk on every tick.
    flush_cache_interval: Option<tokio::time::Interval>,
}
259
impl<N: Network> MultiForkHandler<N> {
    /// Creates a handler reading requests from `incoming`, with no forks and
    /// no periodic cache flushing configured.
    fn new(incoming: Receiver<Request<N>>) -> Self {
        Self {
            incoming: incoming.fuse(),
            handlers: Default::default(),
            pending_tasks: Default::default(),
            forks: Default::default(),
            flush_cache_interval: None,
        }
    }

    /// Enables periodic cache flushing with the given period.
    pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
        // `interval_at(now + period, ..)` so the first tick fires only after a
        // full period, not immediately.
        self.flush_cache_interval =
            Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
        self
    }

    /// Returns the additional-senders list of the pending creation task for
    /// `id`, if one is in progress.
    fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender<N>>> {
        // `ForkTask` has a single variant, so this pattern is irrefutable.
        for ForkTask::Create(_, in_progress, _, additional) in &mut self.pending_tasks {
            if in_progress == id {
                return Some(additional);
            }
        }
        None
    }

    /// Starts creating a fork, or — if an identical creation is already in
    /// flight — registers `sender` to be notified when it completes.
    fn create_fork(&mut self, fork: CreateFork, sender: CreateSender<N>) {
        let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
        trace!(?fork_id, "created new forkId");

        // Deduplicate: piggyback on an in-progress creation of the same fork.
        if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
            in_progress.push(sender);
            return;
        }

        let task = Box::pin(create_fork(fork));
        self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
    }

    /// Registers a freshly created fork and notifies all waiting requesters.
    ///
    /// The primary requester gets the fork under `fork_id`; every additional
    /// requester gets a clone registered under a fresh, sender-suffixed id
    /// (see `CreatedFork::inc_senders`). Send failures are ignored because
    /// receivers may have been dropped.
    fn insert_new_fork(
        &mut self,
        fork_id: ForkId,
        fork: CreatedFork<N>,
        sender: CreateSender<N>,
        additional_senders: Vec<CreateSender<N>>,
    ) {
        self.forks.insert(fork_id.clone(), fork.clone());
        let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));

        // Hand out a uniquely named entry per extra requester.
        for sender in additional_senders {
            let next_fork_id = fork.inc_senders(fork_id.clone());
            self.forks.insert(next_fork_id.clone(), fork.clone());
            let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
        }
    }

    /// Replaces the stored `BlockEnv` of `fork_id`; no-op if the fork is unknown.
    fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
        if let Some(fork) = self.forks.get_mut(&fork_id) {
            fork.opts.env.evm_env.block_env = env;
        }
    }
    /// Updates the stored block number/timestamp of `fork_id`; no-op if the
    /// fork is unknown.
    fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
        if let Some(fork) = self.forks.get_mut(&fork_id) {
            fork.opts.env.evm_env.block_env.number = block_number;
            fork.opts.env.evm_env.block_env.timestamp = block_timestamp;
        }
    }

    /// Dispatches one incoming request to the matching operation.
    fn on_request(&mut self, req: Request<N>) {
        match req {
            Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
            Request::GetFork(fork_id, sender) => {
                let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
                let _ = sender.send(fork);
            }
            Request::RollFork(fork_id, block, sender) => {
                if let Some(fork) = self.forks.get(&fork_id) {
                    trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
                    // Rolling is implemented as creating (or reusing) a fork of
                    // the same endpoint pinned at the requested block.
                    let mut opts = fork.opts.clone();
                    opts.evm_opts.fork_block_number = Some(block);
                    self.create_fork(opts, sender)
                } else {
                    let _ =
                        sender.send(Err(eyre::eyre!("No matching fork exists for {}", fork_id)));
                }
            }
            Request::GetEnv(fork_id, sender) => {
                let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
            }
            Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
                self.update_block(fork_id, block_number, block_timestamp);
            }
            Request::UpdateEnv(fork_id, block_env) => {
                self.update_env(fork_id, block_env);
            }
            Request::ShutDown(sender) => {
                trace!(target: "fork::multi", "received shutdown signal");
                // Dropping all forks and handlers lets the handler future
                // complete once `poll` observes the empty state.
                self.forks.clear();
                self.handlers.clear();
                let _ = sender.send(());
            }
            Request::GetForkUrl(fork_id, sender) => {
                let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
                let _ = sender.send(fork);
            }
        }
    }
}
377
// The handler's event loop: drains incoming requests, advances pending fork
// creations, then polls every fork's backend handler.
impl<N: Network> Future for MultiForkHandler<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Drain all currently available requests first so new tasks are
        // registered before the task-driving loops below run.
        loop {
            match this.incoming.poll_next_unpin(cx) {
                Poll::Ready(Some(req)) => this.on_request(req),
                Poll::Ready(None) => {
                    // Channel closed: no more requests can arrive, but the
                    // existing handlers still need to be driven below.
                    trace!(target: "fork::multi", "request channel closed");
                    break;
                }
                Poll::Pending => break,
            }
        }

        // Advance pending fork creations. Iterating in reverse with
        // `swap_remove` lets still-pending tasks be pushed back without
        // invalidating the remaining indices.
        for n in (0..this.pending_tasks.len()).rev() {
            let task = this.pending_tasks.swap_remove(n);
            match task {
                ForkTask::Create(mut fut, id, sender, additional_senders) => {
                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
                        match resp {
                            Ok((fork_id, fork, handler)) => {
                                if let Some(fork) = this.forks.get(&fork_id).cloned() {
                                    // A fork with this id already exists (e.g.
                                    // created by a concurrent request): hand
                                    // out the existing fork under a fresh,
                                    // sender-suffixed id; the newly created
                                    // `handler` is simply dropped.
                                    this.insert_new_fork(
                                        fork.inc_senders(fork_id),
                                        fork,
                                        sender,
                                        additional_senders,
                                    );
                                } else {
                                    this.handlers.push((fork_id.clone(), handler));
                                    this.insert_new_fork(fork_id, fork, sender, additional_senders);
                                }
                            }
                            Err(err) => {
                                // Fan the error out to every waiting requester.
                                let _ = sender.send(Err(eyre::eyre!("{err}")));
                                for sender in additional_senders {
                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
                                }
                            }
                        }
                    } else {
                        // Still pending: put the task back for the next poll.
                        this.pending_tasks.push(ForkTask::Create(
                            fut,
                            id,
                            sender,
                            additional_senders,
                        ));
                    }
                }
            }
        }

        // Advance all fork backend handlers; completed ones are dropped.
        for n in (0..this.handlers.len()).rev() {
            let (id, mut handler) = this.handlers.swap_remove(n);
            match handler.poll_unpin(cx) {
                Poll::Ready(_) => {
                    trace!(target: "fork::multi", "fork {:?} completed", id);
                }
                Poll::Pending => {
                    this.handlers.push((id, handler));
                }
            }
        }

        // All handlers finished and no further requests can arrive: done.
        if this.handlers.is_empty() && this.incoming.is_done() {
            trace!(target: "fork::multi", "completed");
            return Poll::Ready(());
        }

        // On each flush tick, write all fork caches to disk on a dedicated
        // thread so (potentially blocking) serialization doesn't stall this
        // async task.
        if this
            .flush_cache_interval
            .as_mut()
            .map(|interval| interval.poll_tick(cx).is_ready())
            .unwrap_or_default()
            && !this.forks.is_empty()
        {
            trace!(target: "fork::multi", "tick flushing caches");
            let forks = this.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
            std::thread::Builder::new()
                .name("flusher".into())
                .spawn(move || {
                    forks.into_iter().for_each(|fork| fork.flush_cache());
                })
                .expect("failed to spawn thread");
        }

        Poll::Pending
    }
}
478
/// A fork that has been created, together with its configuration and backend.
#[derive(Debug, Clone)]
struct CreatedFork<N: Network> {
    /// The settings the fork was created with; its `env` is kept up to date
    /// via the handler's update requests.
    opts: CreateFork,
    /// Handle to the fork's shared backend.
    backend: SharedBackend<N>,
    /// Number of handles handed out for this fork; used to derive unique
    /// sender-suffixed ids when the same fork serves multiple requesters.
    num_senders: Arc<AtomicUsize>,
}
490
491impl<N: Network> CreatedFork<N> {
492 pub fn new(opts: CreateFork, backend: SharedBackend<N>) -> Self {
493 Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
494 }
495
496 fn inc_senders(&self, fork_id: ForkId) -> ForkId {
498 format!(
499 "{}-{}",
500 fork_id.as_str(),
501 self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
502 )
503 .into()
504 }
505}
506
/// Guard that asks the [`MultiForkHandler`] to shut down when dropped.
///
/// Held in an `Arc` by every [`MultiFork`] clone, so the shutdown request is
/// only sent once the last handle goes away. The sender is an `Option` so it
/// can be taken exactly once in `Drop`.
#[derive(Debug)]
struct ShutDownMultiFork<N: Network> {
    handler: Option<Sender<Request<N>>>,
}
517
518impl<N: Network> Drop for ShutDownMultiFork<N> {
519 fn drop(&mut self) {
520 trace!(target: "fork::multi", "initiating shutdown");
521 let (sender, rx) = oneshot_channel();
522 let req = Request::ShutDown(sender);
523 if let Some(mut handler) = self.handler.take()
524 && handler.try_send(req).is_ok()
525 {
526 let _ = rx.recv();
527 trace!(target: "fork::cache", "multifork backend shutdown");
528 }
529 }
530}
531
532async fn create_fork<N: Network>(
536 mut fork: CreateFork,
537) -> eyre::Result<(ForkId, CreatedFork<N>, BackendHandler<N>)> {
538 let provider = fork.evm_opts.fork_provider_with_url(&fork.url)?;
539
540 let (env, block) =
542 fork.evm_opts.fork_evm_env_with_provider::<_, N>(&fork.url, &provider).await?;
543 fork.env = env;
544 let meta = BlockchainDbMeta::new(fork.env.evm_env.block_env.clone(), fork.url.clone());
545
546 let number = block.header().number();
549
550 let cache_path = if fork.enable_caching {
552 Config::foundry_block_cache_dir(fork.env.evm_env.cfg_env.chain_id, number)
553 } else {
554 None
555 };
556
557 let db = BlockchainDb::new(meta, cache_path);
558 let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
559 let fork = CreatedFork::new(fork, backend);
560 let fork_id = ForkId::new(&fork.opts.url, Some(number));
561
562 Ok((fork_id, fork, handler))
563}