// Blocking wait helpers: drive a future or stream to completion on the current thread, with an optional timeout.
use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; use futures::{Async, Future, Poll, Stream}; use futures::executor::{self, Notify}; use tokio_executor::{enter, EnterError}; pub(crate) fn timeout<F>(fut: F, timeout: Option<Duration>) -> Result<F::Item, Waited<F::Error>> where F: Future, { let mut spawn = executor::spawn(fut); block_on(timeout, |notify| { spawn.poll_future_notify(notify, 0) }) } pub(crate) fn stream<S>(stream: S, timeout: Option<Duration>) -> WaitStream<S> where S: Stream { WaitStream { stream: executor::spawn(stream), timeout: timeout, } } #[derive(Debug)] pub(crate) enum Waited<E> { TimedOut, Executor(EnterError), Inner(E), } impl<E> From<E> for Waited<E> { fn from(err: E) -> Waited<E> { Waited::Inner(err) } } pub(crate) struct WaitStream<S> { stream: executor::Spawn<S>, timeout: Option<Duration>, } impl<S> Iterator for WaitStream<S> where S: Stream { type Item = Result<S::Item, Waited<S::Error>>; fn next(&mut self) -> Option<Self::Item> { let res = block_on(self.timeout, |notify| { self.stream.poll_stream_notify(notify, 0) }); match res { Ok(Some(val)) => Some(Ok(val)), Ok(None) => None, Err(err) => Some(Err(err)), } } } struct ThreadNotify { thread: thread::Thread, } impl Notify for ThreadNotify { fn notify(&self, _id: usize) { self.thread.unpark(); } } fn block_on<F, U, E>(timeout: Option<Duration>, mut poll: F) -> Result<U, Waited<E>> where F: FnMut(&Arc<ThreadNotify>) -> Poll<U, E>, { let _entered = enter().map_err(Waited::Executor)?; let deadline = timeout.map(|d| { Instant::now() + d }); let notify = Arc::new(ThreadNotify { thread: thread::current(), }); loop { match poll(¬ify)? { Async::Ready(val) => return Ok(val), Async::NotReady => {} } if let Some(deadline) = deadline { let now = Instant::now(); if now >= deadline { return Err(Waited::TimedOut); } thread::park_timeout(deadline - now); } else { thread::park(); } } }