anvil/tasks/
block_listener.rs

1//! A task that listens for new blocks
2
3use crate::shutdown::Shutdown;
4use futures::{FutureExt, Stream, StreamExt};
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10/// A Future that will execute a given `task` for each new block that arrives on the stream.
11pub struct BlockListener<St, F, Fut> {
12    stream: St,
13    task_factory: F,
14    task: Option<Pin<Box<Fut>>>,
15    on_shutdown: Shutdown,
16}
17
18impl<St, F, Fut> BlockListener<St, F, Fut>
19where
20    St: Stream,
21    F: Fn(<St as Stream>::Item) -> Fut,
22{
23    pub fn new(on_shutdown: Shutdown, block_stream: St, task_factory: F) -> Self {
24        Self { stream: block_stream, task_factory, task: None, on_shutdown }
25    }
26}
27
28impl<St, F, Fut> Future for BlockListener<St, F, Fut>
29where
30    St: Stream + Unpin,
31    F: Fn(<St as Stream>::Item) -> Fut + Unpin + Send + Sync + 'static,
32    Fut: Future<Output = ()> + Send,
33{
34    type Output = ();
35
36    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37        let pin = self.get_mut();
38
39        if pin.on_shutdown.poll_unpin(cx).is_ready() {
40            return Poll::Ready(());
41        }
42
43        let mut block = None;
44        // drain the stream
45        while let Poll::Ready(maybe_block) = pin.stream.poll_next_unpin(cx) {
46            if maybe_block.is_none() {
47                // stream complete
48                return Poll::Ready(());
49            }
50            block = maybe_block;
51        }
52
53        if let Some(block) = block {
54            pin.task = Some(Box::pin((pin.task_factory)(block)));
55        }
56
57        if let Some(mut task) = pin.task.take()
58            && task.poll_unpin(cx).is_pending()
59        {
60            pin.task = Some(task);
61        }
62        Poll::Pending
63    }
64}