From a4b92cd3b6c6c2f069e0a7e4469b3372bc9dcfc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 29 Oct 2020 17:43:34 +0100 Subject: [PATCH] Make sure validator discovery works with a delayed peer to validator mapping (#1886) * Make sure validator discovery works with a delayed peer to validator mapping Currently the implementation checks on connect of a peer if this peer is a validator by asking the authority discovery. It can now happen that the authority discovery is not yet aware that a given peer is an authority. This can for example happen on start up of the node. This pr changes the behavior, to make it possible to later associate a peer to a validator id. Instead of just storing the connected validators, we now store all connected peers with a set of associated validator ids. When we get a request to connect to a given set of validators, we start by checking the connected peers. If we didn't find a validator id in the connected peers, we ask the authority discovery for the peerid of a given authority id. When the returned peerid is part of our connected peers set, we cache and return the authority id. * Update node/network/bridge/Cargo.toml Co-authored-by: Pierre Krieger * Update node/network/bridge/src/validator_discovery.rs Co-authored-by: Pierre Krieger * Update `Cargo.lock` Co-authored-by: Pierre Krieger --- polkadot/node/network/bridge/src/lib.rs | 4 +- .../network/bridge/src/validator_discovery.rs | 175 ++++++++++++------ 2 files changed, 121 insertions(+), 58 deletions(-) diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 0ad20577fd..9ce841f083 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -16,7 +16,7 @@ //! The Network Bridge Subsystem - protocol multiplexer for Polkadot. 
-#![deny(unused_crate_dependencies, unused_results)] +#![deny(unused_crate_dependencies)] #![warn(missing_docs)] @@ -703,7 +703,7 @@ where PeerSet::Collation => &mut collation_peers, }; - validator_discovery.on_peer_disconnected(&peer, &mut authority_discovery_service).await; + validator_discovery.on_peer_disconnected(&peer); if peer_map.remove(&peer).is_some() { let res = match peer_set { diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index 531e41cd62..da5491bf6d 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; -use sc_network::Multiaddr; +use sc_network::multiaddr::{Multiaddr, Protocol}; use sc_authority_discovery::Service as AuthorityDiscoveryService; use polkadot_node_network_protocol::PeerId; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; @@ -70,7 +70,6 @@ impl AuthorityDiscovery for AuthorityDiscoveryService { } } - /// This struct tracks the state for one `ConnectToValidators` request. struct NonRevokedConnectionRequestState { requested: Vec, @@ -115,39 +114,93 @@ impl NonRevokedConnectionRequestState { } } +/// Will be called by [`Service::on_request`] when a request was revoked. +/// +/// Takes the `map` of requested validators and the `id` of the validator that should be revoked. +/// +/// Returns `Some(id)` iff the request counter is `0`. 
+fn on_revoke(map: &mut HashMap, id: AuthorityDiscoveryId) -> Option { + if let hash_map::Entry::Occupied(mut entry) = map.entry(id) { + *entry.get_mut() = entry.get().saturating_sub(1); // write back: `saturating_sub` does not mutate + if *entry.get() == 0 { + return Some(entry.remove_entry().0); + } + } + None +} + +fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option { + addr.iter().last().and_then(|protocol| if let Protocol::P2p(multihash) = protocol { + PeerId::from_multihash(multihash).ok() + } else { + None + }) +} pub(super) struct Service { - // we assume one PeerId per AuthorityId is enough - connected_validators: HashMap, - // the `u64` counts the number of pending non-revoked requests for this validator + // Peers that are connected to us and authority ids associated to them. + connected_peers: HashMap>, + // The `u64` counts the number of pending non-revoked requests for this validator // note: the validators in this map are not necessarily present // in the `connected_validators` map. // Invariant: the value > 0 for non-revoked requests. requested_validators: HashMap, non_revoked_discovery_requests: Vec, // PhantomData used to make the struct generic instead of having generic methods - network: PhantomData, - authority_discovery: PhantomData, + _phantom: PhantomData<(N, AD)>, } impl Service { pub fn new() -> Self { Self { - connected_validators: HashMap::new(), + connected_peers: HashMap::new(), requested_validators: HashMap::new(), non_revoked_discovery_requests: Vec::new(), - network: PhantomData, - authority_discovery: PhantomData, + _phantom: PhantomData, } } + /// Find connected validators using the given `validator_ids`. + /// + /// Returns a [`HashMap`] that contains the found [`AuthorityDiscoveryId`]s and their associated [`PeerId`]s. 
+ async fn find_connected_validators( + &mut self, + validator_ids: &[AuthorityDiscoveryId], + authority_discovery_service: &mut AD, + ) -> HashMap { + let mut result = HashMap::new(); + + for id in validator_ids { + // First check if one of the connected peers is already associated with this validator. + if let Some(pid) = self.connected_peers + .iter() + .find_map(|(pid, ids)| if ids.contains(&id) { Some(pid) } else { None }) { + result.insert(id.clone(), pid.clone()); + continue; + } + + // If not, ask the authority discovery for the validator's addresses and look for a connected peer among them. + if let Some(addresses) = authority_discovery_service.get_addresses_by_authority_id(id.clone()).await { + for peer_id in addresses.iter().filter_map(peer_id_from_multiaddr) { + if let Some(ids) = self.connected_peers.get_mut(&peer_id) { + ids.insert(id.clone()); + result.insert(id.clone(), peer_id.clone()); + continue; // NOTE(review): last statement of the inner `for` body, so this has no effect + } + } + } + } + + result + } + /// On a new connection request, a priority group update will be issued. /// It will ask the network to connect to the validators and not disconnect /// from them at least until all the pending requests containing them are revoked. /// /// This method will also clean up all previously revoked requests. - // it takes `network_service` and `authority_discovery_service` by value - // and returns them as a workaround for the Future: Send requirement imposed by async fn impl. + /// It takes `network_service` and `authority_discovery_service` by value + /// and returns them as a workaround for the `Future: Send` requirement imposed by async fn impl. 
pub async fn on_request( &mut self, validator_ids: Vec, @@ -158,37 +211,13 @@ impl Service { ) -> (N, AD) { const MAX_ADDR_PER_PEER: usize = 3; - let already_connected = validator_ids.iter() - .cloned() - .filter_map(|id| { - let counter = self.requested_validators.entry(id.clone()).or_default(); - // if the counter overflows, there is something really wrong going on - *counter += 1; - - self.connected_validators - .get(&id) - .map(|peer| (id, peer.clone())) - }); - - - let on_revoke = |map: &mut HashMap, id: AuthorityDiscoveryId| -> Option { - match map.entry(id) { - hash_map::Entry::Occupied(mut entry) => { - *entry.get_mut() -= 1; - if *entry.get() == 0 { - return Some(entry.remove_entry().0); - } - } - hash_map::Entry::Vacant(_) => { - // should be unreachable - } - } - None - }; + // Increment the counter of how many times the validators were requested. + validator_ids.iter().for_each(|id| *self.requested_validators.entry(id.clone()).or_default() += 1); + let already_connected = self.find_connected_validators(&validator_ids, &mut authority_discovery_service).await; // try to send already connected peers - for (id, peer) in already_connected { - match connected.try_send((id, peer)) { + for (id, peer) in already_connected.iter() { + match connected.try_send((id.clone(), peer.clone())) { Err(e) if e.is_disconnected() => { // the request is already revoked for peer_id in validator_ids { @@ -208,17 +237,15 @@ impl Service { // collect multiaddress of validators let mut multiaddr_to_add = HashSet::new(); - for authority in validator_ids.iter().cloned() { - let result = authority_discovery_service.get_addresses_by_authority_id(authority).await; + for authority in validator_ids.iter() { + let result = authority_discovery_service.get_addresses_by_authority_id(authority.clone()).await; if let Some(addresses) = result { // We might have several `PeerId`s per `AuthorityId` // depending on the number of sentry nodes, // so we limit the max number of sentries per node to 
connect to. // They are going to be removed soon though: // https://github.com/paritytech/substrate/issues/6845 - for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) { - let _ = multiaddr_to_add.insert(addr); - } + multiaddr_to_add.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER)); } } @@ -246,9 +273,7 @@ impl Service { for id in revoked_validators.into_iter() { let result = authority_discovery_service.get_addresses_by_authority_id(id).await; if let Some(addresses) = result { - for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) { - let _ = multiaddr_to_remove.insert(addr); - } + multiaddr_to_remove.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER)); } } @@ -265,7 +290,7 @@ impl Service { let pending = validator_ids.iter() .cloned() - .filter(|id| !self.connected_validators.contains_key(id)) + .filter(|id| !already_connected.contains_key(id)) .collect::>(); self.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new( @@ -278,6 +303,7 @@ impl Service { (network_service, authority_discovery_service) } + /// Should be called when a peer connected. 
pub async fn on_peer_connected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) { // check if it's an authority we've been waiting for let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await; @@ -285,15 +311,16 @@ impl Service { for request in self.non_revoked_discovery_requests.iter_mut() { let _ = request.on_authority_connected(&authority, peer_id); } - let _ = self.connected_validators.insert(authority, peer_id.clone()); + + self.connected_peers.entry(peer_id.clone()).or_default().insert(authority); + } else { + self.connected_peers.entry(peer_id.clone()).or_default(); // keep ids already cached for this peer } } - pub async fn on_peer_disconnected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) { - let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await; - if let Some(authority) = maybe_authority { - let _ = self.connected_validators.remove(&authority); - } + /// Should be called when a peer disconnected. + pub fn on_peer_disconnected(&mut self, peer_id: &PeerId) { + self.connected_peers.remove(peer_id); } } @@ -319,6 +346,7 @@ mod tests { priority_group: HashSet, } + #[derive(Default)] struct TestAuthorityDiscovery { by_authority_id: HashMap, by_peer_id: HashMap, @@ -600,4 +628,39 @@ mod tests { assert_eq!(ns.priority_group.len(), 1); }); } + + /// A test for when a validator connects, but the authority discovery does not yet know that the connecting node + /// is a validator. This can happen for example at startup of a node. 
+ #[test] + fn handle_validator_connect_without_authority_discovery_knowing_it() { + let mut service = new_service(); + + let ns = TestNetwork::default(); + let mut ads = TestAuthorityDiscovery::default(); + + let validator_peer_id = PeerId::random(); + let validator_id: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into(); + + futures::executor::block_on(async move { + let (sender, mut receiver) = mpsc::channel(1); + let (_revoke_tx, revoke_rx) = oneshot::channel(); + + service.on_peer_connected(&validator_peer_id, &mut ads).await; + + let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into())); + ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone()); + ads.by_authority_id.insert(validator_id.clone(), address); + + let _ = service.on_request( + vec![validator_id.clone()], + sender, + revoke_rx, + ns, + ads, + ).await; + + assert_eq!((validator_id.clone(), validator_peer_id.clone()), receiver.next().await.unwrap()); + assert!(service.connected_peers.get(&validator_peer_id).unwrap().contains(&validator_id)); + }); + } }