use super::CreateFork;
use alloy_consensus::BlockHeader;
use alloy_primitives::{map::HashMap, U256};
use alloy_provider::network::BlockResponse;
use alloy_transport::layers::RetryBackoffService;
use foundry_common::provider::{
runtime_transport::RuntimeTransport, ProviderBuilder, RetryProvider,
};
use foundry_config::Config;
use foundry_fork_db::{cache::BlockchainDbMeta, BackendHandler, BlockchainDb, SharedBackend};
use futures::{
channel::mpsc::{channel, Receiver, Sender},
stream::{Fuse, Stream},
task::{Context, Poll},
Future, FutureExt, StreamExt,
};
use revm::primitives::Env;
use std::{
fmt::{self, Write},
pin::Pin,
sync::{
atomic::AtomicUsize,
mpsc::{channel as oneshot_channel, Sender as OneshotSender},
Arc,
},
time::Duration,
};
/// The identifier of a fork: the RPC endpoint combined with the (optional)
/// block number the fork is pinned to.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForkId(pub String);

impl ForkId {
    /// Creates a new fork identifier for the given endpoint and block.
    ///
    /// The resulting id has the shape `<url>@<block hex>`, or `<url>@latest`
    /// when no block number is given.
    pub fn new(url: &str, num: Option<u64>) -> Self {
        let block = match num {
            Some(n) => format!("{n:#x}"),
            None => "latest".to_string(),
        };
        Self(format!("{url}@{block}"))
    }

    /// Returns the identifier of the fork as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ForkId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl<T: Into<String>> From<T> for ForkId {
    fn from(id: T) -> Self {
        Self(id.into())
    }
}
/// Frontend for creating and interacting with multiple forks.
///
/// Cloning is cheap: all clones share the same channel to the background
/// [`MultiForkHandler`] task, and the handler is asked to shut down once the
/// last clone is dropped (via the shared `_shutdown` guard).
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork {
    /// Channel used to send `Request`s to the `MultiForkHandler`.
    handler: Sender<Request>,
    /// Ensures a graceful shutdown of the background handler when the last
    /// `MultiFork` clone is dropped (see `ShutDownMultiFork::drop`).
    _shutdown: Arc<ShutDownMultiFork>,
}
impl MultiFork {
pub fn spawn() -> Self {
trace!(target: "fork::multi", "spawning multifork");
let (fork, mut handler) = Self::new();
std::thread::Builder::new()
.name("multi-fork-backend".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime");
rt.block_on(async move {
handler.set_flush_cache_interval(Duration::from_secs(60));
handler.await
});
})
.expect("failed to spawn thread");
trace!(target: "fork::multi", "spawned MultiForkHandler thread");
fork
}
#[doc(hidden)]
pub fn new() -> (Self, MultiForkHandler) {
let (handler, handler_rx) = channel(1);
let _shutdown = Arc::new(ShutDownMultiFork { handler: Some(handler.clone()) });
(Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
}
pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
let (sender, rx) = oneshot_channel();
let req = Request::CreateFork(Box::new(fork), sender);
self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
rx.recv()?
}
pub fn roll_fork(
&self,
fork: ForkId,
block: u64,
) -> eyre::Result<(ForkId, SharedBackend, Env)> {
trace!(?fork, ?block, "rolling fork");
let (sender, rx) = oneshot_channel();
let req = Request::RollFork(fork, block, sender);
self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
rx.recv()?
}
pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
trace!(?fork, "getting env config");
let (sender, rx) = oneshot_channel();
let req = Request::GetEnv(fork, sender);
self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
Ok(rx.recv()?)
}
pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
trace!(?fork, ?number, ?timestamp, "update fork block");
self.handler
.clone()
.try_send(Request::UpdateBlock(fork, number, timestamp))
.map_err(|e| eyre::eyre!("{:?}", e))
}
pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
let id = id.into();
trace!(?id, "get fork backend");
let (sender, rx) = oneshot_channel();
let req = Request::GetFork(id, sender);
self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
Ok(rx.recv()?)
}
pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
let (sender, rx) = oneshot_channel();
let req = Request::GetForkUrl(id.into(), sender);
self.handler.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))?;
Ok(rx.recv()?)
}
}
/// The backend-handler type that services a single fork's RPC requests; one is
/// pushed into `MultiForkHandler::handlers` per created fork and polled there.
type Handler = BackendHandler<RetryBackoffService<RuntimeTransport>, Arc<RetryProvider>>;

