// anvil/tasks/mod.rs
//! Task management support

#![allow(rustdoc::private_doc_tests)]

use crate::{shutdown::Shutdown, tasks::block_listener::BlockListener, EthApi};
use alloy_network::{AnyHeader, AnyNetwork};
use alloy_primitives::B256;
use alloy_provider::Provider;
use alloy_rpc_types::anvil::Forking;
use alloy_transport::Transport;
use futures::StreamExt;
use std::{fmt, future::Future};
use tokio::{runtime::Handle, task::JoinHandle};

pub mod block_listener;

/// A helper struct for managing additional tokio tasks.
///
/// Bundles the runtime handle used for spawning with the node's shutdown signal. The type is
/// `Clone`, so it can be handed to every component that needs to spawn work.
#[derive(Clone)]
pub struct TaskManager {
    /// Tokio runtime handle that's used to spawn futures, See [tokio::runtime::Handle].
    tokio_handle: Handle,
    /// A receiver for the shutdown signal; a clone is handed to every block listener task that
    /// this manager spawns.
    on_shutdown: Shutdown,
}

impl TaskManager {
    /// Creates a new instance of the task manager.
    ///
    /// `tokio_handle` is the runtime all tasks are spawned on and `on_shutdown` is the shutdown
    /// signal that is forwarded to the block listener tasks.
    pub fn new(tokio_handle: Handle, on_shutdown: Shutdown) -> Self {
        Self { tokio_handle, on_shutdown }
    }

    /// Returns a receiver for the shutdown event.
    ///
    /// Every call returns a fresh clone of the stored [`Shutdown`] signal.
    pub fn on_shutdown(&self) -> Shutdown {
        self.on_shutdown.clone()
    }

    /// Spawns the given task on the tokio runtime and returns its [`JoinHandle`].
    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> JoinHandle<()> {
        self.tokio_handle.spawn(task)
    }

    /// Drives the given future to completion on the runtime's blocking thread pool.
    ///
    /// The future is run via [`Handle::block_on`] inside [`Handle::spawn_blocking`], so it may
    /// block without stalling the async worker threads. The resulting join handle is dropped,
    /// i.e. the task is detached and cannot be awaited by the caller.
    pub fn spawn_blocking(&self, task: impl Future<Output = ()> + Send + 'static) {
        // The closure needs its own handle because it outlives the borrow of `self`.
        let handle = self.tokio_handle.clone();
        self.tokio_handle.spawn_blocking(move || {
            handle.block_on(task);
        });
    }

    /// Spawns a new task that listens for new blocks and resets the forked provider for every new
    /// block
    ///
    /// For every new block hash the block is fetched from `provider` and, if found, `api` is
    /// reset to fork from that block's number. Failures to fetch the block or to reset are
    /// ignored (best effort); the listener keeps running.
    ///
    /// ```
    /// use alloy_network::Ethereum;
    /// use alloy_provider::RootProvider;
    /// use anvil::{spawn, NodeConfig};
    ///
    /// # async fn t() {
    /// let endpoint = "http://....";
    /// let (api, handle) = spawn(NodeConfig::default().with_eth_rpc_url(Some(endpoint))).await;
    ///
    /// let provider = RootProvider::connect_builtin(endpoint).await.unwrap();
    ///
    /// handle.task_manager().spawn_reset_on_new_polled_blocks(provider, api);
    /// # }
    /// ```
    pub fn spawn_reset_on_new_polled_blocks<P, T>(&self, provider: P, api: EthApi)
    where
        P: Provider<T, AnyNetwork> + Clone + Unpin + 'static,
        T: Transport + Clone,
    {
        self.spawn_block_poll_listener(provider.clone(), move |hash| {
            // Each produced future owns its own clones so the factory can be called repeatedly.
            let provider = provider.clone();
            let api = api.clone();
            async move {
                if let Ok(Some(block)) = provider.get_block(hash.into(), false.into()).await {
                    // Best effort: a failed reset must not stop the listener.
                    let _ = api
                        .anvil_reset(Some(Forking {
                            json_rpc_url: None,
                            block_number: Some(block.header.number),
                        }))
                        .await;
                }
            }
        })
    }

