1use super::CreateFork;
7use crate::Env;
8use alloy_consensus::BlockHeader;
9use alloy_primitives::{U256, map::HashMap};
10use alloy_provider::network::BlockResponse;
11use foundry_common::provider::{ProviderBuilder, RetryProvider};
12use foundry_config::Config;
13use foundry_fork_db::{BackendHandler, BlockchainDb, SharedBackend, cache::BlockchainDbMeta};
14use futures::{
15 FutureExt, StreamExt,
16 channel::mpsc::{Receiver, Sender, channel},
17 stream::{Fuse, Stream},
18 task::{Context, Poll},
19};
20use revm::context::BlockEnv;
21use std::{
22 fmt::{self, Write},
23 pin::Pin,
24 sync::{
25 Arc,
26 atomic::AtomicUsize,
27 mpsc::{Sender as OneshotSender, channel as oneshot_channel},
28 },
29 time::Duration,
30};
31
/// String identifier of a fork.
///
/// [`ForkId::new`] produces ids of the form `<url>@<block>`, while the blanket [`From`] impl wraps
/// any string-like value verbatim.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the id for `url` forked at block `num`.
    ///
    /// The block number is rendered as `0x`-prefixed lowercase hex; when no number is given the
    /// suffix is the literal `latest`.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = String::from(url);
        id.push('@');
        if let Some(block) = num {
            write!(id, "{block:#x}").unwrap();
        } else {
            id.push_str("latest");
        }
        Self(id)
    }

    /// Returns the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ForkId {
    /// Formats the id exactly like the wrapped string, honoring width and alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    /// Wraps anything convertible into a `String` without altering it.
    fn from(id: T) -> Self {
        let raw: String = id.into();
        Self(raw)
    }
}
66
67#[derive(Clone, Debug)]
70#[must_use]
71pub struct MultiFork {
72 handler: Sender<Request>,
74 _shutdown: Arc<ShutDownMultiFork>,
76}
77
78impl MultiFork {
79 pub fn spawn() -> Self {
81 trace!(target: "fork::multi", "spawning multifork");
82
83 let (fork, mut handler) = Self::new();
84 std::thread::Builder::new()
87 .name("multi-fork-backend".into())
88 .spawn(move || {
89 let rt = tokio::runtime::Builder::new_current_thread()
90 .enable_all()
91 .build()
92 .expect("failed to build tokio runtime");
93
94 rt.block_on(async move {
95 handler.set_flush_cache_interval(Duration::from_secs(60));
100 handler.await
101 });
102 })
103 .expect("failed to spawn thread");
104 trace!(target: "fork::multi", "spawned MultiForkHandler thread");
105 fork
106 }
107
108 #[doc(hidden)]
112 pub fn new() -> (Self, MultiForkHandler) {
113 let (handler, handler_rx) = channel(1);
114 let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
115 (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
116 }
117
118 pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
122 trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
123 let (sender, rx) = oneshot_channel();
124 let req = Request::CreateFork(Box::new(fork), sender);
125 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
126 rx.recv()?
127 }
128
129 pub fn roll_fork(
133 &self,
134 fork: ForkId,
135 block: u64,
136 ) -> eyre::Result<(ForkId, SharedBackend, Env)> {
137 trace!(?fork, ?block, "rolling fork");
138 let (sender, rx) = oneshot_channel();
139 let req = Request::RollFork(fork, block, sender);
140 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
141 rx.recv()?
142 }
143
144 pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
146 trace!(?fork, "getting env config");
147 let (sender, rx) = oneshot_channel();
148 let req = Request::GetEnv(fork, sender);
149 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
150 Ok(rx.recv()?)
151 }
152
153 pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
155 trace!(?fork, ?number, ?timestamp, "update fork block");
156 self.handler
157 .clone()
158 .try_send(Request::UpdateBlock(fork, number, timestamp))
159 .map_err(|e| eyre::eyre!("{:?}", e))
160 }
161
162 pub fn update_block_env(&self, fork: ForkId, env: BlockEnv) -> eyre::Result<()> {
167 trace!(?fork, ?env, "update fork block");
168 self.handler
169 .clone()
170 .try_send(Request::UpdateEnv(fork, env))
171 .map_err(|e| eyre::eyre!("{:?}", e))
172 }
173
174 pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
178 let id = id.into();
179 trace!(?id, "get fork backend");
180 let (sender, rx) = oneshot_channel();
181 let req = Request::GetFork(id, sender);
182 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
183 Ok(rx.recv()?)
184 }
185
186 pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
190 let (sender, rx) = oneshot_channel();
191 let req = Request::GetForkUrl(id.into(), sender);
192 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
193 Ok(rx.recv()?)
194 }
195}
196
/// Backend handler that is polled for every created fork, backed by a shared [`RetryProvider`].
type Handler = BackendHandler<Arc<RetryProvider>>;