/// Future resolving to a created fork's id, data, and its backend handler.
type CreateFuture =
    Pin<Box<dyn Future<Output = eyre::Result<(ForkId, CreatedFork, Handler)>> + Send>>;

/// Reply channel for `CreateFork`/`RollFork` requests.
type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;

/// Reply channel for `GetEnv` requests.
type GetEnvSender = OneshotSender<Option<Env>>;
/// Request sent from a [`MultiFork`] to the background [`MultiForkHandler`].
#[derive(Debug)]
enum Request {
    /// Creates a new fork and answers with its id, backend and env.
    CreateFork(Box<CreateFork>, CreateSender),
    /// Returns the backend of the fork with the given id, if it exists.
    GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
    /// Rolls the fork to the given block number (implemented as creating, or
    /// reusing, a fork pinned to that block).
    RollFork(ForkId, u64, CreateSender),
    /// Returns the environment of the fork, if it exists.
    GetEnv(ForkId, GetEnvSender),
    /// Updates the stored block number and timestamp of the fork's env.
    UpdateBlock(ForkId, U256, U256),
    /// Shuts down the handler: clears all forks and backend handlers, then
    /// confirms over the channel.
    ShutDown(OneshotSender<()>),
    /// Returns the RPC url of the fork, if it exists.
    GetForkUrl(ForkId, OneshotSender<Option<String>>),
}
/// A task currently being driven by the [`MultiForkHandler`].
enum ForkTask {
    /// An in-progress fork creation: the creation future, the id being
    /// created, the original requester, and any additional requesters that
    /// arrived for the same fork while creation was in flight.
    Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
}
/// The background task that manages the lifecycle of all forks.
///
/// Polled to completion on a dedicated runtime; see [`MultiFork::spawn`].
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler {
    /// Incoming requests from [`MultiFork`] senders.
    incoming: Fuse<Receiver<Request>>,
    /// The backend handlers (one per created fork) driven by this task.
    handlers: Vec<(ForkId, Handler)>,
    /// Fork creations that are currently in progress.
    pending_tasks: Vec<ForkTask>,
    /// All created forks, keyed by their unique id.
    forks: HashMap<ForkId, CreatedFork>,
    /// Interval at which all fork caches are flushed; `None` disables
    /// periodic flushing.
    flush_cache_interval: Option<tokio::time::Interval>,
}
impl MultiForkHandler {
fn new(incoming: Receiver<Request>) -> Self {
Self {
incoming: incoming.fuse(),
handlers: Default::default(),
pending_tasks: Default::default(),
forks: Default::default(),
flush_cache_interval: None,
}
}
pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
self.flush_cache_interval =
Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
self
}
fn find_in_progress_task(&mut self, id: &ForkId) -> Option<&mut Vec<CreateSender>> {
for task in self.pending_tasks.iter_mut() {
#[allow(irrefutable_let_patterns)]
if let ForkTask::Create(_, in_progress, _, additional) = task {
if in_progress == id {
return Some(additional);
}
}
}
None
}
fn create_fork(&mut self, fork: CreateFork, sender: CreateSender) {
let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
trace!(?fork_id, "created new forkId");
if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
in_progress.push(sender);
return;
}
let task = Box::pin(create_fork(fork));
self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
}
fn insert_new_fork(
&mut self,
fork_id: ForkId,
fork: CreatedFork,
sender: CreateSender,
additional_senders: Vec<CreateSender>,
) {
self.forks.insert(fork_id.clone(), fork.clone());
let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));
for sender in additional_senders {
let next_fork_id = fork.inc_senders(fork_id.clone());
self.forks.insert(next_fork_id.clone(), fork.clone());
let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
}
}
fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
if let Some(fork) = self.forks.get_mut(&fork_id) {
fork.opts.env.block.number = block_number;
fork.opts.env.block.timestamp = block_timestamp;
}
}
fn on_request(&mut self, req: Request) {
match req {
Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
Request::GetFork(fork_id, sender) => {
let fork = self.forks.get(&fork_id).map(|f| f.backend.clone());
let _ = sender.send(fork);
}
Request::RollFork(fork_id, block, sender) => {
if let Some(fork) = self.forks.get(&fork_id) {
trace!(target: "fork::multi", "rolling {} to {}", fork_id, block);
let mut opts = fork.opts.clone();
opts.evm_opts.fork_block_number = Some(block);
self.create_fork(opts, sender)
} else {
let _ = sender.send(Err(eyre::eyre!("No matching fork exits for {}", fork_id)));
}
}
Request::GetEnv(fork_id, sender) => {
let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
}
Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
self.update_block(fork_id, block_number, block_timestamp);
}
Request::ShutDown(sender) => {
trace!(target: "fork::multi", "received shutdown signal");
self.forks.clear();
self.handlers.clear();
let _ = sender.send(());
}
Request::GetForkUrl(fork_id, sender) => {
let fork = self.forks.get(&fork_id).map(|f| f.opts.url.clone());
let _ = sender.send(fork);
}
}
}
}
impl Future for MultiForkHandler {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();

