1use super::CreateFork;
7use crate::Env;
8use alloy_consensus::BlockHeader;
9use alloy_primitives::{U256, map::HashMap};
10use alloy_provider::network::BlockResponse;
11use foundry_config::Config;
12use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
13use futures::{
14 FutureExt, StreamExt,
15 channel::mpsc::{Receiver, Sender, channel},
16 stream::Fuse,
17 task::{Context, Poll},
18};
19use revm::context::BlockEnv;
20use std::{
21 fmt::{self, Write},
22 pin::Pin,
23 sync::{
24 Arc,
25 atomic::AtomicUsize,
26 mpsc::{Sender as OneshotSender, channel as oneshot_channel},
27 },
28 time::Duration,
29};
30
/// Identifier of a fork, stored as a plain string.
///
/// Ids built by [`ForkId::new`] have the form `<url>@<block>`, but any value that converts into a
/// `String` can be turned into an id through the blanket [`From`] impl.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the id for the fork of `url` at block `num`.
    ///
    /// The block number is rendered as `0x`-prefixed lowercase hex; `None` is rendered as
    /// `latest`.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = format!("{url}@");
        if let Some(block) = num {
            // Formatting into a `String` does not fail, so the unwrap cannot trigger.
            write!(id, "{block:#x}").unwrap();
        } else {
            id.push_str("latest");
        }
        Self(id)
    }

    /// Returns the id as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ForkId {
    /// Formats the id exactly like the wrapped string, honoring width/padding flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl<T> From<T> for ForkId