/// Boxed, sendable future resolving to the id, the fork state and the backend handler of a newly
/// created fork, or to the error that prevented its creation.
type CreateFuture =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;
/// Reply channel for requests that yield a fork: its id, backend and environment, or an error.
type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
/// Reply channel for `Request::GetEnv`; carries `None` when the fork id is unknown.
type GetEnvSender = OneshotSender<Option<Env>>;
203
/// Messages sent from a `MultiFork` to the `MultiForkHandler`.
#[derive(Debug)]
enum Request {
    /// Create the described fork and answer with its id, backend and environment.
    CreateFork(Box<CreateFork>, CreateSender),
    /// Answer with the backend of the given fork, or `None` if the id is unknown.
    GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
    /// Create a fork from the options of the given fork, pinned to the given block number.
    RollFork(ForkId, u64, CreateSender),
    /// Answer with the environment stored for the given fork, or `None` if the id is unknown.
    GetEnv(ForkId, GetEnvSender),
    /// Overwrite the stored block number and block timestamp (in that order) of the given fork.
    UpdateBlock(ForkId, U256, U256),
    /// Replace the stored block environment of the given fork.
    UpdateEnv(ForkId, BlockEnv),
    /// Drop all forks and handlers, then acknowledge on the provided channel.
    ShutDown(OneshotSender<()>),
    /// Answer with the URL of the given fork, or `None` if the id is unknown.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
224
/// Work the handler is still waiting on.
enum ForkTask {
    /// A fork creation in flight: the future building it, the id the creation was requested under,
    /// the sender of the original request, and the senders of further requests for the same id
    /// that arrived while the creation was pending.
    Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
}
229
230#[must_use = "futures do nothing unless polled"]
232pub struct MultiForkHandler {
233 incoming: Fuse<Receiver<Request>>,
235
236 handlers: Vec<(ForkId, Handler)>,
240
241 pending_tasks: Vec<ForkTask>,
243
244 forks: HashMap<ForkId, CreatedFork>,
249
250 flush_cache_interval: Option<tokio::time::Interval>,
252}
253
254impl MultiForkHandler {
255 fn new(incoming: Receiver<Request>) -> Self {
256 Self {
257 incoming: incoming.fuse(),
258 handlers: Default::default(),
259 pending_tasks: Default::default(),
260 forks: Default::default(),
261 flush_cache_interval: None,
262 }
263 }
264
265 pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
267 self.flush_cache_interval =
268 Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
269 self
270 }
271
272 #[expect(irrefutable_let_patterns)]
274 fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
275 for task in &mut self.pending_tasks {
276 if let ForkTask::Create(_, in_progress, _, additional) = task
277 && in_progress == id
278 {
279 return Some(additional);
280 }
281 }
282 None
283 }
284
285 fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
286 let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
287 trace!(?fork_id, "created new forkId");
288
289 if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
291 in_progress.push(sender);
292 return;
293 }
294
295 let task = Box::pin(create_fork(fork));
297 self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
298 }
299
300 fn insert_new_fork(
301 &mut self,
302 fork_id: ForkId,
303 fork: CreatedFork,
304 sender: CreateSender,
305 additional_senders: Vec<CreateSender>,
306 ) {
307 self.forks.insert(fork_id.clone(), fork.clone());
308 let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
309
310 for sender in additional_senders {
312 let next_fork_id = fork.inc_senders(fork_id.clone());
313 self.forks.insert(next_fork_id.clone(), fork.clone());
314 let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
315 }
316 }
317
318 fn update_env(&mut self, fork_id: ForkId, env: BlockEnv) {
320 if let Some(fork) = self.forks.get_mut(&fork_id) {
321 fork.opts.env.evm_env.block_env = env;
322 }
323 }
324 fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
327 if let Some(fork) = self.forks.get_mut(&fork_id) {
328 fork.opts.env.evm_env.block_env.number = block_number;
329 fork.opts.env.evm_env.block_env.timestamp = block_timestamp;
330 }
331 }
332
333 fn on_request(&mut self, req: Request) {
334 match req {
335 Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
336 Request::GetFork(fork_id, sender) => {
337 let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
338 let _ = sender.send(fork);
339 }
340 Request::RollFork(fork_id, block, sender) => {
341 if let Some(fork) = self.forks.get(&fork_id) {
342 trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
343 let mut opts = fork.opts.clone();
344 opts.evm_opts.fork_block_number = Some(block);
345 self.create_fork(opts, sender)
346 } else {
347 let _ = sender.send(Err(eyre::eyre!("No matching fork exits for {}", fork_id)));
348 }
349 }
350 Request::GetEnv(fork_id, sender) => {
351 let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
352 }
353 Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
354 self.update_block(fork_id, block_number, block_timestamp);
355 }
356 Request::UpdateEnv(fork_id, block_env) => {
357 self.update_env(fork_id, block_env);
358 }
359 Request::ShutDown(sender) => {
360 trace!(target: "fork::multi", "received shutdown signal");
361 self.forks.clear();
363 self.handlers.clear();
364 let _ = sender.send(());
365 }
366 Request::GetForkUrl(fork_id, sender) => {
367 let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
368 let _ = sender.send(fork);
369 }
370 }
371 }
372}
373
374impl Future for MultiForkHandler {
377 type Output = ();
378
379 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
380 let pin = self.get_mut();
381
382 loop {
384 match Pin::new(&mut pin.incoming).poll_next(cx) {
385 Poll::Ready(Some(req)) => {
386 pin.on_request(req);
387 }
388 Poll::Ready(None) => {
389 trace!(target: "fork::multi", "request channel closed");
391 break;
392 }
393 Poll::Pending => break,
394 }
395 }
396
397 for n in (0..pin.pending_tasks.len()).rev() {
399 let task = pin.pending_tasks.swap_remove(n);
400 match task {
401 ForkTask::Create(mut fut, id, sender, additional_senders) => {
402 if let Poll::Ready(resp) = fut.poll_unpin(cx) {
403 match resp {
404 Ok((fork_id, fork, handler)) => {
405 if let Some(fork) = pin.forks.get(&fork_id).cloned() {
406 pin.insert_new_fork(
407 fork.inc_senders(fork_id),
408 fork,
409 sender,
410 additional_senders,
411 );
412 } else {
413 pin.handlers.push((fork_id.clone(), handler));
414 pin.insert_new_fork(fork_id, fork, sender, additional_senders);
415 }
416 }
417 Err(err) => {
418 let _ = sender.send(Err(eyre::eyre!("{err}")));
419 for sender in additional_senders {
420 let _ = sender.send(Err(eyre::eyre!("{err}")));
421 }
422 }
423 }
424 } else {
425 pin.pending_tasks.push(ForkTask::Create(
426 fut,
427 id,
428 sender,
429 additional_senders,
430 ));
431 }
432 }
433 }
434 }
435
436 for n in (0..pin.handlers.len()).rev() {
438 let (id, mut handler) = pin.handlers.swap_remove(n);
439 match handler.poll_unpin(cx) {
440 Poll::Ready(_) => {
441 trace!(target: "fork::multi", "fork {:?} completed", id);
442 }
443 Poll::Pending => {
444 pin.handlers.push((id, handler));
445 }
446 }
447 }
448
449 if pin.handlers.is_empty() && pin.incoming.is_done() {
450 trace!(target: "fork::multi", "completed");
451 return Poll::Ready(());
452 }
453
454 if pin
456 .flush_cache_interval
457 .as_mut()
458 .map(|interval| interval.poll_tick(cx).is_ready())
459 .unwrap_or_default()
460 && !pin.forks.is_empty()
461 {
462 trace!(target: "fork::multi", "tick flushing caches");
463 let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
464 std::thread::Builder::new()
466 .name("flusher".into())
467 .spawn(move || {
468 forks.into_iter().for_each(|fork| fork.flush_cache());
469 })
470 .expect("failed to spawn thread");
471 }
472
473 Poll::Pending
474 }
475}
476
477#[derive(Debug, Clone)]
479struct CreatedFork {
480 opts: CreateFork,
482 backend: SharedBackend,
484 num_senders: Arc<AtomicUsize>,
487}
488
489impl CreatedFork {
490 pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
491 Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
492 }
493
494 fn inc_senders(&self, fork_id: ForkId) -> ForkId {
496 format!(
497 "{}-{}",
498 fork_id.as_str(),
499 self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
500 )
501 .into()
502 }
503}
504
505#[derive(Debug)]
512struct ShutDownMultiFork {
513 handler: Option<Sender<Request>>,
514}
515
516impl Drop for ShutDownMultiFork {
517 fn drop(&mut self) {
518 trace!(target: "fork::multi", "initiating shutdown");
519 let (sender, rx) = oneshot_channel();
520 let req = Request::ShutDown(sender);
521 if let Some(mut handler) = self.handler.take()
522 && handler.try_send(req).is_ok()
523 {
524 let _ = rx.recv();
525 trace!(target: "fork::cache", "multifork backend shutdown");
526 }
527 }
528}
529
530async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
534 let provider = Arc::new(
535 ProviderBuilder::new(fork.url.as_str())
536 .maybe_max_retry(fork.evm_opts.fork_retries)
537 .maybe_initial_backoff(fork.evm_opts.fork_retry_backoff)
538 .maybe_headers(fork.evm_opts.fork_headers.clone())
539 .compute_units_per_second(fork.evm_opts.get_compute_units_per_second())
540 .build()?,
541 );
542
543 let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
545 fork.env = env;
546 let meta = BlockchainDbMeta::new(fork.env.evm_env.block_env.clone(), fork.url.clone());
547
548 let number = block.header().number();
551
552 let cache_path = if fork.enable_caching {
554 Config::foundry_block_cache_dir(fork.env.evm_env.cfg_env.chain_id, number)
555 } else {
556 None
557 };
558
559 let db = BlockchainDb::new(meta, cache_path);
560 let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
561 let fork = CreatedFork::new(fork, backend);
562 let fork_id = ForkId::new(&fork.opts.url, Some(number));
563
564 Ok((fork_id, fork, handler))
565}