diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index e5ec77b4be..c0fd2bf857 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5755,12 +5755,17 @@ dependencies = [ name = "polkadot-gossip-support" version = "0.1.0" dependencies = [ + "assert_matches", "futures 0.3.14", "polkadot-node-network-protocol", "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", + "sc-keystore", "sp-application-crypto", + "sp-core", + "sp-keyring", "sp-keystore", "tracing", ] diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 0a26de9fc7..5edfd576d6 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -509,6 +509,7 @@ where NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, + failed, } => { tracing::trace!( target: LOG_TARGET, @@ -521,6 +522,7 @@ where let (ns, ads) = validator_discovery.on_request( validator_ids, peer_set, + failed, network_service, authority_discovery_service, ).await; diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index 63a968a6bc..94e7fd3171 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -22,6 +22,7 @@ use core::marker::PhantomData; use std::collections::HashSet; use async_trait::async_trait; +use futures::channel::oneshot; use sc_network::multiaddr::Multiaddr; use sc_authority_discovery::Service as AuthorityDiscoveryService; @@ -81,16 +82,19 @@ impl Service { &mut self, validator_ids: Vec, peer_set: PeerSet, + failed: oneshot::Sender, mut network_service: N, mut authority_discovery_service: AD, ) -> (N, AD) { // collect multiaddress of validators + let mut failed_to_resolve: usize = 0; let mut newly_requested = HashSet::new(); for authority in validator_ids.into_iter() { let result = authority_discovery_service.get_addresses_by_authority_id(authority.clone()).await; if let Some(addresses) = result { newly_requested.extend(addresses); } else { + failed_to_resolve += 1; tracing::debug!(target: LOG_TARGET, "Authority Discovery couldn't resolve {:?}", authority); } } @@ -120,6 +124,8 @@ impl Service { multiaddr_to_remove ).await; + let _ = failed.send(failed_to_resolve); + (network_service, authority_discovery_service) } } @@ -237,22 +243,55 @@ mod tests { let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); futures::executor::block_on(async move { + let (failed, _) = oneshot::channel(); let (ns, ads) = service.on_request( vec![authority_ids[0].clone()], PeerSet::Validation, + failed, ns, ads, ).await; - let _ = service.on_request( + let (failed, _) = oneshot::channel(); + let (_, ads) = service.on_request( vec![authority_ids[1].clone()], PeerSet::Validation, + failed, ns, ads, ).await; let state = &service.state[PeerSet::Validation]; assert_eq!(state.previously_requested.len(), 1); + assert!(state.previously_requested.contains(ads.by_authority_id.get(&authority_ids[1]).unwrap())); + }); + } + + #[test] + fn failed_resolution_is_reported_properly() { + let mut service = new_service(); + + let (ns, ads) = new_network(); + + let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); + + futures::executor::block_on(async move { + let (failed, failed_rx) = oneshot::channel(); + let unknown = Sr25519Keyring::Ferdie.public().into(); + let (_, ads) = service.on_request( + vec![authority_ids[0].clone(), unknown], + PeerSet::Validation, + failed, + ns, + ads, + ).await; + + let state = &service.state[PeerSet::Validation]; + assert_eq!(state.previously_requested.len(), 1); + assert!(state.previously_requested.contains(ads.by_authority_id.get(&authority_ids[0]).unwrap())); + + let failed = failed_rx.await.unwrap(); + assert_eq!(failed, 1); }); } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index e01d41aa2b..5d17a358fc 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -455,8 +455,11 @@ async fn connect_to_validators( ctx: &mut impl SubsystemContext, validator_ids: Vec, ) { + // ignore address resolution failure + // will reissue a new request on new collation + let (failed, _) = oneshot::channel(); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, peer_set: PeerSet::Collation, + validator_ids, peer_set: PeerSet::Collation, failed, })).await; } diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml index 0de20c5378..61bc90e718 100644 --- a/polkadot/node/network/gossip-support/Cargo.toml +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -15,3 +15,12 @@ polkadot-primitives = { path = "../../../primitives" } futures = "0.3.8" tracing = "0.1.25" + +[dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } + +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } + +assert_matches = "1.4.0" diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 1ca994fdea..3d669e417c 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -18,7 +18,10 @@ //! and issuing a connection request to the validators relevant to //! the gossiping subsystems on every new session. -use futures::FutureExt as _; +#[cfg(test)] +mod tests; + +use futures::{channel::oneshot, FutureExt as _}; use polkadot_node_subsystem::{ messages::{ AllMessages, GossipSupportMessage, NetworkBridgeMessage, @@ -44,6 +47,7 @@ pub struct GossipSupport { #[derive(Default)] struct State { last_session_index: Option, + force_request: bool, } impl GossipSupport { @@ -54,12 +58,18 @@ impl GossipSupport { } } - #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] - async fn run(self, mut ctx: Context) + async fn run(self, ctx: Context) where Context: SubsystemContext, { let mut state = State::default(); + self.run_inner(ctx, &mut state).await; + } + + async fn run_inner(self, mut ctx: Context, state: &mut State) + where + Context: SubsystemContext, + { let Self { keystore } = self; loop { let message = match ctx.recv().await { @@ -128,13 +138,16 @@ pub async fn connect_to_authorities( ctx: &mut impl SubsystemContext, validator_ids: Vec, peer_set: PeerSet, -) { +) -> oneshot::Receiver { + let (failed, failed_rx) = oneshot::channel(); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, + failed, } )).await; + failed_rx } impl State { @@ -150,7 +163,7 @@ impl State { for leaf in leaves { let current_index = util::request_session_index_for_child(leaf, ctx.sender()).await.await??; let maybe_new_session = match self.last_session_index { - Some(i) if i >= current_index => None, + Some(i) if current_index <= i && !self.force_request => None, _ => Some((current_index, leaf)), }; @@ -158,15 +171,22 @@ impl State { tracing::debug!(target: LOG_TARGET, %new_session, "New session detected"); let authorities = determine_relevant_authorities(ctx, relay_parent).await?; ensure_i_am_an_authority(keystore, &authorities).await?; - tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request"); + let num = authorities.len(); + tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request"); - connect_to_authorities( + let failures = connect_to_authorities( ctx, authorities, PeerSet::Validation, ).await; + // we await for the request to be processed + // this is fine, it should take much less time than one session + let failures = failures.await.unwrap_or(num); + self.last_session_index = Some(new_session); + // issue another request if at least a third of the authorities were not resolved + self.force_request = failures >= num / 3; } } diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs new file mode 100644 index 0000000000..4d4c6abb5d --- /dev/null +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -0,0 +1,319 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Unit tests for Gossip Support Subsystem. + +use super::*; +use polkadot_node_subsystem::{ + jaeger, ActivatedLeaf, + messages::{RuntimeApiMessage, RuntimeApiRequest}, +}; +use polkadot_node_subsystem_test_helpers as test_helpers; +use polkadot_node_subsystem_util::TimeoutExt as _; +use sc_keystore::LocalKeystore; +use sp_keyring::Sr25519Keyring; +use sp_keystore::SyncCryptoStore; + +use std::sync::Arc; +use std::time::Duration; +use assert_matches::assert_matches; +use futures::{Future, executor, future}; + +type VirtualOverseer = test_helpers::TestSubsystemContextHandle; + +fn test_harness>( + mut state: State, + test_fn: impl FnOnce(VirtualOverseer) -> T, +) -> State { + let pool = sp_core::testing::TaskExecutor::new(); + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let keystore = make_ferdie_keystore(); + let subsystem = GossipSupport::new(keystore); + { + let subsystem = subsystem.run_inner(context, &mut state); + + let test_fut = test_fn(virtual_overseer); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer + .send(FromOverseer::Signal(OverseerSignal::Conclude)) + .timeout(TIMEOUT) + .await + .expect("Conclude send timeout"); + }, subsystem)); + } + + state +} + +const TIMEOUT: Duration = Duration::from_millis(100); + +async fn overseer_signal_active_leaves( + overseer: &mut VirtualOverseer, + leaf: Hash, +) { + let leaf = ActivatedLeaf { + hash: leaf, + number: 0xdeadcafe, + span: Arc::new(jaeger::Span::Disabled), + }; + overseer + .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(leaf)))) + .timeout(TIMEOUT) + .await + .expect("signal send timeout"); +} + +async fn overseer_recv( + overseer: &mut VirtualOverseer, +) -> AllMessages { + let msg = overseer + .recv() + .timeout(TIMEOUT) + .await + .expect("msg recv timeout"); + + msg +} + +fn make_ferdie_keystore() -> SyncCryptoStorePtr { + let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); + SyncCryptoStore::sr25519_generate_new( + &*keystore, + AuthorityDiscoveryId::ID, + Some(&Sr25519Keyring::Ferdie.to_seed()), + ) + .expect("Insert key into keystore"); + keystore +} + +fn authorities() -> Vec { + vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + Sr25519Keyring::Ferdie.public().into(), + Sr25519Keyring::Eve.public().into(), + Sr25519Keyring::One.public().into(), + ] +} + +#[test] +fn issues_a_connection_request_on_new_session() { + let hash = Hash::repeat_byte(0xAA); + let state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(authorities())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + failed, + }) => { + assert_eq!(validator_ids, authorities()); + assert_eq!(peer_set, PeerSet::Validation); + failed.send(0).unwrap(); + } + ); + + virtual_overseer + }); + + assert_eq!(state.last_session_index, Some(1)); + assert!(!state.force_request); + + // does not issue on the same session + let hash = Hash::repeat_byte(0xBB); + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + virtual_overseer + }); + + assert_eq!(state.last_session_index, Some(1)); + assert!(!state.force_request); + + // does on the new one + let hash = Hash::repeat_byte(0xCC); + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(2)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(authorities())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + failed, + }) => { + assert_eq!(validator_ids, authorities()); + assert_eq!(peer_set, PeerSet::Validation); + failed.send(0).unwrap(); + } + ); + + virtual_overseer + }); + assert_eq!(state.last_session_index, Some(2)); + assert!(!state.force_request); +} + +#[test] +fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { + let hash = Hash::repeat_byte(0xAA); + let state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(authorities())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + failed, + }) => { + assert_eq!(validator_ids, authorities()); + assert_eq!(peer_set, PeerSet::Validation); + failed.send(2).unwrap(); + } + ); + virtual_overseer + }); + + assert_eq!(state.last_session_index, Some(1)); + assert!(state.force_request); + + let hash = Hash::repeat_byte(0xBB); + let state = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(authorities())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + failed, + }) => { + assert_eq!(validator_ids, authorities()); + assert_eq!(peer_set, PeerSet::Validation); + failed.send(1).unwrap(); + } + ); + virtual_overseer + }); + + assert_eq!(state.last_session_index, Some(1)); + assert!(!state.force_request); +} + diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index edd6de3985..cb9a27ea89 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -253,6 +253,9 @@ pub enum NetworkBridgeMessage { validator_ids: Vec, /// The underlying protocol to use for this request. peer_set: PeerSet, + /// Sends back the number of `AuthorityDiscoveryId`s which + /// authority discovery has failed to resolve. + failed: oneshot::Sender, }, } diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 8bd002c5e8..6222744cbf 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -156,18 +156,18 @@ enum AvailabilityDistributionMessage { /// /// NOTE: The result of this fetch is not yet locally validated and could be bogus. FetchPoV { - /// The relay parent giving the necessary context. - relay_parent: Hash, - /// Validator to fetch the PoV from. - from_validator: ValidatorIndex, - /// Candidate hash to fetch the PoV for. - candidate_hash: CandidateHash, - /// Expected hash of the PoV, a PoV not matching this hash will be rejected. - pov_hash: Hash, - /// Sender for getting back the result of this fetch. - /// - /// The sender will be canceled if the fetching failed for some reason. - tx: oneshot::Sender, + /// The relay parent giving the necessary context. + relay_parent: Hash, + /// Validator to fetch the PoV from. + from_validator: ValidatorIndex, + /// Candidate hash to fetch the PoV for. + candidate_hash: CandidateHash, + /// Expected hash of the PoV, a PoV not matching this hash will be rejected. + pov_hash: Hash, + /// Sender for getting back the result of this fetch. + /// + /// The sender will be canceled if the fetching failed for some reason. + tx: oneshot::Sender, }, } ``` @@ -446,17 +446,22 @@ enum NetworkBridgeMessage { /// Connect to peers who represent the given `validator_ids`. /// /// Also ask the network to stay connected to these peers at least - /// until the request is revoked. - /// This can be done by dropping the receiver. + /// until a new request is issued. + /// + /// Because it overrides the previous request, it must be ensured + /// that `validator_ids` include all peers the subsystems + /// are interested in (per `PeerSet`). + /// + /// A caller can learn about validator connections by listening to the + /// `PeerConnected` events from the network bridge. ConnectToValidators { /// Ids of the validators to connect to. validator_ids: Vec, /// The underlying protocol to use for this request. peer_set: PeerSet, - /// Response sender by which the issuer can learn the `PeerId`s of - /// the validators as they are connected. - /// The response is sent immediately for already connected peers. - connected: ResponseStream<(AuthorityDiscoveryId, PeerId)>, + /// Sends back the number of `AuthorityDiscoveryId`s which + /// authority discovery has failed to resolve. + failed: oneshot::Sender, }, } ```