1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use crate::thr::wake::WakeRoot;
use core::{
future::Future,
iter::FusedIterator,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures::stream::Stream;
pub trait FutureRootExt: Future {
fn root_wait(self) -> Self::Output;
}
pub trait StreamRootExt<'a>: Stream {
fn root_wait(self) -> StreamRootWait<'a, Self>
where
Self: Sized;
}
pub struct StreamRootWait<'a, T: Stream> {
stream: T,
exhausted: bool,
_marker: PhantomData<&'a &'a mut ()>,
}
impl<T: Future> FutureRootExt for T {
fn root_wait(mut self) -> Self::Output {
let waker = WakeRoot::new().to_waker();
let mut cx = Context::from_waker(&waker);
loop {
match unsafe { Pin::new_unchecked(&mut self) }.poll(&mut cx) {
Poll::Pending => WakeRoot::wait(),
Poll::Ready(value) => break value,
}
}
}
}
impl<'a, T: Stream> StreamRootExt<'a> for T {
#[inline]
fn root_wait(self) -> StreamRootWait<'a, Self>
where
Self: Sized,
{
StreamRootWait {
stream: self,
exhausted: false,
_marker: PhantomData,
}
}
}
impl<'a, T: Stream> Iterator for StreamRootWait<'a, T> {
type Item = T::Item;
fn next(&mut self) -> Option<Self::Item> {
if self.exhausted {
return None;
}
let waker = WakeRoot::new().to_waker();
let mut cx = Context::from_waker(&waker);
loop {
match unsafe { Pin::new_unchecked(&mut self.stream) }.poll_next(&mut cx) {
Poll::Pending => WakeRoot::wait(),
Poll::Ready(Some(item)) => break Some(item),
Poll::Ready(None) => {
self.exhausted = true;
break None;
}
}
}
}
}
impl<'a, T: Stream> FusedIterator for StreamRootWait<'a, T> {}