1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use crate::{
fib::{self, Fiber},
sync::spsc::oneshot::{channel, Canceled, Receiver},
thr::prelude::*,
};
use core::{
future::Future,
intrinsics::unreachable,
pin::Pin,
task::{Context, Poll},
};
/// A future that resolves to the return value of a fiber.
///
/// Values of this type are produced by [`ThrFiberFuture::add_future`] and
/// [`ThrFiberFuture::add_future_factory`]. The future itself only wraps the
/// receiving half of a one-shot channel; the fiber is driven elsewhere and
/// hands its return value over through the matching sender.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FiberFuture<T> {
// Receiving half of the one-shot channel created in `add_rx`; the fiber's
// return value arrives here.
rx: Receiver<T>,
}
/// Marker trait for the yield types accepted by fibers that are turned into a
/// [`FiberFuture`].
///
/// It is implemented only for `()` and `!`, i.e. for fibers whose yielded
/// values carry no data. The `#[marker]` attribute declares it as a marker
/// trait (no associated items, overlapping impls permitted).
#[marker]
pub trait YieldNone: Send + 'static {}
// Fibers that yield the unit value.
impl YieldNone for () {}
// Fibers that never yield (their `Yield` type is the never type).
impl YieldNone for ! {}
impl<T> FiberFuture<T> {
    /// Closes the underlying one-shot receiver.
    ///
    /// This simply forwards to `close` on the wrapped `Receiver`.
    ///
    /// NOTE(review): presumably this is what makes the sender side observe
    /// `is_canceled()` so that the fiber driver stops resuming the fiber —
    /// confirm against the `oneshot` channel documentation. Also verify what
    /// polling this future yields after `close`, because `poll` treats a
    /// `Canceled` result as unreachable.
    #[inline]
    pub fn close(&mut self) {
        let Self { rx } = self;
        rx.close();
    }
}
impl<T> Future for FiberFuture<T> {
    type Output = T;

    /// Polls the wrapped one-shot receiver and forwards the received value.
    ///
    /// Returns `Poll::Pending` while the receiver is pending and
    /// `Poll::Ready(value)` once the receiver resolves to `Ok(value)`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // SAFETY: this is a plain field projection; the closure only hands out
        // a reference to `rx` and nothing is moved out of the pinned value.
        let receiver = unsafe { self.map_unchecked_mut(|this| &mut this.rx) };
        match receiver.poll(cx) {
            Poll::Ready(Ok(output)) => Poll::Ready(output),
            // SAFETY: the code relies on the receiver never resolving to
            // `Canceled` while this future is being polled.
            // NOTE(review): reaching this arm is undefined behavior, not a
            // panic. Confirm it cannot be hit, e.g. by polling after
            // `FiberFuture::close` or if the sender is dropped without sending.
            Poll::Ready(Err(Canceled)) => unsafe { unreachable() },
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Extension methods for thread tokens that attach a fiber to the thread and
/// expose its return value as a [`FiberFuture`].
pub trait ThrFiberFuture: ThrToken {
    /// Adds the fiber `fib` to the thread `self` and returns a future that
    /// resolves to the fiber's return value.
    ///
    /// The fiber is moved into a factory closure, so it must be `Send`. Use
    /// [`add_future_factory`](ThrFiberFuture::add_future_factory) when only
    /// the code constructing the fiber can be `Send`.
    #[inline]
    fn add_future<F, Y, T>(self, fib: F) -> FiberFuture<T>
    where
        F: Fiber<Input = (), Yield = Y, Return = T> + Send + 'static,
        Y: YieldNone,
        T: Send + 'static,
    {
        let rx = add_rx(self, move || fib);
        FiberFuture { rx }
    }

    /// Adds a fiber built by `factory` to the thread `self` and returns a
    /// future that resolves to the fiber's return value.
    ///
    /// Only the factory closure has to be `Send`; the fiber it produces does
    /// not.
    #[inline]
    fn add_future_factory<C, F, Y, T>(self, factory: C) -> FiberFuture<T>
    where
        C: FnOnce() -> F + Send + 'static,
        F: Fiber<Input = (), Yield = Y, Return = T> + 'static,
        Y: YieldNone,
        T: Send + 'static,
    {
        let rx = add_rx(self, factory);
        FiberFuture { rx }
    }
}
/// Registers a fiber driver on `thr` and returns the receiving half of the
/// one-shot channel through which the fiber's return value is delivered.
///
/// The closure given to `thr.add_factory` calls `factory` to build the fiber
/// and returns a generator. On each resumption the generator:
///
/// 1. finishes without sending anything if the sender reports that it has
///    been canceled;
/// 2. otherwise resumes the fiber once;
/// 3. if the fiber completed, sends its return value (the result of `send` is
///    discarded) and finishes;
/// 4. if the fiber merely yielded, yields in turn and waits for the next
///    resumption.
#[inline]
fn add_rx<C, H, F, Y, T>(thr: H, factory: C) -> Receiver<T>
where
    C: FnOnce() -> F + Send + 'static,
    H: ThrToken,
    F: Fiber<Input = (), Yield = Y, Return = T> + 'static,
    Y: YieldNone,
    T: Send + 'static,
{
    let (sender, receiver) = channel();
    thr.add_factory(|| {
        let mut fiber = factory();
        move || {
            // Stop driving the fiber as soon as the sender is canceled.
            while !sender.is_canceled() {
                // SAFETY: `fiber` is owned by this generator and is never
                // moved out of it.
                // NOTE(review): this relies on the generator itself not being
                // moved once it has been resumed — confirm `add_factory`
                // guarantees that.
                match unsafe { Pin::new_unchecked(&mut fiber) }.resume(()) {
                    // Yielded values carry no data (`Y: YieldNone`).
                    fib::Yielded(_) => {}
                    fib::Complete(output) => {
                        // The send result is deliberately ignored.
                        drop(sender.send(output));
                        break;
                    }
                }
                yield;
            }
        }
    });
    receiver
}
// Blanket implementation: every type implementing `ThrToken` gets the
// `ThrFiberFuture` methods.
impl<T: ThrToken> ThrFiberFuture for T {}