anvil/tasks/
block_listener.rs

1//! A task that listens for new blocks
2
3use crate::shutdown::Shutdown;
4use futures::{FutureExt, Stream, StreamExt};
5use std::{
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11/// A Future that will execute a given `task` for each new block that
12pub struct BlockListener<St, F, Fut> {
13    stream: St,
14    task_factory: F,
15    task: Option<Pin<Box<Fut>>>,
16    on_shutdown: Shutdown,
17}
18
19impl<St, F, Fut> BlockListener<St, F, Fut>
20where
21    St: Stream,
22    F: Fn(<St as Stream>::Item) -> Fut,
23{
24    pub fn new(on_shutdown: Shutdown, block_stream: St, task_factory: F) -> Self {
25        Self { stream: block_stream, task_factory, task: None, on_shutdown }
26    }
27}
28
29impl<St, F, Fut> Future for BlockListener<St, F, Fut>
30where
31    St: Stream + Unpin,
32    F: Fn(<St as Stream>::Item) -> Fut + Unpin + Send + Sync + 'static,
33    Fut: Future<Output = ()> + Send,
34{
35    type Output = ();
36
37    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        let pin = self.get_mut();
39
40        if pin.on_shutdown.poll_unpin(cx).is_ready() {
41            return Poll::Ready(())
42        }
43
44        let mut block = None;
45        // drain the stream
46        while let Poll::Ready(maybe_block) = pin.stream.poll_next_unpin(cx) {
47            if maybe_block.is_none() {
48                // stream complete
49                return Poll::Ready(())
50            }
51            block = maybe_block;
52        }
53
54        if let Some(block) = block {
55            pin.task = Some(Box::pin((pin.task_factory)(block)));
56        }
57
58        if let Some(mut task) = pin.task.take() {
59            if task.poll_unpin(cx).is_pending() {
60                pin.task = Some(task);
61            }
62        }
63        Poll::Pending
64    }
65}