From 3dfd53cc86ef73c38a01361384a6f02cf7fef098 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 18 Jul 2018 17:59:31 +0200 Subject: [PATCH] Timeouts now repeat forever (#372) --- substrate/substrate/network-libp2p/src/lib.rs | 1 + .../substrate/network-libp2p/src/service.rs | 7 ++- .../substrate/network-libp2p/src/timeouts.rs | 51 ++++++++++++++----- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/substrate/substrate/network-libp2p/src/lib.rs b/substrate/substrate/network-libp2p/src/lib.rs index b94e7c64d1..b428ca6ceb 100644 --- a/substrate/substrate/network-libp2p/src/lib.rs +++ b/substrate/substrate/network-libp2p/src/lib.rs @@ -19,6 +19,7 @@ extern crate parking_lot; extern crate fnv; +#[macro_use] extern crate futures; extern crate tokio_core; extern crate tokio_io; diff --git a/substrate/substrate/network-libp2p/src/service.rs b/substrate/substrate/network-libp2p/src/service.rs index 435958756e..50dea88aa3 100644 --- a/substrate/substrate/network-libp2p/src/service.rs +++ b/substrate/substrate/network-libp2p/src/service.rs @@ -86,7 +86,7 @@ struct Shared { /// `NetworkProtocolHandler`. This can be closed if the background thread /// is not running. The sender will be overwritten every time we start /// the service. - timeouts_register_tx: RwLock, ProtocolId, TimerToken))>>, + timeouts_register_tx: RwLock, ProtocolId, TimerToken))>>, /// Original address from the configuration, after being adjusted by the `Transport`. /// Contains `None` if the network hasn't started yet. @@ -360,9 +360,8 @@ impl NetworkContext for NetworkContextImpl { .ok_or(ErrorKind::BadProtocol)? .custom_data() .clone(); - let at = Instant::now() + duration; self.inner.timeouts_register_tx.read() - .unbounded_send((at, (handler, self.protocol, token))) + .unbounded_send((duration, (handler, self.protocol, token))) .map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?; Ok(()) } @@ -393,7 +392,7 @@ impl NetworkContext for NetworkContextImpl { fn init_thread( core: Handle, shared: Arc, - timeouts_register_rx: mpsc::UnboundedReceiver<(Instant, (Arc, ProtocolId, TimerToken))>, + timeouts_register_rx: mpsc::UnboundedReceiver<(Duration, (Arc, ProtocolId, TimerToken))>, close_rx: oneshot::Receiver<()> ) -> Result, Error> { // Build the transport layer. diff --git a/substrate/substrate/network-libp2p/src/timeouts.rs b/substrate/substrate/network-libp2p/src/timeouts.rs index c0c7d30e7a..12e16f34d8 100644 --- a/substrate/substrate/network-libp2p/src/timeouts.rs +++ b/substrate/substrate/network-libp2p/src/timeouts.rs @@ -14,9 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? -use futures::{future, Future, stream, Stream, sync::mpsc}; +use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc}; use std::io::Error as IoError; -use std::time::Instant; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; use tokio_core::reactor::{Handle, Timeout}; /// Builds the timeouts system. @@ -24,11 +25,13 @@ use tokio_core::reactor::{Handle, Timeout}; /// The `timeouts_rx` should be a stream receiving newly-created timeout /// requests. Returns a stream that produces items as their timeout elapses. /// `T` can be anything you want, as it is transparently passed from the input -/// to the output. +/// to the output. Timeouts continue to fire forever, as there is no way to +/// unregister them. pub fn build_timeouts_stream( core: Handle, - timeouts_rx: mpsc::UnboundedReceiver<(Instant, T)> -) -> impl Stream { + timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)> +) -> impl Stream + where T: Clone { let next_timeout = next_in_timeouts_stream(timeouts_rx); // The `unfold` function is essentially a loop turned into a stream. The @@ -47,11 +50,12 @@ pub fn build_timeouts_stream( Some(future::select_ok(timeouts.into_iter()) .and_then(move |(item, mut timeouts)| match item { - Out::NewTimeout((Some((at, item)), next_timeouts)) => { + Out::NewTimeout((Some((duration, item)), next_timeouts)) => { // Received a new timeout request on the channel. let next_timeout = next_in_timeouts_stream(next_timeouts); - let timeout = Timeout::new_at(at, &core)? - .map(move |()| Out::Timeout(item)); + let at = Instant::now() + duration; + let timeout = Timeout::new_at(at, &core)?; + let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData); timeouts.push(future::Either::B(timeout)); timeouts.push(future::Either::A(next_timeout)); Ok((None, timeouts)) @@ -59,9 +63,15 @@ pub fn build_timeouts_stream( Out::NewTimeout((None, _)) => // The channel has been closed. Ok((None, timeouts)), - Out::Timeout(item) => + Out::Timeout(duration, item) => { // A timeout has happened. - Ok((Some(item), timeouts)), + let returned = item.clone(); + let at = Instant::now() + duration; + let timeout = Timeout::new_at(at, &core)?; + let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData); + timeouts.push(future::Either::B(timeout)); + Ok((Some(returned), timeouts)) + }, } ) ) @@ -71,17 +81,32 @@ pub fn build_timeouts_stream( /// Local enum representing the output of the selection. enum Out { NewTimeout(A), - Timeout(B), + Timeout(Duration, B), } /// Convenience function that calls `.into_future()` on the timeouts stream, /// and applies some modifiers. /// This function is necessary. Otherwise if we copy-paste its content we run /// into errors because the type of the copy-pasted closures differs. -fn next_in_timeouts_stream(stream: mpsc::UnboundedReceiver) - -> impl Future, mpsc::UnboundedReceiver), B>, Error = IoError> { +fn next_in_timeouts_stream( + stream: mpsc::UnboundedReceiver +) -> impl Future, mpsc::UnboundedReceiver), B>, Error = IoError> { stream .into_future() .map(Out::NewTimeout) .map_err(|_| unreachable!("an UnboundedReceiver can never error")) } + +/// Does the equivalent to `future.map(move |()| (duration, item))`. +struct TimeoutWrapper(F, Duration, Option, PhantomData); +impl Future for TimeoutWrapper + where F: Future { + type Item = Out; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let _ready: () = try_ready!(self.0.poll()); + let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success")); + Ok(Async::Ready(out)) + } +}