End multiplexer stream once one of its inputs end. (#2380)

* End multiplexer stream once one of its inputs end.

Also add test, that we don't panic once a stream is exhausted.

* Don't fuse already fused stream.
This commit is contained in:
Robert Klotzner
2021-02-04 12:31:36 +01:00
committed by GitHub
parent 62d18c708c
commit cfcde3d3b0
2 changed files with 49 additions and 7 deletions
+1 -1
View File
@@ -169,7 +169,7 @@ where
let action = { let action = {
let subsystem_next = ctx.recv().fuse(); let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse(); let mut net_event_next = event_stream.next().fuse();
let mut req_res_event_next = bridge.request_multiplexer.next().fuse(); let mut req_res_event_next = bridge.request_multiplexer.next();
futures::pin_mut!(subsystem_next); futures::pin_mut!(subsystem_next);
futures::select! { futures::select! {
@@ -17,7 +17,7 @@
use std::pin::Pin; use std::pin::Pin;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::stream::Stream; use futures::stream::{FusedStream, Stream};
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
@@ -35,12 +35,15 @@ use polkadot_subsystem::messages::AllMessages;
/// ///
/// This multiplexer consumes all request streams and makes them a `Stream` of a single message /// This multiplexer consumes all request streams and makes them a `Stream` of a single message
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems. /// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
///
/// The resulting stream will end once any of its input ends.
pub struct RequestMultiplexer { pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>, receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
next_poll: usize, next_poll: usize,
} }
/// Multiplexing can fail in case of invalid messages. /// Multiplexing can fail in case of invalid messages.
#[derive(Debug, PartialEq, Eq)]
pub struct RequestMultiplexError { pub struct RequestMultiplexError {
/// The peer that sent the invalid message. /// The peer that sent the invalid message.
pub peer: PeerId, pub peer: PeerId,
@@ -85,15 +88,17 @@ impl Stream for RequestMultiplexer {
// % safe, because count initialized to len, loop would not be entered if 0, also // % safe, because count initialized to len, loop would not be entered if 0, also
// length of receivers is fixed. // length of receivers is fixed.
let (p, rx): &mut (_, _) = &mut self.receivers[i % len]; let (p, rx): &mut (_, _) = &mut self.receivers[i % len];
// Avoid panic:
if rx.is_terminated() {
// Early return, we don't want to update next_poll.
return Poll::Ready(None);
}
i += 1; i += 1;
count -= 1; count -= 1;
match Pin::new(rx).poll_next(cx) { match Pin::new(rx).poll_next(cx) {
// If at least one stream is pending, then we are not done yet (No
// Ready(None)).
Poll::Pending => result = Poll::Pending, Poll::Pending => result = Poll::Pending,
// Receiver is a fused stream, which allows for this simple handling of // We are done, once a single receiver is done.
// exhausted ones. Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(None) => {}
Poll::Ready(Some(v)) => { Poll::Ready(Some(v)) => {
result = Poll::Ready(Some(multiplex_single(*p, v))); result = Poll::Ready(Some(multiplex_single(*p, v)));
break; break;
@@ -105,6 +110,17 @@ impl Stream for RequestMultiplexer {
} }
} }
impl FusedStream for RequestMultiplexer {
fn is_terminated(&self) -> bool {
let len = self.receivers.len();
if len == 0 {
return true;
}
let (_, rx) = &self.receivers[self.next_poll % len];
rx.is_terminated()
}
}
/// Convert a single raw incoming request into a `MultiplexMessage`. /// Convert a single raw incoming request into a `MultiplexMessage`.
fn multiplex_single( fn multiplex_single(
p: Protocol, p: Protocol,
@@ -130,3 +146,29 @@ fn decode_with_peer<Req: Decode>(
) -> Result<Req, RequestMultiplexError> { ) -> Result<Req, RequestMultiplexError> {
Req::decode(&mut payload.as_ref()).map_err(|error| RequestMultiplexError { peer, error }) Req::decode(&mut payload.as_ref()).map_err(|error| RequestMultiplexError { peer, error })
} }
#[cfg(test)]
mod tests {
use futures::prelude::*;
use futures::stream::FusedStream;
use super::RequestMultiplexer;
#[test]
fn check_exhaustion_safety() {
// Create and end streams:
fn drop_configs() -> RequestMultiplexer {
let (multiplexer, _) = RequestMultiplexer::new();
multiplexer
}
let multiplexer = drop_configs();
futures::executor::block_on(async move {
let mut f = multiplexer;
assert!(f.next().await.is_none());
assert!(f.is_terminated());
assert!(f.next().await.is_none());
assert!(f.is_terminated());
assert!(f.next().await.is_none());
assert!(f.is_terminated());
});
}
}