diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs index a7d856fefb..5b0038cd75 100644 --- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs @@ -16,12 +16,12 @@ //! PoV requester takes care of requesting PoVs from validators of a backing group. -use futures::{FutureExt, channel::{mpsc, oneshot}, future::BoxFuture}; +use futures::{FutureExt, channel::oneshot, future::BoxFuture}; use lru::LruCache; use polkadot_subsystem::jaeger; use polkadot_node_network_protocol::{ - PeerId, peer_set::PeerSet, + peer_set::PeerSet, request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests}, v1::{PoVFetchingRequest, PoVFetchingResponse}} }; @@ -46,7 +46,7 @@ pub struct PoVRequester { /// /// So we keep an LRU for managing connection requests of size 2. /// Cache will contain `None` if we are not a validator in that session. - connected_validators: LruCache>>, + connected_validators: LruCache>>, } impl PoVRequester { @@ -78,8 +78,8 @@ impl PoVRequester { if self.connected_validators.contains(&session_index) { continue } - let rx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?; - self.connected_validators.put(session_index, rx); + let tx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?; + self.connected_validators.put(session_index, tx); } Ok(()) } @@ -190,17 +190,16 @@ async fn connect_to_relevant_validators( parent: Hash, session: SessionIndex ) - -> super::Result>> + -> super::Result>> where Context: SubsystemContext, { if let Some(validator_ids) = determine_relevant_validators(ctx, runtime, parent, session).await? { - // We don't actually care about `PeerId`s, just keeping receiver so we stay connected: - let (tx, rx) = mpsc::channel(0); + let (tx, keep_alive) = oneshot::channel(); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, peer_set: PeerSet::Validation, connected: tx + validator_ids, peer_set: PeerSet::Validation, keep_alive })).await; - Ok(Some(rx)) + Ok(Some(tx)) } else { Ok(None) } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 617277dff0..6f9cf67766 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -23,7 +23,6 @@ use parity_scale_codec::{Encode, Decode}; use parking_lot::Mutex; use futures::prelude::*; -use futures::channel::mpsc; use sc_network::Event as NetworkEvent; use sp_consensus::SyncOracle; @@ -36,7 +35,7 @@ use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, }; -use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId}; +use polkadot_primitives::v1::{Hash, BlockNumber}; use polkadot_node_network_protocol::{ PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep, ObservedRole, @@ -314,12 +313,6 @@ impl From for UnexpectedAbort { } } -// notifications to be passed through to the validator discovery worker. -enum ValidatorDiscoveryNotification { - PeerConnected(PeerId, PeerSet, Option), - PeerDisconnected(PeerId, PeerSet), -} - #[derive(Default, Clone)] struct Shared(Arc>); @@ -339,7 +332,6 @@ async fn handle_subsystem_messages( mut ctx: Context, mut network_service: N, mut authority_discovery_service: AD, - validator_discovery_notifications: mpsc::Receiver, shared: Shared, sync_oracle: Box, metrics: Metrics, @@ -356,8 +348,6 @@ where let mut mode = Mode::Syncing(sync_oracle); - let mut validator_discovery_notifications = validator_discovery_notifications.fuse(); - loop { futures::select! { msg = ctx.recv().fuse() => match msg { @@ -514,7 +504,7 @@ where NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, - connected, + keep_alive, } => { tracing::trace!( target: LOG_TARGET, @@ -527,7 +517,7 @@ where let (ns, ads) = validator_discovery.on_request( validator_ids, peer_set, - connected, + keep_alive, network_service, authority_discovery_service, ).await; @@ -538,19 +528,6 @@ where } Err(e) => return Err(e.into()), }, - notification = validator_discovery_notifications.next().fuse() => match notification { - None => return Ok(()), - Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => { - validator_discovery.on_peer_connected( - peer.clone(), - peer_set, - maybe_auth, - ).await; - } - Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => { - validator_discovery.on_peer_disconnected(&peer, peer_set); - } - }, } } } @@ -560,7 +537,6 @@ async fn handle_network_messages( mut network_service: impl Network, mut authority_discovery_service: AD, mut request_multiplexer: RequestMultiplexer, - mut validator_discovery_notifications: mpsc::Sender, metrics: Metrics, shared: Shared, ) -> Result<(), UnexpectedAbort> { @@ -612,13 +588,6 @@ async fn handle_network_messages( authority_discovery_service .get_authority_id_by_peer_id(peer).await; - // Failure here means that the other side of the network bridge - // has concluded and this future will be dropped in due course. - let _ = validator_discovery_notifications.send( - ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone()) - ).await; - - match peer_set { PeerSet::Validation => { dispatch_validation_events_to_all( @@ -694,12 +663,6 @@ async fn handle_network_messages( w }; - // Failure here means that the other side of the network bridge - // has concluded and this future will be dropped in due course. - let _ = validator_discovery_notifications.send( - ValidatorDiscoveryNotification::PeerDisconnected(peer.clone(), peer_set) - ).await; - if was_connected { match peer_set { PeerSet::Validation => dispatch_validation_event_to_all( @@ -858,14 +821,11 @@ where .get_statement_fetching() .expect("Gets initialized, must be `Some` on startup. qed."); - let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024); - let (remote, network_event_handler) = handle_network_messages( ctx.sender().clone(), network_service.clone(), authority_discovery_service.clone(), request_multiplexer, - validation_worker_tx, metrics.clone(), shared.clone(), ).remote_handle(); @@ -880,7 +840,6 @@ where ctx, network_service, authority_discovery_service, - validation_worker_rx, shared, sync_oracle, metrics, diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index 4d43d66a23..74796757c3 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -22,9 +22,9 @@ use core::marker::PhantomData; use std::collections::{HashSet, HashMap, hash_map}; use async_trait::async_trait; -use futures::channel::mpsc; +use futures::channel::oneshot; -use sc_network::{config::parse_addr, multiaddr::Multiaddr}; +use sc_network::multiaddr::Multiaddr; use sc_authority_discovery::Service as AuthorityDiscoveryService; use polkadot_node_network_protocol::PeerId; use polkadot_primitives::v1::AuthorityDiscoveryId; @@ -55,39 +55,24 @@ impl AuthorityDiscovery for AuthorityDiscoveryService { /// This struct tracks the state for one `ConnectToValidators` request. struct NonRevokedConnectionRequestState { requested: Vec, - pending: HashSet, - sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + keep_alive: oneshot::Receiver<()>, } impl NonRevokedConnectionRequestState { /// Create a new instance of `ConnectToValidatorsState`. pub fn new( requested: Vec, - pending: HashSet, - sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + keep_alive: oneshot::Receiver<()>, ) -> Self { Self { requested, - pending, - sender, - } - } - - pub fn on_authority_connected( - &mut self, - authority: &AuthorityDiscoveryId, - peer_id: &PeerId, - ) { - if self.pending.remove(authority) { - // an error may happen if the request was revoked or - // the channel's buffer is full, ignoring it is fine - let _ = self.sender.try_send((authority.clone(), peer_id.clone())); + keep_alive, } } /// Returns `true` if the request is revoked. pub fn is_revoked(&mut self) -> bool { - self.sender.is_closed() + self.keep_alive.try_recv().is_err() } pub fn requested(&self) -> &[AuthorityDiscoveryId] { @@ -120,8 +105,6 @@ pub(super) struct Service { #[derive(Default)] struct StatePerPeerSet { - // Peers that are connected to us and authority ids associated to them. - connected_peers: HashMap>, // The `u64` counts the number of pending non-revoked requests for this validator // note: the validators in this map are not necessarily present // in the `connected_validators` map. @@ -138,97 +121,27 @@ impl Service { } } - /// Find connected validators using the given `validator_ids`. - /// - /// Returns a [`HashMap`] that contains the found [`AuthorityDiscoveryId`]'s and their associated [`PeerId`]'s. - #[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))] - async fn find_connected_validators( - &mut self, - validator_ids: &[AuthorityDiscoveryId], - peer_set: PeerSet, - authority_discovery_service: &mut AD, - ) -> HashMap { - let mut result = HashMap::new(); - let state = &mut self.state[peer_set]; - - for id in validator_ids { - // First check if we already cached the validator - if let Some(pid) = state.connected_peers - .iter() - .find_map(|(pid, ids)| { - if ids.contains(&id) { - Some(pid) - } else { - None - } - }) - { - result.insert(id.clone(), pid.clone()); - continue; - } - - // If not ask the authority discovery - if let Some(addresses) = authority_discovery_service.get_addresses_by_authority_id(id.clone()).await { - for (peer_id, _) in addresses.into_iter().filter_map(|a| parse_addr(a).ok()) { - if let Some(ids) = state.connected_peers.get_mut(&peer_id) { - ids.insert(id.clone()); - result.insert(id.clone(), peer_id); - } - } - } - } - - result - } - - /// On a new connection request, a priority group update will be issued. + /// On a new connection request, a peer set update will be issued. /// It will ask the network to connect to the validators and not disconnect /// from them at least until all the pending requests containing them are revoked. /// /// This method will also clean up all previously revoked requests. /// it takes `network_service` and `authority_discovery_service` by value /// and returns them as a workaround for the Future: Send requirement imposed by async fn impl. - #[tracing::instrument(level = "trace", skip(self, connected, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))] pub async fn on_request( &mut self, validator_ids: Vec, peer_set: PeerSet, - mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + keep_alive: oneshot::Receiver<()>, mut network_service: N, mut authority_discovery_service: AD, ) -> (N, AD) { const MAX_ADDR_PER_PEER: usize = 3; - let already_connected = self.find_connected_validators( - &validator_ids, - peer_set, - &mut authority_discovery_service, - ).await; - let state = &mut self.state[peer_set]; // Increment the counter of how many times the validators were requested. validator_ids.iter().for_each(|id| *state.requested_validators.entry(id.clone()).or_default() += 1); - // try to send already connected peers - for (id, peer) in already_connected.iter() { - match connected.try_send((id.clone(), peer.clone())) { - Err(e) if e.is_disconnected() => { - // the request is already revoked - for peer_id in validator_ids { - let _ = on_revoke(&mut state.requested_validators, peer_id); - } - return (network_service, authority_discovery_service); - } - Err(_) => { - // the channel's buffer is full - // ignore the error, the receiver will miss out some peers - // but that's fine - break; - } - Ok(()) => continue, - } - } - // collect multiaddress of validators let mut multiaddr_to_add = HashSet::new(); for authority in validator_ids.iter() { @@ -292,45 +205,13 @@ impl Service { multiaddr_to_remove.clone() ).await; - let pending = validator_ids.iter() - .cloned() - .filter(|id| !already_connected.contains_key(id)) - .collect::>(); - state.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new( validator_ids, - pending, - connected, + keep_alive, )); (network_service, authority_discovery_service) } - - /// Should be called when a peer connected. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn on_peer_connected( - &mut self, - peer_id: PeerId, - peer_set: PeerSet, - maybe_authority: Option, - ) { - let state = &mut self.state[peer_set]; - // check if it's an authority we've been waiting for - if let Some(authority) = maybe_authority { - for request in state.non_revoked_discovery_requests.iter_mut() { - let _ = request.on_authority_connected(&authority, &peer_id); - } - - state.connected_peers.entry(peer_id).or_default().insert(authority); - } else { - state.connected_peers.insert(peer_id, Default::default()); - } - } - - /// Should be called when a peer disconnected. - pub fn on_peer_disconnected(&mut self, peer_id: &PeerId, peer_set: PeerSet) { - self.state[peer_set].connected_peers.remove(peer_id); - } } #[cfg(test)] @@ -339,8 +220,7 @@ mod tests { use crate::network::{Network, NetworkAction}; use std::{borrow::Cow, pin::Pin}; - use futures::{sink::Sink, stream::{BoxStream, StreamExt as _}}; - use sc_network::multiaddr::Protocol; + use futures::{sink::Sink, stream::BoxStream}; use sc_network::{Event as NetworkEvent, IfDisconnected}; use sp_keyring::Sr25519Keyring; use polkadot_node_network_protocol::request_response::request::Requests; @@ -440,132 +320,53 @@ mod tests { #[test] fn request_is_revoked_when_the_receiver_is_dropped() { - let (sender, receiver) = mpsc::channel(0); + let (keep_alive_handle, keep_alive) = oneshot::channel(); let mut request = NonRevokedConnectionRequestState::new( Vec::new(), - HashSet::new(), - sender, + keep_alive, ); assert!(!request.is_revoked()); - drop(receiver); + drop(keep_alive_handle); assert!(request.is_revoked()); } - #[test] - fn requests_are_fulfilled_immediately_for_already_connected_peers() { - let mut service = new_service(); - - let (ns, mut ads) = new_network(); - - let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); - let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); - - futures::executor::block_on(async move { - let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; - let (sender, mut receiver) = mpsc::channel(2); - - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; - - let _ = service.on_request( - req1, - PeerSet::Validation, - sender, - ns, - ads, - ).await; - - - // the results should be immediately available - let reply1 = receiver.next().await.unwrap(); - assert_eq!(reply1.0, authority_ids[0]); - assert_eq!(reply1.1, peer_ids[0]); - }); - } - - #[test] - fn requests_are_fulfilled_on_peer_connection() { - let mut service = new_service(); - - let (ns, ads) = new_network(); - - let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); - let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); - - futures::executor::block_on(async move { - let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; - let (sender, mut receiver) = mpsc::channel(2); - - let (_, mut ads) = service.on_request( - req1, - PeerSet::Validation, - sender, - ns, - ads, - ).await; - - - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; - let reply1 = receiver.next().await.unwrap(); - assert_eq!(reply1.0, authority_ids[0]); - assert_eq!(reply1.1, peer_ids[0]); - - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; - let reply2 = receiver.next().await.unwrap(); - assert_eq!(reply2.0, authority_ids[1]); - assert_eq!(reply2.1, peer_ids[1]); - }); - } - // Test cleanup works. #[test] fn requests_are_removed_on_revoke() { let mut service = new_service(); - let (ns, mut ads) = new_network(); + let (ns, ads) = new_network(); - let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); futures::executor::block_on(async move { - let (sender, mut receiver) = mpsc::channel(1); - - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; + let (keep_alive_handle, keep_alive) = oneshot::channel(); let (ns, ads) = service.on_request( vec![authority_ids[0].clone()], PeerSet::Validation, - sender, + keep_alive, ns, ads, ).await; - let _ = receiver.next().await.unwrap(); // revoke the request - drop(receiver); + drop(keep_alive_handle); - let (sender, mut receiver) = mpsc::channel(1); + let (_keep_alive_handle, keep_alive) = oneshot::channel(); let _ = service.on_request( vec![authority_ids[1].clone()], PeerSet::Validation, - sender, + keep_alive, ns, ads, ).await; - let reply = receiver.next().await.unwrap(); - assert_eq!(reply.0, authority_ids[1]); - assert_eq!(reply.1, peer_ids[1]); let state = &service.state[PeerSet::Validation]; assert_eq!(state.non_revoked_discovery_requests.len(), 1); }); @@ -576,104 +377,54 @@ mod tests { fn revoking_requests_with_overlapping_validator_sets() { let mut service = new_service(); - let (ns, mut ads) = new_network(); + let (ns, ads) = new_network(); - let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); futures::executor::block_on(async move { - let (sender, mut receiver) = mpsc::channel(1); - - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; - let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; + let (keep_alive_handle, keep_alive) = oneshot::channel(); let (ns, ads) = service.on_request( vec![authority_ids[0].clone(), authority_ids[2].clone()], PeerSet::Validation, - sender, + keep_alive, ns, ads, ).await; - let _ = receiver.next().await.unwrap(); // revoke the first request - drop(receiver); + drop(keep_alive_handle); - let (sender, mut receiver) = mpsc::channel(1); + let (keep_alive_handle, keep_alive) = oneshot::channel(); let (ns, ads) = service.on_request( vec![authority_ids[0].clone(), authority_ids[1].clone()], PeerSet::Validation, - sender, + keep_alive, ns, ads, ).await; - let _ = receiver.next().await.unwrap(); let state = &service.state[PeerSet::Validation]; assert_eq!(state.non_revoked_discovery_requests.len(), 1); assert_eq!(ns.peers_set.len(), 2); // revoke the second request - drop(receiver); + drop(keep_alive_handle); - let (sender, mut receiver) = mpsc::channel(1); + let (_keep_alive_handle, keep_alive) = oneshot::channel(); let (ns, _) = service.on_request( vec![authority_ids[0].clone()], PeerSet::Validation, - sender, + keep_alive, ns, ads, ).await; - let _ = receiver.next().await.unwrap(); let state = &service.state[PeerSet::Validation]; assert_eq!(state.non_revoked_discovery_requests.len(), 1); assert_eq!(ns.peers_set.len(), 1); }); } - - /// A test for when a validator connects, but the authority discovery not yet knows that the connecting node - /// is a validator. This can happen for example at startup of a node. - #[test] - fn handle_validator_connect_without_authority_discovery_knowing_it() { - let mut service = new_service(); - - let ns = TestNetwork::default(); - let mut ads = TestAuthorityDiscovery::default(); - - let validator_peer_id = PeerId::random(); - let validator_id: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into(); - - futures::executor::block_on(async move { - let (sender, mut receiver) = mpsc::channel(1); - - let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await; - service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await; - - let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into())); - ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone()); - ads.by_authority_id.insert(validator_id.clone(), address); - - let _ = service.on_request( - vec![validator_id.clone()], - PeerSet::Validation, - sender, - ns, - ads, - ).await; - - assert_eq!((validator_id.clone(), validator_peer_id.clone()), receiver.next().await.unwrap()); - let state = &service.state[PeerSet::Validation]; - assert!( - state.connected_peers - .get(&validator_peer_id) - .unwrap() - .contains(&validator_id) - ); - }); - } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index fe6279eaf3..7a3a22dff8 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; -use futures::{FutureExt, channel::oneshot, channel::mpsc}; +use futures::{FutureExt, channel::oneshot}; use sp_core::Pair; use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId}; @@ -218,7 +218,7 @@ struct State { peer_ids: HashMap, /// The connection handles to validators per group we are interested in. - connection_handles: HashMap>, + connection_handles: HashMap>, /// Metrics. metrics: Metrics, @@ -467,13 +467,13 @@ async fn connect_to_validators( state: &mut State, group: GroupValidators, ) { - let (tx, rx) = mpsc::channel(0); + let (keep_alive_handle, keep_alive) = oneshot::channel(); // Reconnect in all cases, as authority discovery cache might not have been fully populated // last time: ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids: group.validators, peer_set: PeerSet::Collation, connected: tx + validator_ids: group.validators, peer_set: PeerSet::Collation, keep_alive })).await; - state.connection_handles.insert(group.group, rx); + state.connection_handles.insert(group.group, keep_alive_handle); } /// Advertise collation to the given `peer`. @@ -917,7 +917,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use futures::{channel::mpsc, executor, future, Future}; + use futures::{executor, future, Future}; use sp_core::{crypto::Pair, Decode}; use sp_keyring::Sr25519Keyring; @@ -1024,7 +1024,7 @@ mod tests { validators, session_info: SessionInfo { validators: validator_public, - discovery_keys, + discovery_keys, validator_groups, ..Default::default() }, @@ -1188,8 +1188,6 @@ mod tests { /// Result of [`distribute_collation`] struct DistributeCollation { - /// Should be used to inform the subsystem about connected validators. - connected: Vec>, candidate: CandidateReceipt, pov_block: PoV, } @@ -1270,32 +1268,26 @@ mod tests { } } - let connected = if should_connect { - let connected_current = assert_matches!( + if should_connect { + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::NetworkBridge( NetworkBridgeMessage::ConnectToValidators { - connected, .. } - ) => { connected } + ) => {} ); - let connected_next = assert_matches!( + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::NetworkBridge( NetworkBridgeMessage::ConnectToValidators { - connected, .. } - ) => { connected } + ) => {} ); - vec![connected_current, connected_next] - } else { - Vec::new() - }; + } DistributeCollation { - connected, candidate, pov_block, } @@ -1416,7 +1408,7 @@ mod tests { setup_system(&mut virtual_overseer, &test_state).await; - let DistributeCollation { connected: _connected, candidate, pov_block } = + let DistributeCollation { candidate, pov_block } = distribute_collation(&mut virtual_overseer, &test_state, true).await; for (val, peer) in test_state.current_group_validator_authority_ids() @@ -1500,8 +1492,7 @@ mod tests { assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); - let DistributeCollation { connected: _connected, .. } = - distribute_collation(&mut virtual_overseer, &test_state, true).await; + distribute_collation(&mut virtual_overseer, &test_state, true).await; // Send info about peer's view. overseer_send( @@ -1569,7 +1560,7 @@ mod tests { // And let it tell us that it is has the same view. send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected; + distribute_collation(&mut virtual_overseer, &test_state, true).await; expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await; @@ -1608,14 +1599,14 @@ mod tests { expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await; - let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected; + distribute_collation(&mut virtual_overseer, &test_state, true).await; let old_relay_parent = test_state.relay_parent; // Advance to a new round, while informing the subsystem that the old and the new relay parent are active. test_state.advance_to_new_round(&mut virtual_overseer, true).await; - let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected; + distribute_collation(&mut virtual_overseer, &test_state, true).await; send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await; expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await; @@ -1645,7 +1636,7 @@ mod tests { connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id.clone())).await; expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; - let _ = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected; + distribute_collation(&mut virtual_overseer, &test_state, true).await; send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 3cb1759071..adfc6344e1 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -18,22 +18,19 @@ //! and issuing a connection request to the validators relevant to //! the gossiping subsystems on every new session. -use futures::{channel::mpsc, FutureExt as _}; +use futures::{channel::oneshot, FutureExt as _}; use polkadot_node_subsystem::{ messages::{ - GossipSupportMessage, + AllMessages, GossipSupportMessage, NetworkBridgeMessage, }, ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SpawnedSubsystem, SubsystemContext, }; -use polkadot_node_subsystem_util::{ - validator_discovery, - self as util, -}; +use polkadot_node_subsystem_util as util; use polkadot_primitives::v1::{ Hash, SessionIndex, AuthorityDiscoveryId, }; -use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId}; +use polkadot_node_network_protocol::peer_set::PeerSet; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; use sp_application_crypto::{Public, AppKey}; @@ -48,7 +45,7 @@ pub struct GossipSupport { struct State { last_session_index: Option, /// when we overwrite this, it automatically drops the previous request - _last_connection_request: Option>, + _last_connection_request: Option>, } impl GossipSupport { @@ -123,6 +120,24 @@ async fn ensure_i_am_an_authority( Err(util::Error::NotAValidator) } +/// A helper function for making a `ConnectToValidators` request. +pub async fn connect_to_authorities( + ctx: &mut impl SubsystemContext, + validator_ids: Vec, + peer_set: PeerSet, +) -> oneshot::Sender<()> { + let (keep_alive_handle, keep_alive) = oneshot::channel(); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + keep_alive, + } + )).await; + + keep_alive_handle +} impl State { /// 1. Determine if the current session index has changed. @@ -147,14 +162,14 @@ impl State { ensure_i_am_an_authority(keystore, &authorities).await?; tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request"); - let request = validator_discovery::connect_to_authorities( + let keep_alive_handle = connect_to_authorities( ctx, authorities, PeerSet::Validation, ).await; self.last_session_index = Some(new_session); - self._last_connection_request = Some(request); + self._last_connection_request = Some(keep_alive_handle); } } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 287b530440..0a7f85196d 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -51,7 +51,6 @@ use std::{ use streamunordered::{StreamUnordered, StreamYield}; use thiserror::Error; -pub mod validator_discovery; pub use metered_channel as metered; pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS; diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs deleted file mode 100644 index 57a55a9f1c..0000000000 --- a/polkadot/node/subsystem-util/src/validator_discovery.rs +++ /dev/null @@ -1,524 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Utility function to make it easier to connect to validators. - -use std::collections::HashMap; -use std::pin::Pin; - -use futures::{ - channel::mpsc, - task::{Poll, self}, - stream, - StreamExt, -}; -use streamunordered::{StreamUnordered, StreamYield}; - -use polkadot_node_subsystem::{ - errors::RuntimeApiError, - messages::{AllMessages, NetworkBridgeMessage}, - SubsystemContext, -}; -use polkadot_primitives::v1::{ - Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex, Id as ParaId, -}; -use polkadot_node_network_protocol::peer_set::PeerSet; -use sc_network::PeerId; -use crate::Error; - -/// Utility function to make it easier to connect to validators. -pub async fn connect_to_validators( - ctx: &mut Context, - relay_parent: Hash, - validators: Vec, - peer_set: PeerSet, -) -> Result { - let current_index = crate::request_session_index_for_child(relay_parent, ctx.sender()).await.await??; - connect_to_validators_in_session( - ctx, - relay_parent, - validators, - peer_set, - current_index, - ).await -} - -/// Utility function to make it easier to connect to validators in the given session. -pub async fn connect_to_validators_in_session( - ctx: &mut Context, - relay_parent: Hash, - validators: Vec, - peer_set: PeerSet, - session_index: SessionIndex, -) -> Result { - let session_info = crate::request_session_info( - relay_parent, - session_index, - ctx.sender(), - ).await.await??; - - let (session_validators, discovery_keys) = match session_info { - Some(info) => (info.validators, info.discovery_keys), - None => return Err(RuntimeApiError::from( - format!("No SessionInfo found for the index {}", session_index) - ).into()), - }; - - tracing::trace!( - target: "parachain::validator-discovery", - validators = ?validators, - discovery_keys = ?discovery_keys, - session_index, - "Trying to serve the validator discovery request", - ); - - let id_to_index = session_validators.iter() - .zip(0usize..) - .collect::>(); - - // We assume the same ordering in authorities as in validators so we can do an index search - let maybe_authorities: Vec<_> = validators.iter() - .map(|id| { - let validator_index = id_to_index.get(&id); - validator_index.and_then(|i| discovery_keys.get(*i).cloned()) - }) - .collect(); - - let authorities: Vec<_> = maybe_authorities.iter() - .cloned() - .filter_map(|id| id) - .collect(); - - let validator_map = validators.into_iter() - .zip(maybe_authorities.into_iter()) - .filter_map(|(k, v)| v.map(|v| (v, k))) - .collect::>(); - - let connections = connect_to_authorities(ctx, authorities, peer_set).await; - - Ok(ConnectionRequest { - validator_map, - connections, - }) -} - -/// A helper function for making a `ConnectToValidators` request. -pub async fn connect_to_authorities( - ctx: &mut Context, - validator_ids: Vec, - peer_set: PeerSet, -) -> mpsc::Receiver<(AuthorityDiscoveryId, PeerId)> { - const PEERS_CAPACITY: usize = 32; - - let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - peer_set, - connected, - } - )).await; - - connected_rx -} - -/// Represents a discovered validator. -/// -/// Result of [`ConnectionRequests::next`]. -#[derive(Debug, PartialEq)] -pub struct DiscoveredValidator { - /// The relay parent associated with the connection request that returned a result. - pub relay_parent: Hash, - /// The para ID associated with the connection request that returned a result. - pub para_id: ParaId, - /// The [`ValidatorId`] that was resolved. - pub validator_id: ValidatorId, - /// The [`PeerId`] associated to the validator id. - pub peer_id: PeerId, -} - -/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`]. -struct ConnectionRequestForRelayParentAndParaId { - request: ConnectionRequest, - relay_parent: Hash, - para_id: ParaId, -} - -impl stream::Stream for ConnectionRequestForRelayParentAndParaId { - type Item = DiscoveredValidator; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { - self.request - .poll_next_unpin(cx) - .map(|r| r.map(|(validator_id, peer_id)| DiscoveredValidator { - validator_id, - peer_id, - relay_parent: self.relay_parent, - para_id: self.para_id, - })) - } -} - -/// A struct that assists performing multiple concurrent connection requests. -/// -/// This allows concurrent connections to validator sets at different `(relay_parents, para_id)`. -/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests. -#[derive(Default)] -pub struct ConnectionRequests { - /// Connection requests relay_parent -> para_id -> StreamUnordered token - /// - /// Q: Why not (relay_parent, para_id) -> Stream? - /// A: So that we can remove from it by relay_parent only. - id_map: HashMap>, - - /// Connection requests themselves. - requests: StreamUnordered, -} - -impl ConnectionRequests { - /// Insert a new connection request. - /// - /// If a `ConnectionRequest` under a given `relay_parent` and `para_id` already exists, - /// it will be revoked and substituted with the given one. - pub fn put(&mut self, relay_parent: Hash, para_id: ParaId, request: ConnectionRequest) { - self.remove(&relay_parent, para_id); - let token = self.requests.push(ConnectionRequestForRelayParentAndParaId { - relay_parent, - para_id, - request, - }); - - self.id_map.entry(relay_parent).or_default().insert(para_id, token); - } - - /// Remove all connection requests by a given `relay_parent`. - pub fn remove_all(&mut self, relay_parent: &Hash) { - let map = self.id_map.remove(relay_parent); - for token in map.map(|m| m.into_iter().map(|(_, v)| v)).into_iter().flatten() { - Pin::new(&mut self.requests).remove(token); - } - } - - /// Remove a connection request by a given `relay_parent` and `para_id`. - pub fn remove(&mut self, relay_parent: &Hash, para_id: ParaId) { - if let Some(map) = self.id_map.get_mut(relay_parent) { - if let Some(token) = map.remove(¶_id) { - Pin::new(&mut self.requests).remove(token); - } - } - } - - /// Is a connection at this relay parent and para_id already present in the request - pub fn contains_request(&self, relay_parent: &Hash, para_id: ParaId) -> bool { - self.id_map.get(relay_parent).map_or(false, |map| map.contains_key(¶_id)) - } - - /// Returns the next available connection request result. - /// - /// # Note - /// - /// When there are no active requests this will wait indefinitely, like an always pending future. - pub async fn next(&mut self) -> DiscoveredValidator { - loop { - match self.requests.next().await { - Some((StreamYield::Item(item), _)) => { - return item - }, - // Ignore finished requests, they are required to be removed. - Some((StreamYield::Finished(_), _)) => (), - None => futures::pending!(), - } - } - } -} - -/// A pending connection request to validators. -/// This struct implements `Stream` to allow for asynchronous -/// discovery of validator addresses. -/// -/// NOTE: the request will be revoked on drop. -#[must_use = "dropping a request will result in its immediate revokation"] -pub struct ConnectionRequest { - validator_map: HashMap, - #[must_use = "streams do nothing unless polled"] - connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, -} - -impl stream::Stream for ConnectionRequest { - type Item = (ValidatorId, PeerId); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { - if self.validator_map.is_empty() { - return Poll::Ready(None); - } - match Pin::new(&mut self.connections).poll_next(cx) { - Poll::Ready(Some((id, peer_id))) => { - if let Some(validator_id) = self.validator_map.remove(&id) { - return Poll::Ready(Some((validator_id, peer_id))); - } else { - // unknown authority_id - // should be unreachable - } - } - _ => {}, - } - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use super::*; - use polkadot_primitives::v1::ValidatorPair; - use sp_core::{Pair, Public}; - - use futures::{executor, poll, SinkExt}; - - async fn check_next_is_pending(connection_requests: &mut ConnectionRequests) { - let next = connection_requests.next(); - futures::pin_mut!(next); - assert_eq!(poll!(next), Poll::Pending); - } - - #[test] - fn adding_a_connection_request_works() { - let mut connection_requests = ConnectionRequests::default(); - - executor::block_on(async move { - check_next_is_pending(&mut connection_requests).await; - - let validator_1 = ValidatorPair::generate().0.public(); - let validator_2 = ValidatorPair::generate().0.public(); - - let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]); - let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]); - - let mut validator_map = HashMap::new(); - validator_map.insert(auth_1.clone(), validator_1.clone()); - validator_map.insert(auth_2.clone(), validator_2.clone()); - - let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - - let peer_id_1 = PeerId::random(); - let peer_id_2 = PeerId::random(); - - let connection_request_1 = ConnectionRequest { - validator_map, - connections: rq1_rx, - }; - - let relay_parent_1 = Hash::repeat_byte(1); - let para_id = ParaId::from(3); - connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1); - - rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); - rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - - let res = connection_requests.next().await; - assert_eq!( - res, - DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 }, - ); - - let res = connection_requests.next().await; - assert_eq!( - res, - DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_2, peer_id: peer_id_2 }, - ); - - check_next_is_pending(&mut connection_requests).await; - }); - } - - #[test] - fn adding_two_connection_requests_works() { - let mut connection_requests = ConnectionRequests::default(); - - executor::block_on(async move { - check_next_is_pending(&mut connection_requests).await; - - let validator_1 = ValidatorPair::generate().0.public(); - let validator_2 = ValidatorPair::generate().0.public(); - - let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]); - let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]); - - let mut validator_map_1 = HashMap::new(); - let mut validator_map_2 = HashMap::new(); - - validator_map_1.insert(auth_1.clone(), validator_1.clone()); - validator_map_2.insert(auth_2.clone(), validator_2.clone()); - - let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - - let (mut rq2_tx, rq2_rx) = mpsc::channel(8); - - let peer_id_1 = PeerId::random(); - let peer_id_2 = PeerId::random(); - - let connection_request_1 = ConnectionRequest { - validator_map: validator_map_1, - connections: rq1_rx, - }; - - let connection_request_2 = ConnectionRequest { - validator_map: validator_map_2, - connections: rq2_rx, - }; - - let relay_parent_1 = Hash::repeat_byte(1); - let relay_parent_2 = Hash::repeat_byte(2); - let para_id = ParaId::from(3); - - connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1); - connection_requests.put(relay_parent_2.clone(), para_id, connection_request_2); - - rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); - rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - - let res = connection_requests.next().await; - assert_eq!( - res, - DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 }, - ); - - let res = connection_requests.next().await; - assert_eq!( - res, - DiscoveredValidator { relay_parent: relay_parent_2, para_id, validator_id: validator_2, peer_id: peer_id_2 }, - ); - - check_next_is_pending(&mut connection_requests).await; - }); - } - - #[test] - fn same_relay_parent_diffent_para_ids() { - let mut connection_requests = ConnectionRequests::default(); - - executor::block_on(async move { - check_next_is_pending(&mut connection_requests).await; - - let validator_1 = ValidatorPair::generate().0.public(); - let validator_2 = ValidatorPair::generate().0.public(); - - let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]); - let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]); - - let mut validator_map_1 = HashMap::new(); - let mut validator_map_2 = HashMap::new(); - - validator_map_1.insert(auth_1.clone(), validator_1.clone()); - validator_map_2.insert(auth_2.clone(), validator_2.clone()); - - let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - let (mut rq2_tx, rq2_rx) = mpsc::channel(8); - - let peer_id_1 = PeerId::random(); - let peer_id_2 = PeerId::random(); - - let connection_request_1 = ConnectionRequest { - validator_map: validator_map_1, - connections: rq1_rx, - }; - - let connection_request_2 = ConnectionRequest { - validator_map: validator_map_2, - connections: rq2_rx, - }; - - let relay_parent = Hash::repeat_byte(1); - let para_id_1 = ParaId::from(1); - let para_id_2 = ParaId::from(2); - - connection_requests.put(relay_parent.clone(), para_id_1, connection_request_1); - connection_requests.put(relay_parent.clone(), para_id_2, connection_request_2); - - rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); - rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - - connection_requests.remove(&relay_parent, para_id_1); - - let res = connection_requests.next().await; - assert_eq!( - res, - DiscoveredValidator { relay_parent, para_id: para_id_2, validator_id: validator_2, peer_id: peer_id_2 }, - ); - - check_next_is_pending(&mut connection_requests).await; - }); - } - - #[test] - fn replacing_a_connection_request_works() { - let mut connection_requests = ConnectionRequests::default(); - - executor::block_on(async move { - check_next_is_pending(&mut connection_requests).await; - - let validator_1 = ValidatorPair::generate().0.public(); - let validator_2 = ValidatorPair::generate().0.public(); - - let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]); - let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]); - - let mut validator_map_1 = HashMap::new(); - let mut validator_map_2 = HashMap::new(); - - validator_map_1.insert(auth_1.clone(), validator_1.clone()); - validator_map_2.insert(auth_2.clone(), validator_2.clone()); - - let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - - let (mut rq2_tx, rq2_rx) = mpsc::channel(8); - - let peer_id_1 = PeerId::random(); - let peer_id_2 = PeerId::random(); - - let connection_request_1 = ConnectionRequest { - validator_map: validator_map_1, - connections: rq1_rx, - }; - - let connection_request_2 = ConnectionRequest { - validator_map: validator_map_2, - connections: rq2_rx, - }; - - let relay_parent = Hash::repeat_byte(3); - let para_id = ParaId::from(3); - - connection_requests.put(relay_parent.clone(), para_id, connection_request_1); - - rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap(); - - let res = connection_requests.next().await; - assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_1, peer_id: peer_id_1.clone() }); - - connection_requests.put(relay_parent.clone(), para_id, connection_request_2); - - assert!(rq1_tx.send((auth_1, peer_id_1.clone())).await.is_err()); - - rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); - - let res = connection_requests.next().await; - assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_2, peer_id: peer_id_2 }); - - check_next_is_pending(&mut connection_requests).await; - }); - } -} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 55b733864f..a0d274ec6f 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -241,16 +241,17 @@ pub enum NetworkBridgeMessage { /// /// Also ask the network to stay connected to these peers at least /// until the request is revoked. - /// This can be done by dropping the receiver. + /// + /// A caller can learn about validator connections by listening to the + /// `PeerConnected` events from the network bridge. ConnectToValidators { /// Ids of the validators to connect to. validator_ids: Vec, /// The underlying protocol to use for this request. peer_set: PeerSet, - /// Response sender by which the issuer can learn the `PeerId`s of - /// the validators as they are connected. - /// The response is sent immediately for already connected peers. - connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + /// A request is revoked by dropping the `keep_alive` sender. + /// The revokation takes place upon the next connection request. + keep_alive: oneshot::Receiver<()>, }, }