anvil/tasks/
block_listener.rs1use crate::shutdown::Shutdown;
4use futures::{FutureExt, Stream, StreamExt};
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10pub struct BlockListener<St, F, Fut> {
12 stream: St,
13 task_factory: F,
14 task: Option<Pin<Box<Fut>>>,
15 on_shutdown: Shutdown,
16}
17
18impl<St, F, Fut> BlockListener<St, F, Fut>
19where
20 St: Stream,
21 F: Fn(<St as Stream>::Item) -> Fut,
22{
23 pub fn new(on_shutdown: Shutdown, block_stream: St, task_factory: F) -> Self {
24 Self { stream: block_stream, task_factory, task: None, on_shutdown }
25 }
26}
27
28impl<St, F, Fut> Future for BlockListener<St, F, Fut>
29where
30 St: Stream + Unpin,
31 F: Fn(<St as Stream>::Item) -> Fut + Unpin + Send + Sync + 'static,
32 Fut: Future<Output = ()> + Send,
33{
34 type Output = ();
35
36 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37 let pin = self.get_mut();
38
39 if pin.on_shutdown.poll_unpin(cx).is_ready() {
40 return Poll::Ready(());
41 }
42
43 let mut block = None;
44 while let Poll::Ready(maybe_block) = pin.stream.poll_next_unpin(cx) {
46 if maybe_block.is_none() {
47 return Poll::Ready(());
49 }
50 block = maybe_block;
51 }
52
53 if let Some(block) = block {
54 pin.task = Some(Box::pin((pin.task_factory)(block)));
55 }
56
57 if let Some(mut task) = pin.task.take()
58 && task.poll_unpin(cx).is_pending()
59 {
60 pin.task = Some(task);
61 }
62 Poll::Pending
63 }
64}