diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 01ec85fca7..294be06190 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet}; use super::{LOG_TARGET, Result}; -use futures::{StreamExt, select, FutureExt}; +use futures::{select, FutureExt}; use polkadot_primitives::v1::{ CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, @@ -691,21 +691,15 @@ pub(crate) async fn run( loop { select! { - res = state.connection_requests.next() => { - let (relay_parent, validator_id, peer_id) = match res { - Some(res) => res, - // Will never happen, but better to be safe. - None => return Ok(()), - }; - + res = state.connection_requests.next().fuse() => { let _timer = state.metrics.time_handle_connection_request(); handle_validator_connected( &mut ctx, &mut state, - peer_id, - validator_id, - relay_parent, + res.peer_id, + res.validator_id, + res.relay_parent, ).await; }, msg = ctx.recv().fuse() => match msg? { diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 1b59441fc0..9d8df343b2 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -700,14 +700,7 @@ impl PoVDistribution { // peer view update messages may be racy and we want connection notifications // first. futures::select_biased! { - v = state.connection_requests.next() => { - match v { - Some((_relay_parent, _validator_id, peer_id)) => { - handle_validator_connected(&mut state, peer_id); - } - None => break, - } - } + v = state.connection_requests.next().fuse() => handle_validator_connected(&mut state, v.peer_id), v = ctx.recv().fuse() => { match v? { FromOverseer::Signal(signal) => if handle_signal( @@ -743,10 +736,8 @@ impl PoVDistribution { } } } - }; + } } - - Ok(()) } } diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs index 3a381f7a6f..9472d44d40 100644 --- a/polkadot/node/subsystem-util/src/validator_discovery.rs +++ b/polkadot/node/subsystem-util/src/validator_discovery.rs @@ -23,6 +23,7 @@ use futures::{ channel::mpsc, task::{Poll, self}, stream, + StreamExt, }; use streamunordered::{StreamUnordered, StreamYield}; @@ -113,33 +114,60 @@ async fn connect_to_authorities( connected_rx } -/// A struct that assists performing multiple concurrent connection requests. +/// Represents a discovered validator. /// -/// This allows concurrent connections to validator sets at different `relay_parents` -/// and multiplexes their results into a single `Stream`. -#[derive(Default)] -pub struct ConnectionRequests { - // added connection requests relay_parent -> StreamUnordered token - id_map: HashMap, - - // Connection requests themselves. - requests: StreamUnordered, +/// Result of [`ConnectionRequests::next`]. +#[derive(Debug, PartialEq)] +pub struct DiscoveredValidator { + /// The relay parent associated with the connection request that returned a result. + pub relay_parent: Hash, + /// The [`ValidatorId`] that was resolved. + pub validator_id: ValidatorId, + /// The [`PeerId`] associated to the validator id. + pub peer_id: PeerId, } -impl stream::FusedStream for ConnectionRequests { - fn is_terminated(&self) -> bool { - false +/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`]. +struct ConnectionRequestForRelayParent { + request: ConnectionRequest, + relay_parent: Hash, +} + +impl stream::Stream for ConnectionRequestForRelayParent { + type Item = DiscoveredValidator; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { + self.request + .poll_next_unpin(cx) + .map(|r| r.map(|(validator_id, peer_id)| DiscoveredValidator { + validator_id, + peer_id, + relay_parent: self.relay_parent, + })) } } +/// A struct that assists performing multiple concurrent connection requests. +/// +/// This allows concurrent connections to validator sets at different `relay_parents`. +/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests. +#[derive(Default)] +pub struct ConnectionRequests { + /// Connection requests relay_parent -> StreamUnordered token + id_map: HashMap, + + /// Connection requests themselves. + requests: StreamUnordered, +} + impl ConnectionRequests { /// Insert a new connection request. /// /// If a `ConnectionRequest` under a given `relay_parent` already exists it will - /// be revoked and substituted with a new one. + /// be revoked and substituted with the given one. pub fn put(&mut self, relay_parent: Hash, request: ConnectionRequest) { self.remove(&relay_parent); - let token = self.requests.push(request); + let token = self.requests.push(ConnectionRequestForRelayParent { relay_parent, request }); self.id_map.insert(relay_parent, token); } @@ -155,39 +183,23 @@ impl ConnectionRequests { pub fn contains_request(&self, relay_parent: &Hash) -> bool { self.id_map.contains_key(relay_parent) } -} -impl stream::Stream for ConnectionRequests { - /// (relay_parent, validator_id, peer_id). - type Item = (Hash, ValidatorId, PeerId); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { - // If there are currently no requests going on, pend instead of - // polling `StreamUnordered` which would lead to it terminating - // and returning `Poll::Ready(None)`. - if self.requests.is_empty() { - return Poll::Pending; + /// Returns the next available connection request result. + /// + /// # Note + /// + /// When there are no active requests this will wait indefinitely, like an always pending future. + pub async fn next(&mut self) -> DiscoveredValidator { + loop { + match self.requests.next().await { + Some((StreamYield::Item(item), _)) => { + return item + }, + // Ignore finished requests, they are required to be removed. + Some((StreamYield::Finished(_), _)) => (), + None => futures::pending!(), + } } - - match Pin::new(&mut self.requests).poll_next(cx) { - Poll::Ready(Some((yielded, token))) => { - match yielded { - StreamYield::Item(item) => { - if let Some((relay_parent, _)) = self.id_map.iter() - .find(|(_, &val)| val == token) - { - return Poll::Ready(Some((*relay_parent, item.0, item.1))); - } - } - StreamYield::Finished(_) => { - // `ConnectionRequest` is fullfilled, but not revoked - } - } - }, - _ => {}, - } - - Poll::Pending } } @@ -231,14 +243,20 @@ mod tests { use polkadot_primitives::v1::ValidatorPair; use sp_core::{Pair, Public}; - use futures::{executor, poll, StreamExt, SinkExt}; + use futures::{executor, poll, SinkExt}; + + async fn check_next_is_pending(connection_requests: &mut ConnectionRequests) { + let next = connection_requests.next(); + futures::pin_mut!(next); + assert_eq!(poll!(next), Poll::Pending); + } #[test] fn adding_a_connection_request_works() { let mut connection_requests = ConnectionRequests::default(); executor::block_on(async move { - assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending); + check_next_is_pending(&mut connection_requests).await; let validator_1 = ValidatorPair::generate().0.public(); let validator_2 = ValidatorPair::generate().0.public(); @@ -267,16 +285,19 @@ mod tests { rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - let res = Pin::new(&mut connection_requests).next().await.unwrap(); - assert_eq!(res, (relay_parent_1, validator_1, peer_id_1)); - - let res = Pin::new(&mut connection_requests).next().await.unwrap(); - assert_eq!(res, (relay_parent_1, validator_2, peer_id_2)); - + let res = connection_requests.next().await; assert_eq!( - poll!(Pin::new(&mut connection_requests).next()), - Poll::Pending, + res, + DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 }, ); + + let res = connection_requests.next().await; + assert_eq!( + res, + DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_2, peer_id: peer_id_2 }, + ); + + check_next_is_pending(&mut connection_requests).await; }); } @@ -285,7 +306,7 @@ mod tests { let mut connection_requests = ConnectionRequests::default(); executor::block_on(async move { - assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending); + check_next_is_pending(&mut connection_requests).await; let validator_1 = ValidatorPair::generate().0.public(); let validator_2 = ValidatorPair::generate().0.public(); @@ -325,16 +346,19 @@ mod tests { rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - let res = Pin::new(&mut connection_requests).next().await.unwrap(); - assert_eq!(res, (relay_parent_1, validator_1, peer_id_1)); - - let res = Pin::new(&mut connection_requests).next().await.unwrap(); - assert_eq!(res, (relay_parent_2, validator_2, peer_id_2)); - + let res = connection_requests.next().await; assert_eq!( - poll!(Pin::new(&mut connection_requests).next()), - Poll::Pending, + res, + DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 }, ); + + let res = connection_requests.next().await; + assert_eq!( + res, + DiscoveredValidator { relay_parent: relay_parent_2, validator_id: validator_2, peer_id: peer_id_2 }, + ); + + check_next_is_pending(&mut connection_requests).await; }); } @@ -343,7 +367,7 @@ mod tests { let mut connection_requests = ConnectionRequests::default(); executor::block_on(async move { - assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending); + check_next_is_pending(&mut connection_requests).await; let validator_1 = ValidatorPair::generate().0.public(); let validator_2 = ValidatorPair::generate().0.public(); @@ -380,8 +404,8 @@ mod tests { rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap(); - let res = Pin::new(&mut connection_requests).next().await.unwrap(); - assert_eq!(res, (relay_parent, validator_1, peer_id_1.clone())); + let res = connection_requests.next().await; + assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_1, peer_id: peer_id_1.clone() }); connection_requests.put(relay_parent.clone(), connection_request_2); @@ -389,13 +413,10 @@ mod tests { rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - let res = Pin::new(&mut connection_requests).next().await.unwrap(); - assert_eq!(res, (relay_parent, validator_2, peer_id_2)); + let res = connection_requests.next().await; + assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_2, peer_id: peer_id_2 }); - assert_eq!( - poll!(Pin::new(&mut connection_requests).next()), - Poll::Pending, - ); + check_next_is_pending(&mut connection_requests).await; }); } }