        // Drain all currently available requests before advancing tasks.
        loop {
            match Pin::new(&mut pin.incoming).poll_next(cx) {
                Poll::Ready(Some(req)) => {
                    pin.on_request(req);
                }
                Poll::Ready(None) => {
                    // Channel closed: no further requests will arrive.
                    trace!(target: "fork::multi", "request channel closed");
                    break;
                }
                Poll::Pending => break,
            }
        }

        // Advance all in-progress fork-creation tasks. Iterating in reverse
        // with `swap_remove` keeps removal O(1); still-pending tasks are
        // pushed back for the next poll.
        for n in (0..pin.pending_tasks.len()).rev() {
            let task = pin.pending_tasks.swap_remove(n);
            match task {
                ForkTask::Create(mut fut, id, sender, additional_senders) => {
                    if let Poll::Ready(resp) = fut.poll_unpin(cx) {
                        match resp {
                            Ok((fork_id, fork, handler)) => {
                                if let Some(fork) = pin.forks.get(&fork_id).cloned() {
                                    // A fork with this id already exists: reuse
                                    // it under a fresh (incremented) id instead
                                    // of registering a second handler.
                                    pin.insert_new_fork(
                                        fork.inc_senders(fork_id),
                                        fork,
                                        sender,
                                        additional_senders,
                                    );
                                } else {
                                    pin.handlers.push((fork_id.clone(), handler));
                                    pin.insert_new_fork(fork_id, fork, sender, additional_senders);
                                }
                            }
                            Err(err) => {
                                // Notify the original requester and everyone who
                                // piggybacked on this creation.
                                let _ = sender.send(Err(eyre::eyre!("{err}")));
                                for sender in additional_senders {
                                    let _ = sender.send(Err(eyre::eyre!("{err}")));
                                }
                            }
                        }
                    } else {
                        // Not finished yet: re-queue for the next poll.
                        pin.pending_tasks.push(ForkTask::Create(
                            fut,
                            id,
                            sender,
                            additional_senders,
                        ));
                    }
                }
            }
        }

        // Advance all fork backend handlers; completed ones are dropped.
        for n in (0..pin.handlers.len()).rev() {
            let (id, mut handler) = pin.handlers.swap_remove(n);
            match handler.poll_unpin(cx) {
                Poll::Ready(_) => {
                    trace!(target: "fork::multi", "fork {:?} completed", id);
                }
                Poll::Pending => {
                    pin.handlers.push((id, handler));
                }
            }
        }