    /// Spawns a new [`BlockListener`] task that listens for new blocks (poll-based) See also
    /// [`Provider::watch_blocks`] and executes the future the `task_factory` returns for the new
    /// block hash
    ///
    /// If the provider fails to start watching for blocks, the error is reported on stderr and
    /// the spawned task ends without listening (it does not panic).
    pub fn spawn_block_poll_listener<P, T, F, Fut>(&self, provider: P, task_factory: F)
    where
        P: Provider<T, AnyNetwork> + 'static,
        T: Transport + Clone,
        F: Fn(B256) -> Fut + Unpin + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send,
    {
        let shutdown = self.on_shutdown.clone();
        self.spawn(async move {
            // Setting up the watcher is a remote call and can fail for reasons outside of our
            // control, so end the task gracefully instead of panicking.
            let poller = match provider.watch_blocks().await {
                Ok(poller) => poller,
                Err(err) => {
                    eprintln!("failed to watch for new blocks: {err:?}");
                    return;
                }
            };
            // Every poll yields a batch of hashes; flatten the batches into single hashes.
            let blocks = poller.into_stream().flat_map(futures::stream::iter);
            BlockListener::new(shutdown, blocks, task_factory).await;
        });
    }

    /// Spawns a new task that listens for new blocks and resets the forked provider for every new
    /// block
    ///
    /// For every header received from the subscription, `api` is reset to fork from that
    /// header's block number. A failed reset is ignored (best effort).
    ///
    /// ```
    /// use alloy_network::Ethereum;
    /// use alloy_provider::RootProvider;
    /// use anvil::{spawn, NodeConfig};
    ///
    /// # async fn t() {
    /// let (api, handle) = spawn(NodeConfig::default().with_eth_rpc_url(Some("http://...."))).await;
    ///
    /// let provider = RootProvider::connect_builtin("ws://...").await.unwrap();
    ///
    /// handle.task_manager().spawn_reset_on_subscribed_blocks(provider, api);
    ///
    /// # }
    /// ```
    pub fn spawn_reset_on_subscribed_blocks<P, T>(&self, provider: P, api: EthApi)
    where
        P: Provider<T, AnyNetwork> + 'static,
        T: Transport + Clone,
    {
        self.spawn_block_subscription(provider, move |header| {
            let api = api.clone();
            async move {
                // Best effort: a failed reset must not stop the listener.
                let _ = api
                    .anvil_reset(Some(Forking {
                        json_rpc_url: None,
                        block_number: Some(header.number),
                    }))
                    .await;
            }
        })
    }

    /// Spawns a new [`BlockListener`] task that listens for new blocks (via subscription) See also
    /// [`Provider::subscribe_blocks()`] and executes the future the `task_factory` returns for the
    /// new block header
    ///
    /// If the provider fails to create the subscription, the error is reported on stderr and the
    /// spawned task ends without listening (it does not panic).
    pub fn spawn_block_subscription<P, T, F, Fut>(&self, provider: P, task_factory: F)
    where
        P: Provider<T, AnyNetwork> + 'static,
        T: Transport + Clone,
        F: Fn(alloy_rpc_types::Header<AnyHeader>) -> Fut + Unpin + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send,
    {
        let shutdown = self.on_shutdown.clone();
        self.spawn(async move {
            // Subscribing is a remote call and can fail for reasons outside of our control, so
            // end the task gracefully instead of panicking.
            let subscription = match provider.subscribe_blocks().await {
                Ok(subscription) => subscription,
                Err(err) => {
                    eprintln!("failed to subscribe to new blocks: {err:?}");
                    return;
                }
            };
            let blocks = subscription.into_stream();
            BlockListener::new(shutdown, blocks, task_factory).await;
        });
    }
}

impl fmt::Debug for TaskManager {
    /// Renders the manager opaquely as `TaskManager { .. }`; none of its fields are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("TaskManager");
        repr.finish_non_exhaustive()
    }
}