where
    T: Into<String>,
{
    /// Wraps the converted string as-is; no `<url>@<block>` structure is enforced.
    fn from(id: T) -> Self {
        let raw: String = id.into();
        Self(raw)
    }
}
65
66#[derive(Clone, Debug)]
69#[must_use]
70pub struct MultiFork {
71 handler: Sender<Request>,
73 _shutdown: Arc<ShutDownMultiFork>,
75}
76
77impl MultiFork {
78 pub fn spawn() -> Self {
80 trace!(target: "fork::multi", "spawning multifork");
81
82 let (fork, mut handler) = Self::new();
83
84 let fut = async move {
87 handler.set_flush_cache_interval(Duration::from_secs(60));
92 handler.await
93 };
94 match tokio::runtime::Handle::try_current() {
95 Ok(rt) => _ = rt.spawn(fut),
96 Err(_) => {
97 trace!(target: "fork::multi", "spawning multifork backend thread");
98 _ = std::thread::Builder::new()
99 .name("multi-fork-backend".into())
100 .spawn(move || {
101 tokio::runtime::Builder::new_current_thread()
102 .enable_all()
103 .build()
104 .expect("failed to build tokio runtime")
105 .block_on(fut)
106 })
107 .expect("failed to spawn thread")
108 }
109 }
110
111 trace!(target: "fork::multi", "spawned MultiForkHandler thread");
112 fork
113 }
114
115 #[doc(hidden)]
119 pub fn new() -> (Self, MultiForkHandler) {
120 let (handler, handler_rx) = channel(1);
121 let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
122 (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
123 }
124
125 pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
129 trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
130 let (sender, rx) = oneshot_channel();
131 let req = Request::CreateFork(Box::new(fork), sender);
132 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
133 rx.recv()?
134 }
135
136 pub fn roll_fork(
140 &self,
141 fork: ForkId,
142 block: u64,
143 ) -> eyre::Result<(ForkId, SharedBackend, Env)> {
144 trace!(?fork, ?block, "rolling fork");
145 let (sender, rx) = oneshot_channel();
146 let req = Request::RollFork(fork, block, sender);
147 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
148 rx.recv()?
149 }
150
151 pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
153 trace!(?fork, "getting env config");
154 let (sender, rx) = oneshot_channel();
155 let req = Request::GetEnv(fork, sender);
156 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
157 Ok(rx.recv()?)
158 }
159
160 pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
162 trace!(?fork, ?number, ?timestamp, "update fork block");
163 self.handler
164 .clone()
165 .try_send(Request::UpdateBlock(fork, number, timestamp))
166 .map_err(|e| eyre::eyre!("{:?}", e))
167 }
168
169 pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
174 trace!(?fork, ?env, "update fork block");
175 self.handler
176 .clone()
177 .try_send(Request::UpdateEnv(fork, env))
178 .map_err(|e| eyre::eyre!("{:?}", e))
179 }
180
181 pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
185 let id = id.into();
186 trace!(?id, "get fork backend");
187 let (sender, rx) = oneshot_channel();
188 let req = Request::GetFork(id, sender);
189 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
190 Ok(rx.recv()?)
191 }
192
193 pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
197 let (sender, rx) = oneshot_channel();
198 let req = Request::GetForkUrl(id.into(), sender);
199 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
200 Ok(rx.recv()?)
201 }
202}
203
204type Handler = BackendHandler;
205type CreateFuture =
206 Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;
207type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
208type GetEnvSender = OneshotSender<Option<Env>>;
209
210#[derive(Debug)]
212enum Request {
213 CreateFork(Box<CreateFork>, CreateSender),
215 GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
217 RollFork(ForkId, u64, CreateSender),
219 GetEnv(ForkId, GetEnvSender),
221 UpdateBlock(ForkId, U256, U256),
223 UpdateEnv(ForkId, BlockEnv),
225 ShutDown(OneshotSender<()>),
227 GetForkUrl(ForkId, OneshotSender<Option<String>>),
229}
230
231enum ForkTask {
232 Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
234}
235
236#[must_use = "futures do nothing unless polled"]
238pub struct MultiForkHandler {
239 incoming: Fuse<Receiver<Request>>,
241
242 handlers: Vec<(ForkId, Handler)>,
246
247 pending_tasks: Vec<ForkTask>,
249
250 forks: HashMap<ForkId, CreatedFork>,
255
256 flush_cache_interval: Option<tokio::time::Interval>,
258}
259
260impl MultiForkHandler {
261 fn new(incoming: Receiver<Request>) -> Self {
262 Self {
263 incoming: incoming.fuse(),
264 handlers: Default::default(),
265 pending_tasks: Default::default(),
266 forks: Default::default(),
267 flush_cache_interval: None,
268 }
269 }
270
271 pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
273 self.flush_cache_interval =
274 Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
275 self
276 }
277
278 #[expect(irrefutable_let_patterns)]
280 fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
281 for task in &mut self.pending_tasks {
282 if let ForkTask::Create(_, in_progress, _, additional) = task
283 && in_progress == id
284 {
285 return Some(additional);
286 }
287 }
288 None
289 }
290
291 fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
292 let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
293 trace!(?fork_id, "created new forkId");
294
295 if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
297 in_progress.push(sender);
298 return;
299 }
300
301 let task = Box::pin(create_fork(fork));
303 self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
304 }
305
306 fn insert_new_fork(
307 &mut self,
308 fork_id: ForkId,
309 fork: CreatedFork,
310 sender: CreateSender,
311 additional_senders: Vec<CreateSender>,
312 ) {
313 self.forks.insert(fork_id.clone(), fork.clone());
314 let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
315
316 for sender in additional_senders {
318 let next_fork_id = fork.inc_senders(fork_id.clone());
319 self.forks.insert(next_fork_id.clone(), fork.clone());
320 let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
321 }
322 }
323
324 fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
326 if let Some(fork) = self.forks.get_mut(&fork_id) {
327 fork.opts.env.evm_env.block_env = env;
328 }
329 }
330 fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
333 if let Some(fork) = self.forks.get_mut(&fork_id) {
334 fork.opts.env.evm_env.block_env.number = block_number;
335 fork.opts.env.evm_env.block_env.timestamp = block_timestamp;
336 }
337 }
338
339 fn on_request(&mut self, req: Request) {
340 match req {
341 Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
342 Request::GetFork(fork_id, sender) => {
343 let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
344 let _ = sender.send(fork);
345 }
346 Request::RollFork(fork_id, block, sender) => {
347 if let Some(fork) = self.forks.get(&fork_id) {
348 trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
349 let mut opts = fork.opts.clone();
350 opts.evm_opts.fork_block_number = Some(block);
351 self.create_fork(opts, sender)
352 } else {
353 let _ =
354 sender.send(Err(eyre::eyre!("No matching fork exists for {}", fork_id)));
355 }
356 }
357 Request::GetEnv(fork_id, sender) => {
358 let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
359 }
360 Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
361 self.update_block(fork_id, block_number, block_timestamp);
362 }
363 Request::UpdateEnv(fork_id, block_env) => {
364 self.update_env(fork_id, block_env);
365 }
366 Request::ShutDown(sender) => {
367 trace!(target: "fork::multi", "received shutdown signal");
368 self.forks.clear();
370 self.handlers.clear();
371 let _ = sender.send(());
372 }
373 Request::GetForkUrl(fork_id, sender) => {
374 let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
375 let _ = sender.send(fork);
376 }
377 }
378 }
379}
380
381impl Future for MultiForkHandler {
384 type Output = ();
385
386 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
387 let this = self.get_mut();
388
389 loop {
391 match this.incoming.poll_next_unpin(cx) {
392 Poll::Ready(Some(req)) => this.on_request(req),
393 Poll::Ready(None) => {
394 trace!(target: "fork::multi", "request channel closed");
396 break;
397 }
398 Poll::Pending => break,
399 }
400 }
401
402 for n in (0..this.pending_tasks.len()).rev() {
404 let task = this.pending_tasks.swap_remove(n);
405 match task {
406 ForkTask::Create(mut fut, id, sender, additional_senders) => {
407 if let Poll::Ready(resp) = fut.poll_unpin(cx) {
408 match resp {
409 Ok((fork_id, fork, handler)) => {
410 if let Some(fork) = this.forks.get(&fork_id).cloned() {
411 this.insert_new_fork(
412 fork.inc_senders(fork_id),
413 fork,
414 sender,
415 additional_senders,
416 );
417 } else {
418 this.handlers.push((fork_id.clone(), handler));
419 this.insert_new_fork(fork_id, fork, sender, additional_senders);
420 }
421 }
422 Err(err) => {
423 let _ = sender.send(Err(eyre::eyre!("{err}")));
424 for sender in additional_senders {
425 let _ = sender.send(Err(eyre::eyre!("{err}")));
426 }
427 }
428 }
429 } else {
430 this.pending_tasks.push(ForkTask::Create(
431 fut,
432 id,
433 sender,
434 additional_senders,
435 ));
436 }
437 }
438 }
439 }
440
441 for n in (0..this.handlers.len()).rev() {
443 let (id, mut handler) = this.handlers.swap_remove(n);
444 match handler.poll_unpin(cx) {
445 Poll::Ready(_) => {
446 trace!(target: "fork::multi", "fork {:?} completed", id);
447 }
448 Poll::Pending => {
449 this.handlers.push((id, handler));
450 }
451 }
452 }
453
454 if this.handlers.is_empty() && this.incoming.is_done() {
455 trace!(target: "fork::multi", "completed");
456 return Poll::Ready(());
457 }
458
459 if this
461 .flush_cache_interval
462 .as_mut()
463 .map(|interval| interval.poll_tick(cx).is_ready())
464 .unwrap_or_default()
465 && !this.forks.is_empty()
466 {
467 trace!(target: "fork::multi", "tick flushing caches");
468 let forks = this.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
469 std::thread::Builder::new()
471 .name("flusher".into())
472 .spawn(move || {
473 forks.into_iter().for_each(|fork| fork.flush_cache());
474 })
475 .expect("failed to spawn thread");
476 }
477
478 Poll::Pending
479 }
480}
481
482#[derive(Debug, Clone)]
484struct CreatedFork {
485 opts: CreateFork,
487 backend: SharedBackend,
489 num_senders: Arc<AtomicUsize>,
492}
493
494impl CreatedFork {
495 pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
496 Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
497 }
498
499 fn inc_senders(&self, fork_id: ForkId) -> ForkId {
501 format!(
502 "{}-{}",
503 fork_id.as_str(),
504 self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
505 )
506 .into()
507 }
508}
509
510#[derive(Debug)]
517struct ShutDownMultiFork {
518 handler: Option<Sender<Request>>,
519}
520
521impl Drop for ShutDownMultiFork {
522 fn drop(&mut self) {
523 trace!(target: "fork::multi", "initiating shutdown");
524 let (sender, rx) = oneshot_channel();
525 let req = Request::ShutDown(sender);
526 if let Some(mut handler) = self.handler.take()
527 && handler.try_send(req).is_ok()
528 {
529 let _ = rx.recv();
530 trace!(target: "fork::cache", "multifork backend shutdown");
531 }
532 }
533}
534
535async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
539 let provider = fork.evm_opts.fork_provider_with_url(&fork.url)?;
540
541 let (env, block) = fork.evm_opts.fork_evm_env_with_provider(&fork.url, &provider).await?;
543 fork.env = env;
544 let meta = BlockchainDbMeta::new(fork.env.evm_env.block_env.clone(), fork.url.clone());
545
546 let number = block.header().number();
549
550 let cache_path = if fork.enable_caching {
552 Config::foundry_block_cache_dir(fork.env.evm_env.cfg_env.chain_id, number)
553 } else {
554 None
555 };
556
557 let db = BlockchainDb::new(meta, cache_path);
558 let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
559 let fork = CreatedFork::new(fork, backend);
560 let fork_id = ForkId::new(&fork.opts.url, Some(number));
561
562 Ok((fork_id, fork, handler))
563}