        // Terminate once all handlers finished and no new requests can arrive.
        if pin.handlers.is_empty() && pin.incoming.is_done() {
            trace!(target: "fork::multi", "completed");
            return Poll::Ready(());
        }

        // Periodically flush the fork caches on a dedicated thread so this
        // task isn't held up by whatever work `flush_cache` does.
        if pin
            .flush_cache_interval
            .as_mut()
            .map(|interval| interval.poll_tick(cx).is_ready())
            .unwrap_or_default() &&
            !pin.forks.is_empty()
        {
            trace!(target: "fork::multi", "tick flushing caches");
            let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
            std::thread::Builder::new()
                .name("flusher".into())
                .spawn(move || {
                    forks.into_iter().for_each(|fork| fork.flush_cache());
                })
                .expect("failed to spawn thread");
        }

        Poll::Pending
    }
}
/// A fork that has been created, together with the options it was created with.
#[derive(Debug, Clone)]
struct CreatedFork {
    /// The options the fork was created with.
    opts: CreateFork,
    /// The shared backend servicing this fork.
    backend: SharedBackend,
    /// Counter shared across clones; `inc_senders` bumps it to derive unique
    /// fork ids for additional consumers of the same fork.
    num_senders: Arc<AtomicUsize>,
}
impl CreatedFork {
pub fn new(opts: CreateFork, backend: SharedBackend) -> Self {
Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
}
fn inc_senders(&self, fork_id: ForkId) -> ForkId {
format!(
"{}-{}",
fork_id.as_str(),
self.num_senders.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
)
.into()
}
}
/// Guard that shuts down the `MultiForkHandler` when dropped.
///
/// Held behind an `Arc` in every `MultiFork` clone, so the shutdown request is
/// only sent once the last clone is dropped.
#[derive(Debug)]
struct ShutDownMultiFork {
    /// Channel to the handler; taken (set to `None`) when shutdown is sent.
    handler: Option<Sender<Request>>,
}
impl Drop for ShutDownMultiFork {
    /// Sends a `ShutDown` request to the handler and blocks until it
    /// confirms, so the backend winds down gracefully.
    fn drop(&mut self) {
        trace!(target: "fork::multi", "initiating shutdown");
        let Some(mut handler) = self.handler.take() else { return };
        let (sender, rx) = oneshot_channel();
        if handler.try_send(Request::ShutDown(sender)).is_ok() {
            // Wait for the handler to acknowledge the shutdown.
            let _ = rx.recv();
            trace!(target: "fork::cache", "multifork backend shutdown");
        }
    }
}
/// Creates a new fork: connects the RPC provider, resolves the fork
/// environment and block, and wires up the shared backend with its handler.
async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
    // Set up the RPC provider with the configured retry behavior.
    let provider = Arc::new(
        ProviderBuilder::new(fork.url.as_str())
            .maybe_max_retry(fork.evm_opts.fork_retries)
            .maybe_initial_backoff(fork.evm_opts.fork_retry_backoff)
            .compute_units_per_second(fork.evm_opts.get_compute_units_per_second())
            .build()?,
    );

    // Initialize the fork environment and resolve the fork block.
    let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
    fork.env = env;
    let meta = BlockchainDbMeta::new(fork.env.clone(), fork.url.clone());

    let block_number = block.header().number();

    // Only cache RPC responses on disk when caching is enabled.
    let cache_path = if fork.enable_caching {
        Config::foundry_block_cache_dir(meta.cfg_env.chain_id, block_number)
    } else {
        None
    };

    let db = BlockchainDb::new(meta, cache_path);
    let (backend, handler) = SharedBackend::new(provider, db, Some(block_number.into()));
    let created = CreatedFork::new(fork, backend);
    let fork_id = ForkId::new(&created.opts.url, block_number.into());
    Ok((fork_id, created, handler))
}