anvil/tasks/
block_listener.rs
1use crate::shutdown::Shutdown;
4use futures::{FutureExt, Stream, StreamExt};
5use std::{
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11pub struct BlockListener<St, F, Fut> {
13 stream: St,
14 task_factory: F,
15 task: Option<Pin<Box<Fut>>>,
16 on_shutdown: Shutdown,
17}
18
19impl<St, F, Fut> BlockListener<St, F, Fut>
20where
21 St: Stream,
22 F: Fn(<St as Stream>::Item) -> Fut,
23{
24 pub fn new(on_shutdown: Shutdown, block_stream: St, task_factory: F) -> Self {
25 Self { stream: block_stream, task_factory, task: None, on_shutdown }
26 }
27}
28
29impl<St, F, Fut> Future for BlockListener<St, F, Fut>
30where
31 St: Stream + Unpin,
32 F: Fn(<St as Stream>::Item) -> Fut + Unpin + Send + Sync + 'static,
33 Fut: Future<Output = ()> + Send,
34{
35 type Output = ();
36
37 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38 let pin = self.get_mut();
39
40 if pin.on_shutdown.poll_unpin(cx).is_ready() {
41 return Poll::Ready(())
42 }
43
44 let mut block = None;
45 while let Poll::Ready(maybe_block) = pin.stream.poll_next_unpin(cx) {
47 if maybe_block.is_none() {
48 return Poll::Ready(())
50 }
51 block = maybe_block;
52 }
53
54 if let Some(block) = block {
55 pin.task = Some(Box::pin((pin.task_factory)(block)));
56 }
57
58 if let Some(mut task) = pin.task.take() {
59 if task.poll_unpin(cx).is_pending() {
60 pin.task = Some(task);
61 }
62 }
63 Poll::Pending
64 }
65}