1use super::CreateFork;
7use alloy_consensus::BlockHeader;
8use alloy_primitives::{map::HashMap, U256};
9use alloy_provider::network::BlockResponse;
10use foundry_common::provider::{ProviderBuilder, RetryProvider};
11use foundry_config::Config;
12use foundry_fork_db::{cache::BlockchainDbMeta, BackendHandler, BlockchainDb, SharedBackend};
13use futures::{
14 channel::mpsc::{channel, Receiver, Sender},
15 stream::{Fuse, Stream},
16 task::{Context, Poll},
17 Future, FutureExt, StreamExt,
18};
19use revm::primitives::Env;
20use std::{
21 fmt::{self, Write},
22 pin::Pin,
23 sync::{
24 atomic::AtomicUsize,
25 mpsc::{channel as oneshot_channel, Sender as OneshotSender},
26 Arc,
27 },
28 time::Duration,
29};
30
/// Identifier of a fork, stored as a plain string.
///
/// Ids built by [`ForkId::new`] have the form `<url>@<block>`, where `<block>` is either the
/// block number in `0x`-prefixed lowercase hexadecimal or the literal `latest`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Builds the id for a fork of `url` pinned at block `num`.
    ///
    /// `None` produces the `latest` suffix instead of a block number.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let mut id = url.to_string();
        id.push('@');
        match num {
            // Formatting into a `String` cannot fail, so the unwrap never fires.
            Some(n) => write!(id, "{n:#x}").unwrap(),
            None => id.push_str("latest"),
        }
        Self(id)
    }

    /// Returns the id as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ForkId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Name the trait explicitly: the call then neither depends on `Display` being imported
        // nor becomes ambiguous when other `fmt` traits are in scope.
        fmt::Display::fmt(&self.0, f)
    }
}

