mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-18 01:21:01 +00:00
Timeouts now repeat forever (#372)
This commit is contained in:
committed by
Arkadiy Paronyan
parent
927cb59aaf
commit
3dfd53cc86
@@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
extern crate parking_lot;
|
extern crate parking_lot;
|
||||||
extern crate fnv;
|
extern crate fnv;
|
||||||
|
#[macro_use]
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate tokio_core;
|
extern crate tokio_core;
|
||||||
extern crate tokio_io;
|
extern crate tokio_io;
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ struct Shared {
|
|||||||
/// `NetworkProtocolHandler`. This can be closed if the background thread
|
/// `NetworkProtocolHandler`. This can be closed if the background thread
|
||||||
/// is not running. The sender will be overwritten every time we start
|
/// is not running. The sender will be overwritten every time we start
|
||||||
/// the service.
|
/// the service.
|
||||||
timeouts_register_tx: RwLock<mpsc::UnboundedSender<(Instant, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>>,
|
timeouts_register_tx: RwLock<mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>>,
|
||||||
|
|
||||||
/// Original address from the configuration, after being adjusted by the `Transport`.
|
/// Original address from the configuration, after being adjusted by the `Transport`.
|
||||||
/// Contains `None` if the network hasn't started yet.
|
/// Contains `None` if the network hasn't started yet.
|
||||||
@@ -360,9 +360,8 @@ impl NetworkContext for NetworkContextImpl {
|
|||||||
.ok_or(ErrorKind::BadProtocol)?
|
.ok_or(ErrorKind::BadProtocol)?
|
||||||
.custom_data()
|
.custom_data()
|
||||||
.clone();
|
.clone();
|
||||||
let at = Instant::now() + duration;
|
|
||||||
self.inner.timeouts_register_tx.read()
|
self.inner.timeouts_register_tx.read()
|
||||||
.unbounded_send((at, (handler, self.protocol, token)))
|
.unbounded_send((duration, (handler, self.protocol, token)))
|
||||||
.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
|
.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -393,7 +392,7 @@ impl NetworkContext for NetworkContextImpl {
|
|||||||
fn init_thread(
|
fn init_thread(
|
||||||
core: Handle,
|
core: Handle,
|
||||||
shared: Arc<Shared>,
|
shared: Arc<Shared>,
|
||||||
timeouts_register_rx: mpsc::UnboundedReceiver<(Instant, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))>,
|
timeouts_register_rx: mpsc::UnboundedReceiver<(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))>,
|
||||||
close_rx: oneshot::Receiver<()>
|
close_rx: oneshot::Receiver<()>
|
||||||
) -> Result<impl Future<Item = (), Error = IoError>, Error> {
|
) -> Result<impl Future<Item = (), Error = IoError>, Error> {
|
||||||
// Build the transport layer.
|
// Build the transport layer.
|
||||||
|
|||||||
@@ -14,9 +14,10 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||||
|
|
||||||
use futures::{future, Future, stream, Stream, sync::mpsc};
|
use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
use std::time::Instant;
|
use std::marker::PhantomData;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
use tokio_core::reactor::{Handle, Timeout};
|
use tokio_core::reactor::{Handle, Timeout};
|
||||||
|
|
||||||
/// Builds the timeouts system.
|
/// Builds the timeouts system.
|
||||||
@@ -24,11 +25,13 @@ use tokio_core::reactor::{Handle, Timeout};
|
|||||||
/// The `timeouts_rx` should be a stream receiving newly-created timeout
|
/// The `timeouts_rx` should be a stream receiving newly-created timeout
|
||||||
/// requests. Returns a stream that produces items as their timeout elapses.
|
/// requests. Returns a stream that produces items as their timeout elapses.
|
||||||
/// `T` can be anything you want, as it is transparently passed from the input
|
/// `T` can be anything you want, as it is transparently passed from the input
|
||||||
/// to the output.
|
/// to the output. Timeouts continue to fire forever, as there is no way to
|
||||||
|
/// unregister them.
|
||||||
pub fn build_timeouts_stream<T>(
|
pub fn build_timeouts_stream<T>(
|
||||||
core: Handle,
|
core: Handle,
|
||||||
timeouts_rx: mpsc::UnboundedReceiver<(Instant, T)>
|
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
|
||||||
) -> impl Stream<Item = T, Error = IoError> {
|
) -> impl Stream<Item = T, Error = IoError>
|
||||||
|
where T: Clone {
|
||||||
let next_timeout = next_in_timeouts_stream(timeouts_rx);
|
let next_timeout = next_in_timeouts_stream(timeouts_rx);
|
||||||
|
|
||||||
// The `unfold` function is essentially a loop turned into a stream. The
|
// The `unfold` function is essentially a loop turned into a stream. The
|
||||||
@@ -47,11 +50,12 @@ pub fn build_timeouts_stream<T>(
|
|||||||
Some(future::select_ok(timeouts.into_iter())
|
Some(future::select_ok(timeouts.into_iter())
|
||||||
.and_then(move |(item, mut timeouts)|
|
.and_then(move |(item, mut timeouts)|
|
||||||
match item {
|
match item {
|
||||||
Out::NewTimeout((Some((at, item)), next_timeouts)) => {
|
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
|
||||||
// Received a new timeout request on the channel.
|
// Received a new timeout request on the channel.
|
||||||
let next_timeout = next_in_timeouts_stream(next_timeouts);
|
let next_timeout = next_in_timeouts_stream(next_timeouts);
|
||||||
let timeout = Timeout::new_at(at, &core)?
|
let at = Instant::now() + duration;
|
||||||
.map(move |()| Out::Timeout(item));
|
let timeout = Timeout::new_at(at, &core)?;
|
||||||
|
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
|
||||||
timeouts.push(future::Either::B(timeout));
|
timeouts.push(future::Either::B(timeout));
|
||||||
timeouts.push(future::Either::A(next_timeout));
|
timeouts.push(future::Either::A(next_timeout));
|
||||||
Ok((None, timeouts))
|
Ok((None, timeouts))
|
||||||
@@ -59,9 +63,15 @@ pub fn build_timeouts_stream<T>(
|
|||||||
Out::NewTimeout((None, _)) =>
|
Out::NewTimeout((None, _)) =>
|
||||||
// The channel has been closed.
|
// The channel has been closed.
|
||||||
Ok((None, timeouts)),
|
Ok((None, timeouts)),
|
||||||
Out::Timeout(item) =>
|
Out::Timeout(duration, item) => {
|
||||||
// A timeout has happened.
|
// A timeout has happened.
|
||||||
Ok((Some(item), timeouts)),
|
let returned = item.clone();
|
||||||
|
let at = Instant::now() + duration;
|
||||||
|
let timeout = Timeout::new_at(at, &core)?;
|
||||||
|
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
|
||||||
|
timeouts.push(future::Either::B(timeout));
|
||||||
|
Ok((Some(returned), timeouts))
|
||||||
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@@ -71,17 +81,32 @@ pub fn build_timeouts_stream<T>(
|
|||||||
/// Local enum representing the output of the selection.
|
/// Local enum representing the output of the selection.
|
||||||
enum Out<A, B> {
|
enum Out<A, B> {
|
||||||
NewTimeout(A),
|
NewTimeout(A),
|
||||||
Timeout(B),
|
Timeout(Duration, B),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convenience function that calls `.into_future()` on the timeouts stream,
|
/// Convenience function that calls `.into_future()` on the timeouts stream,
|
||||||
/// and applies some modifiers.
|
/// and applies some modifiers.
|
||||||
/// This function is necessary. Otherwise if we copy-paste its content we run
|
/// This function is necessary. Otherwise if we copy-paste its content we run
|
||||||
/// into errors because the type of the copy-pasted closures differs.
|
/// into errors because the type of the copy-pasted closures differs.
|
||||||
fn next_in_timeouts_stream<T, B>(stream: mpsc::UnboundedReceiver<T>)
|
fn next_in_timeouts_stream<T, B>(
|
||||||
-> impl Future<Item = Out<(Option<T>, mpsc::UnboundedReceiver<T>), B>, Error = IoError> {
|
stream: mpsc::UnboundedReceiver<T>
|
||||||
|
) -> impl Future<Item = Out<(Option<T>, mpsc::UnboundedReceiver<T>), B>, Error = IoError> {
|
||||||
stream
|
stream
|
||||||
.into_future()
|
.into_future()
|
||||||
.map(Out::NewTimeout)
|
.map(Out::NewTimeout)
|
||||||
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
|
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Does the equivalent to `future.map(move |()| (duration, item))`.
|
||||||
|
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
|
||||||
|
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
|
||||||
|
where F: Future<Item = ()> {
|
||||||
|
type Item = Out<A, T>;
|
||||||
|
type Error = F::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let _ready: () = try_ready!(self.0.poll());
|
||||||
|
let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
|
||||||
|
Ok(Async::Ready(out))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user