anvil/tasks/mod.rs
//! Task management support
#![allow(rustdoc::private_doc_tests)]
use crate::{shutdown::Shutdown, tasks::block_listener::BlockListener, EthApi};
use alloy_network::{AnyHeader, AnyNetwork};
use alloy_primitives::B256;
use alloy_provider::Provider;
use alloy_rpc_types::anvil::Forking;
use alloy_transport::Transport;
use futures::StreamExt;
use std::{fmt, future::Future};
use tokio::{runtime::Handle, task::JoinHandle};
pub mod block_listener;
/// A helper struct for managing additional tokio tasks.
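///
/// # Example
///
/// A minimal sketch of how a `TaskManager` is typically obtained: from the handle of a spawned
/// node rather than by constructing it directly (assumes a node spawned via `anvil::spawn`, as in
/// the method examples below).
///
/// ```
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let (_api, handle) = spawn(NodeConfig::default()).await;
/// let _tasks = handle.task_manager();
/// # }
/// ```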
#[derive(Clone)]
pub struct TaskManager {
/// Tokio runtime handle that's used to spawn futures, see [tokio::runtime::Handle].
tokio_handle: Handle,
/// A receiver for the shutdown signal
on_shutdown: Shutdown,
}
impl TaskManager {
/// Creates a new instance of the task manager
pub fn new(tokio_handle: Handle, on_shutdown: Shutdown) -> Self {
Self { tokio_handle, on_shutdown }
}
/// Returns a receiver for the shutdown event
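///
/// # Example
///
/// A minimal sketch (assumes a node spawned via `anvil::spawn`, and that the returned
/// [`Shutdown`] can be awaited and resolves once the node is shutting down, as it is used by
/// [`BlockListener`]):
///
/// ```
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let (_api, handle) = spawn(NodeConfig::default()).await;
/// let tasks = handle.task_manager();
/// let on_shutdown = tasks.on_shutdown();
/// tasks.spawn(async move {
///     // assumption: `Shutdown` resolves as a future once the node is shutting down
///     on_shutdown.await;
///     // perform any cleanup here
/// });
/// # }
/// ```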
pub fn on_shutdown(&self) -> Shutdown {
self.on_shutdown.clone()
}
/// Spawns the given task.
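///
/// # Example
///
/// A minimal sketch (assumes a node spawned via `anvil::spawn`, as in the other examples in this
/// module); the future runs on the task manager's tokio runtime:
///
/// ```
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let (_api, handle) = spawn(NodeConfig::default()).await;
/// let _task = handle.task_manager().spawn(async {
///     // background work driven by the task manager's runtime handle
/// });
/// # }
/// ```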
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> JoinHandle<()> {
self.tokio_handle.spawn(task)
}
/// Spawns the given task on a blocking thread, see [`Handle::spawn_blocking`], and drives the
/// future to completion there.
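///
/// # Example
///
/// A minimal sketch (assumes a node spawned via `anvil::spawn`); the future is driven to
/// completion on a dedicated blocking thread:
///
/// ```
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let (_api, handle) = spawn(NodeConfig::default()).await;
/// handle.task_manager().spawn_blocking(async {
///     // long-running work that would otherwise block the async executor
/// });
/// # }
/// ```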
pub fn spawn_blocking(&self, task: impl Future<Output = ()> + Send + 'static) {
let handle = self.tokio_handle.clone();
self.tokio_handle.spawn_blocking(move || {
handle.block_on(task);
});
}
/// Spawns a new task that listens for new blocks (poll-based) and resets the forked provider for
/// every new block.
///
/// ```
/// use alloy_network::Ethereum;
/// use alloy_provider::RootProvider;
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let endpoint = "http://....";
/// let (api, handle) = spawn(NodeConfig::default().with_eth_rpc_url(Some(endpoint))).await;
///
/// let provider = RootProvider::connect_builtin(endpoint).await.unwrap();
///
/// handle.task_manager().spawn_reset_on_new_polled_blocks(provider, api);
/// # }
/// ```
pub fn spawn_reset_on_new_polled_blocks<P, T>(&self, provider: P, api: EthApi)
where
P: Provider<T, AnyNetwork> + Clone + Unpin + 'static,
T: Transport + Clone,
{
self.spawn_block_poll_listener(provider.clone(), move |hash| {
let provider = provider.clone();
let api = api.clone();
async move {
if let Ok(Some(block)) = provider.get_block(hash.into(), false.into()).await {
let _ = api
.anvil_reset(Some(Forking {
json_rpc_url: None,
block_number: Some(block.header.number),
}))
.await;
}
}
})
}
/// Spawns a new [`BlockListener`] task that listens for new blocks (poll-based, see also
/// [`Provider::watch_blocks`]) and executes the future the `task_factory` returns for every new
/// block hash.
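///
/// # Example
///
/// A minimal sketch that just logs every polled block hash; it mirrors the example on
/// [`Self::spawn_reset_on_new_polled_blocks`] and assumes an HTTP `endpoint` of a live node:
///
/// ```
/// use alloy_provider::RootProvider;
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let endpoint = "http://....";
/// let (_api, handle) = spawn(NodeConfig::default().with_eth_rpc_url(Some(endpoint))).await;
///
/// let provider = RootProvider::connect_builtin(endpoint).await.unwrap();
///
/// handle.task_manager().spawn_block_poll_listener(provider, |hash| async move {
///     println!("new block: {hash}");
/// });
/// # }
/// ```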
pub fn spawn_block_poll_listener<P, T, F, Fut>(&self, provider: P, task_factory: F)
where
P: Provider<T, AnyNetwork> + 'static,
T: Transport + Clone,
F: Fn(B256) -> Fut + Unpin + Send + Sync + 'static,
Fut: Future<Output = ()> + Send,
{
let shutdown = self.on_shutdown.clone();
self.spawn(async move {
let blocks = provider
.watch_blocks()
.await
.unwrap()
.into_stream()
.flat_map(futures::stream::iter);
BlockListener::new(shutdown, blocks, task_factory).await;
});
}
/// Spawns a new task that listens for new blocks (via subscription) and resets the forked
/// provider for every new block.
///
/// ```
/// use alloy_network::Ethereum;
/// use alloy_provider::RootProvider;
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let (api, handle) = spawn(NodeConfig::default().with_eth_rpc_url(Some("http://...."))).await;
///
/// let provider = RootProvider::connect_builtin("ws://...").await.unwrap();
///
/// handle.task_manager().spawn_reset_on_subscribed_blocks(provider, api);
///
/// # }
/// ```
pub fn spawn_reset_on_subscribed_blocks<P, T>(&self, provider: P, api: EthApi)
where
P: Provider<T, AnyNetwork> + 'static,
T: Transport + Clone,
{
self.spawn_block_subscription(provider, move |header| {
let api = api.clone();
async move {
let _ = api
.anvil_reset(Some(Forking {
json_rpc_url: None,
block_number: Some(header.number),
}))
.await;
}
})
}
/// Spawns a new [`BlockListener`] task that listens for new blocks (via subscription, see also
/// [`Provider::subscribe_blocks()`]) and executes the future the `task_factory` returns for every
/// new block header.
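///
/// # Example
///
/// A minimal sketch that just logs every subscribed block header's number; it mirrors the example
/// on [`Self::spawn_reset_on_subscribed_blocks`] and assumes a websocket endpoint of a live node:
///
/// ```
/// use alloy_provider::RootProvider;
/// use anvil::{spawn, NodeConfig};
///
/// # async fn t() {
/// let (_api, handle) = spawn(NodeConfig::default().with_eth_rpc_url(Some("http://...."))).await;
///
/// let provider = RootProvider::connect_builtin("ws://...").await.unwrap();
///
/// handle.task_manager().spawn_block_subscription(provider, |header| async move {
///     println!("new block: {}", header.number);
/// });
/// # }
/// ```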
pub fn spawn_block_subscription<P, T, F, Fut>(&self, provider: P, task_factory: F)
where
P: Provider<T, AnyNetwork> + 'static,
T: Transport + Clone,
F: Fn(alloy_rpc_types::Header<AnyHeader>) -> Fut + Unpin + Send + Sync + 'static,
Fut: Future<Output = ()> + Send,
{
let shutdown = self.on_shutdown.clone();
self.spawn(async move {
let blocks = provider.subscribe_blocks().await.unwrap().into_stream();
BlockListener::new(shutdown, blocks, task_factory).await;
});
}
}
impl fmt::Debug for TaskManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskManager").finish_non_exhaustive()
}
}