/// Wraps anything convertible into a `String` as a [`ForkId`], without any validation.
impl<T: Into<String>> From<T> for ForkId {
    fn from(id: T) -> Self {
        Self(id.into())
    }
}
65
66#[derive(Clone, Debug)]
69#[must_use]
70pub struct MultiFork {
71 handler: Sender<Request>,
73 _shutdown: Arc<ShutDownMultiFork>,
75}
76
77impl MultiFork {
78 pub fn spawn() -> Self {
80 trace!(target: "fork::multi", "spawning multifork");
81
82 let (fork, mut handler) = Self::new();
83 std::thread::Builder::new()
86 .name("multi-fork-backend".into())
87 .spawn(move || {
88 let rt = tokio::runtime::Builder::new_current_thread()
89 .enable_all()
90 .build()
91 .expect("failed to build tokio runtime");
92
93 rt.block_on(async move {
94 handler.set_flush_cache_interval(Duration::from_secs(60));
99 handler.await
100 });
101 })
102 .expect("failed to spawn thread");
103 trace!(target: "fork::multi", "spawned MultiForkHandler thread");
104 fork
105 }
106
107 #[doc(hidden)]
111 pub fn new() -> (Self, MultiForkHandler) {
112 let (handler, handler_rx) = channel(1);
113 let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
114 (Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
115 }
116
117 pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
121 trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
122 let (sender, rx) = oneshot_channel();
123 let req = Request::CreateFork(Box::new(fork), sender);
124 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
125 rx.recv()?
126 }
127
128 pub fn roll_fork(
132 &self,
133 fork: ForkId,
134 block: u64,
135 ) -> eyre::Result<(ForkId, SharedBackend, Env)> {
136 trace!(?fork, ?block, "rolling fork");
137 let (sender, rx) = oneshot_channel();
138 let req = Request::RollFork(fork, block, sender);
139 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
140 rx.recv()?
141 }
142
143 pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
145 trace!(?fork, "getting env config");
146 let (sender, rx) = oneshot_channel();
147 let req = Request::GetEnv(fork, sender);
148 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
149 Ok(rx.recv()?)
150 }
151
152 pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
154 trace!(?fork, ?number, ?timestamp, "update fork block");
155 self.handler
156 .clone()
157 .try_send(Request::UpdateBlock(fork, number, timestamp))
158 .map_err(|e| eyre::eyre!("{:?}", e))
159 }
160
161 pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
165 let id = id.into();
166 trace!(?id, "get fork backend");
167 let (sender, rx) = oneshot_channel();
168 let req = Request::GetFork(id, sender);
169 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
170 Ok(rx.recv()?)
171 }
172
173 pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
177 let (sender, rx) = oneshot_channel();
178 let req = Request::GetForkUrl(id.into(), sender);
179 self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
180 Ok(rx.recv()?)
181 }
182}
183
184type Handler = BackendHandler<Arc<RetryProvider>>;
185
186type CreateFuture =
187 Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;
188type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
189type GetEnvSender = OneshotSender<Option<Env>>;
190
191#[derive(Debug)]
193enum Request {
194 CreateFork(Box<CreateFork>, CreateSender),
196 GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
198 RollFork(ForkId, u64, CreateSender),
200 GetEnv(ForkId, GetEnvSender),
202 UpdateBlock(ForkId, U256, U256),
204 ShutDown(OneshotSender<()>),
206 GetForkUrl(ForkId, OneshotSender<Option<String>>),
208}
209
210enum ForkTask {
211 Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
213}
214
215#[must_use = "futures do nothing unless polled"]
217pub struct MultiForkHandler {
218 incoming: Fuse<Receiver<Request>>,
220
221 handlers: Vec<(ForkId, Handler)>,
225
226 pending_tasks: Vec<ForkTask>,
228
229 forks: HashMap<ForkId, CreatedFork>,
234
235 flush_cache_interval: Option<tokio::time::Interval>,
237}
238
239impl MultiForkHandler {
240 fn new(incoming: Receiver<Request>) -> Self {
241 Self {
242 incoming: incoming.fuse(),
243 handlers: Default::default(),
244 pending_tasks: Default::default(),
245 forks: Default::default(),
246 flush_cache_interval: None,
247 }
248 }
249
250 pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
252 self.flush_cache_interval =
253 Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
254 self
255 }
256
257 #[expect(irrefutable_let_patterns)]
259 fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
260 for task in &mut self.pending_tasks {
261 if let ForkTask::Create(_, in_progress, _, additional) = task {
262 if in_progress == id {
263 return Some(additional);
264 }
265 }
266 }
267 None
268 }
269
270 fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
271 let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
272 trace!(?fork_id, "created new forkId");
273
274 if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
276 in_progress.push(sender);
277 return;
278 }
279
280 let task = Box::pin(create_fork(fork));
282 self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
283 }
284
285 fn insert_new_fork(
286 &mut self,
287 fork_id: ForkId,
288 fork: CreatedFork,
289 sender: CreateSender,
290 additional_senders: Vec<CreateSender>,
291 ) {
292 self.forks.insert(fork_id.clone(), fork.clone());
293 let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
294
295 for sender in additional_senders {
297 let next_fork_id = fork.inc_senders(fork_id.clone());
298 self.forks.insert(next_fork_id.clone(), fork.clone());
299 let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
300 }
301 }
302
303 fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
306 if let Some(fork) = self.forks.get_mut(&fork_id) {
307 fork.opts.env.block.number = block_number;
308 fork.opts.env.block.timestamp = block_timestamp;
309 }
310 }
311
312 fn on_request(&mut self, req: Request) {
313 match req {
314 Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
315 Request::GetFork(fork_id, sender) => {
316 let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
317 let _ = sender.send(fork);
318 }
319 Request::RollFork(fork_id, block, sender) => {
320 if let Some(fork) = self.forks.get(&fork_id) {
321 trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
322 let mut opts = fork.opts.clone();
323 opts.evm_opts.fork_block_number = Some(block);
324 self.create_fork(opts, sender)
325 } else {
326 let _ = sender.send(Err(eyre::eyre!("No matching fork exits for {}", fork_id)));
327 }
328 }
329 Request::GetEnv(fork_id, sender) => {
330 let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
331 }
332 Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
333 self.update_block(fork_id, block_number, block_timestamp);
334 }
335 Request::ShutDown(sender) => {
336 trace!(target: "fork::multi", "received shutdown signal");
337 self.forks.clear();
339 self.handlers.clear();
340 let _ = sender.send(());
341 }
342 Request::GetForkUrl(fork_id, sender) => {
343 let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
344 let _ = sender.send(fork);
345 }
346 }
347 }
348}
349
350impl Future for MultiForkHandler {
353 type Output = ();
354
355 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
356 let pin = self.get_mut();
357
358 loop {
360 match Pin::new(&mut pin.incoming).poll_next(cx) {
361 Poll::Ready(Some(req)) => {
362 pin.on_request(req);
363 }
364 Poll::Ready(None) => {
365 trace!(target: "fork::multi", "request channel closed");
367 break;
368 }
369 Poll::Pending => break,
370 }
371 }
372
373 for n in (0..pin.pending_tasks.len()).rev() {
375 let task = pin.pending_tasks.swap_remove(n);
376 match task {
377 ForkTask::Create(mut fut, id, sender, additional_senders) => {
378 if let Poll::Ready(resp) = fut.poll_unpin(cx) {
379 match resp {
380 Ok((fork_id, fork, handler)) => {
381 if let Some(fork) = pin.forks.get(&fork_id).cloned() {
382 pin.insert_new_fork(
383 fork.inc_senders(fork_id),
384 fork,
385 sender,
386 additional_senders,
387 );
388 } else {
389 pin.handlers.push((fork_id.clone(), handler));
390 pin.insert_new_fork(fork_id, fork, sender, additional_senders);
391 }
392 }
393 Err(err) => {
394 let _ = sender.send(Err(eyre::eyre!("{err}")));
395 for sender in additional_senders {
396 let _ = sender.send(Err(eyre::eyre!("{err}")));
397 }
398 }
399 }
400 } else {
401 pin.pending_tasks.push(ForkTask::Create(
402 fut,
403 id,
404 sender,
405 additional_senders,
406 ));
407 }
408 }
409 }
410 }
411
412 for n in (0..pin.handlers.len()).rev() {
414 let (id, mut handler) = pin.handlers.swap_remove(n);
415 match handler.poll_unpin(cx) {
416 Poll::Ready(_) => {
417 trace!(target: "fork::multi", "fork {:?} completed", id);
418 }
419 Poll::Pending => {
420 pin.handlers.push((id, handler));
421 }
422 }
423 }
424
425 if pin.handlers.is_empty() && pin.incoming.is_done() {
426 trace!(target: "fork::multi", "completed");
427 return Poll::Ready(());
428 }
429
430 if pin
432 .flush_cache_interval
433 .as_mut()
434 .map(|interval| interval.poll_tick(cx).is_ready())
435 .unwrap_or_default() &&
436 !pin.forks.is_empty()
437 {
438 trace!(target: "fork::multi", "tick flushing caches");
439 let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
440 std::thread::Builder::new()
442 .name("flusher".into())
443 .spawn(move || {
444 forks.into_iter().for_each(|fork| fork.flush_cache());
445 })
446 .expect("failed to spawn thread");
447 }
448
449 Poll::Pending
450 }
451}
452
453#[derive(Debug, Clone)]
455struct CreatedFork {
456 opts: CreateFork,
458 backend: SharedBackend,
460 num_senders: Arc<AtomicUsize>,
463}
464
465impl CreatedFork {
466 pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
467 Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
468 }
469
470 fn inc_senders(&self, fork_id: ForkId) -> ForkId {
472 format!(
473 "{}-{}",
474 fork_id.as_str(),
475 self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
476 )
477 .into()
478 }
479}
480
481#[derive(Debug)]
488struct ShutDownMultiFork {
489 handler: Option<Sender<Request>>,
490}
491
492impl Drop for ShutDownMultiFork {
493 fn drop(&mut self) {
494 trace!(target: "fork::multi", "initiating shutdown");
495 let (sender, rx) = oneshot_channel();
496 let req = Request::ShutDown(sender);
497 if let Some(mut handler) = self.handler.take() {
498 if handler.try_send(req).is_ok() {
499 let _ = rx.recv();
500 trace!(target: "fork::cache", "multifork backend shutdown");
501 }
502 }
503 }
504}
505
506async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
510 let provider = Arc::new(
511 ProviderBuilder::new(fork.url.as_str())
512 .maybe_max_retry(fork.evm_opts.fork_retries)
513 .maybe_initial_backoff(fork.evm_opts.fork_retry_backoff)
514 .maybe_headers(fork.evm_opts.fork_headers.clone())
515 .compute_units_per_second(fork.evm_opts.get_compute_units_per_second())
516 .build()?,
517 );
518
519 let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
521 fork.env = env;
522 let meta = BlockchainDbMeta::new(fork.env.clone(), fork.url.clone());
523
524 let number = block.header().number();
527
528 let cache_path = if fork.enable_caching {
530 Config::foundry_block_cache_dir(meta.cfg_env.chain_id, number)
531 } else {
532 None
533 };
534
535 let db = BlockchainDb::new(meta, cache_path);
536 let (backend, handler) = SharedBackend::new(provider, db, Some(number.into()));
537 let fork = CreatedFork::new(fork, backend);
538 let fork_id = ForkId::new(&fork.opts.url, number.into());
539
540 Ok((fork_id, fork, handler))
541}