anvil/tasks/
block_listener.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//! A task that listens for new blocks

use crate::shutdown::Shutdown;
use futures::{FutureExt, Stream, StreamExt};
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

/// A Future that will execute a given `task` for each new block that
pub struct BlockListener<St, F, Fut> {
    stream: St,
    task_factory: F,
    task: Option<Pin<Box<Fut>>>,
    on_shutdown: Shutdown,
}

impl<St, F, Fut> BlockListener<St, F, Fut>
where
    St: Stream,
    F: Fn(<St as Stream>::Item) -> Fut,
{
    pub fn new(on_shutdown: Shutdown, block_stream: St, task_factory: F) -> Self {
        Self { stream: block_stream, task_factory, task: None, on_shutdown }
    }
}

impl<St, F, Fut> Future for BlockListener<St, F, Fut>
where
    St: Stream + Unpin,
    F: Fn(<St as Stream>::Item) -> Fut + Unpin + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();

        if pin.on_shutdown.poll_unpin(cx).is_ready() {
            return Poll::Ready(())
        }

        let mut block = None;
        // drain the stream
        while let Poll::Ready(maybe_block) = pin.stream.poll_next_unpin(cx) {
            if maybe_block.is_none() {
                // stream complete
                return Poll::Ready(())
            }
            block = maybe_block;
        }

        if let Some(block) = block {
            pin.task = Some(Box::pin((pin.task_factory)(block)));
        }

        if let Some(mut task) = pin.task.take() {
            if task.poll_unpin(cx).is_pending() {
                pin.task = Some(task);
            }
        }
        Poll::Pending
    }
}