From 7a2e1ef6c1a243e43fc2e42baf7b028ae5929dfb Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 1 Apr 2021 18:11:43 +0200 Subject: [PATCH] gossip: do not try to connect if we are not validators (#2786) * gossip: do not issue a connection request if we are not a validator * guide updates * use all relevant authorities when issuing a request * use AuthorityDiscoveryApi instead * update comments to the status quo --- polkadot/Cargo.lock | 4 + .../node/network/gossip-support/Cargo.toml | 5 + .../node/network/gossip-support/src/lib.rs | 100 +++++++++++++----- .../node/network/protocol/src/peer_set.rs | 9 +- polkadot/node/service/src/lib.rs | 10 +- .../subsystem-util/src/validator_discovery.rs | 3 +- .../roadmap/implementers-guide/src/SUMMARY.md | 1 + .../src/node/utility/gossip-support.md | 11 ++ 8 files changed, 108 insertions(+), 35 deletions(-) create mode 100644 polkadot/roadmap/implementers-guide/src/node/utility/gossip-support.md diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index a57b87059a..8e28627aa5 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5636,6 +5636,10 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", + "sp-api", + "sp-application-crypto", + "sp-authority-discovery", + "sp-keystore", "tracing", ] diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml index 56d239f121..b160617343 100644 --- a/polkadot/node/network/gossip-support/Cargo.toml +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -5,6 +5,11 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } + polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 2e3e7e9bc3..0f68f5af71 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -18,7 +18,10 @@ //! and issuing a connection request to the validators relevant to //! the gossiping subsystems on every new session. -use futures::FutureExt as _; +use futures::{channel::mpsc, FutureExt as _}; +use std::sync::Arc; +use sp_api::ProvideRuntimeApi; +use sp_authority_discovery::AuthorityDiscoveryApi; use polkadot_node_subsystem::{ messages::{ GossipSupportMessage, @@ -27,30 +30,42 @@ use polkadot_node_subsystem::{ Subsystem, SpawnedSubsystem, SubsystemContext, }; use polkadot_node_subsystem_util::{ - validator_discovery::{ConnectionRequest, self}, + validator_discovery, self as util, }; use polkadot_primitives::v1::{ - Hash, ValidatorId, SessionIndex, + Hash, SessionIndex, AuthorityDiscoveryId, Block, BlockId, }; -use polkadot_node_network_protocol::peer_set::PeerSet; +use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId}; +use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; +use sp_application_crypto::{Public, AppKey}; const LOG_TARGET: &str = "parachain::gossip-support"; /// The Gossip Support subsystem. -pub struct GossipSupport {} +pub struct GossipSupport { + client: Arc, + keystore: SyncCryptoStorePtr, +} #[derive(Default)] struct State { last_session_index: Option, /// when we overwrite this, it automatically drops the previous request - _last_connection_request: Option, + _last_connection_request: Option>, } -impl GossipSupport { +impl GossipSupport +where + Client: ProvideRuntimeApi, + Client::Api: AuthorityDiscoveryApi, +{ /// Create a new instance of the [`GossipSupport`] subsystem. - pub fn new() -> Self { - Self {} + pub fn new(keystore: SyncCryptoStorePtr, client: Arc) -> Self { + Self { + client, + keystore, + } } #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] @@ -59,6 +74,7 @@ impl GossipSupport { Context: SubsystemContext, { let mut state = State::default(); + let Self { client, keystore } = self; loop { let message = match ctx.recv().await { Ok(message) => message, @@ -80,7 +96,7 @@ impl GossipSupport { tracing::trace!(target: LOG_TARGET, "active leaves signal"); let leaves = activated.into_iter().map(|a| a.hash); - if let Err(e) = state.handle_active_leaves(&mut ctx, leaves).await { + if let Err(e) = state.handle_active_leaves(&mut ctx, client.clone(), &keystore, leaves).await { tracing::debug!(target: LOG_TARGET, error = ?e); } } @@ -93,24 +109,51 @@ impl GossipSupport { } } -async fn determine_relevant_validators( - ctx: &mut impl SubsystemContext, +async fn determine_relevant_authorities( + client: Arc, relay_parent: Hash, - _session: SessionIndex, -) -> Result, util::Error> { - let validators = util::request_validators_ctx(relay_parent, ctx).await?.await??; - Ok(validators) +) -> Result, util::Error> +where + Client: ProvideRuntimeApi, + Client::Api: AuthorityDiscoveryApi, +{ + let api = client.runtime_api(); + let result = api.authorities(&BlockId::Hash(relay_parent)) + .map_err(|e| util::Error::RuntimeApi(format!("{:?}", e).into())); + result } +/// Return an error if we're not a validator in the given set (do not have keys). +async fn ensure_i_am_an_authority( + keystore: &SyncCryptoStorePtr, + authorities: &[AuthorityDiscoveryId], +) -> Result<(), util::Error> { + for v in authorities { + if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]) + .await + { + return Ok(()); + } + } + Err(util::Error::NotAValidator) +} + + impl State { /// 1. Determine if the current session index has changed. /// 2. If it has, determine relevant validators /// and issue a connection request. - async fn handle_active_leaves( + async fn handle_active_leaves( &mut self, ctx: &mut impl SubsystemContext, + client: Arc, + keystore: &SyncCryptoStorePtr, leaves: impl Iterator, - ) -> Result<(), util::Error> { + ) -> Result<(), util::Error> + where + Client: ProvideRuntimeApi, + Client::Api: AuthorityDiscoveryApi, + { for leaf in leaves { let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??; let maybe_new_session = match self.last_session_index { @@ -120,16 +163,15 @@ impl State { if let Some((new_session, relay_parent)) = maybe_new_session { tracing::debug!(target: LOG_TARGET, %new_session, "New session detected"); - let validators = determine_relevant_validators(ctx, relay_parent, new_session).await?; - tracing::debug!(target: LOG_TARGET, num = ?validators.len(), "Issuing a connection request"); + let authorities = determine_relevant_authorities(client.clone(), relay_parent).await?; + ensure_i_am_an_authority(keystore, &authorities).await?; + tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request"); - let request = validator_discovery::connect_to_validators_in_session( + let request = validator_discovery::connect_to_authorities( ctx, - relay_parent, - validators, + authorities, PeerSet::Validation, - new_session, - ).await?; + ).await; self.last_session_index = Some(new_session); self._last_connection_request = Some(request); @@ -140,11 +182,13 @@ impl State { } } -impl Subsystem for GossipSupport +impl Subsystem for GossipSupport where - C: SubsystemContext + Sync + Send, + Context: SubsystemContext + Sync + Send, + Client: ProvideRuntimeApi + Send + 'static + Sync, + Client::Api: AuthorityDiscoveryApi, { - fn start(self, ctx: C) -> SpawnedSubsystem { + fn start(self, ctx: Context) -> SpawnedSubsystem { let future = self.run(ctx) .map(|_| Ok(())) .boxed(); diff --git a/polkadot/node/network/protocol/src/peer_set.rs b/polkadot/node/network/protocol/src/peer_set.rs index ee85da54a5..9a7561f98b 100644 --- a/polkadot/node/network/protocol/src/peer_set.rs +++ b/polkadot/node/network/protocol/src/peer_set.rs @@ -57,9 +57,12 @@ impl PeerSet { notifications_protocol: protocol, max_notification_size, set_config: sc_network::config::SetConfig { - // we want our gossip subset to always include reserved peers - in_peers: super::MIN_GOSSIP_PEERS as u32 / 2, - out_peers: 0, + // we allow full nodes to connect to validators for gossip + // to ensure any `MIN_GOSSIP_PEERS` always include reserved peers + // we limit the amount of non-reserved slots to be less + // than `MIN_GOSSIP_PEERS` in total + in_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1, + out_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1, reserved_nodes: Vec::new(), non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, }, diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 0390341760..8a86a4b18a 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -34,6 +34,7 @@ use { polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler}, polkadot_primitives::v1::ParachainHost, sc_authority_discovery::Service as AuthorityDiscoveryService, + sp_authority_discovery::AuthorityDiscoveryApi, sp_blockchain::HeaderBackend, sp_trie::PrefixedMemoryDB, sc_client_api::{AuxStore, ExecutorProvider}, @@ -429,7 +430,7 @@ fn real_overseer( ) -> Result<(Overseer, OverseerHandler), Error> where RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, - RuntimeClient::Api: ParachainHost + BabeApi, + RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { Overseer::new( @@ -457,7 +458,7 @@ fn real_overseer( ) -> Result<(Overseer, OverseerHandler), Error> where RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, - RuntimeClient::Api: ParachainHost + BabeApi, + RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { use polkadot_node_subsystem_util::metrics::Metrics; @@ -561,7 +562,10 @@ where keystore.clone(), Metrics::register(registry)?, )?, - gossip_support: GossipSupportSubsystem::new(), + gossip_support: GossipSupportSubsystem::new( + keystore.clone(), + runtime_client.clone(), + ), }; Overseer::new( diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs index 292daa62f0..d851ebd6eb 100644 --- a/polkadot/node/subsystem-util/src/validator_discovery.rs +++ b/polkadot/node/subsystem-util/src/validator_discovery.rs @@ -115,7 +115,8 @@ pub async fn connect_to_validators_in_session( }) } -async fn connect_to_authorities( +/// A helper function for making a `ConnectToValidators` request. +pub async fn connect_to_authorities( ctx: &mut Context, validator_ids: Vec, peer_set: PeerSet, diff --git a/polkadot/roadmap/implementers-guide/src/SUMMARY.md b/polkadot/roadmap/implementers-guide/src/SUMMARY.md index 7a1e029992..313a765ecd 100644 --- a/polkadot/roadmap/implementers-guide/src/SUMMARY.md +++ b/polkadot/roadmap/implementers-guide/src/SUMMARY.md @@ -61,6 +61,7 @@ - [Candidate Validation](node/utility/candidate-validation.md) - [Provisioner](node/utility/provisioner.md) - [Network Bridge](node/utility/network-bridge.md) + - [Gossip Support](node/utility/gossip-support.md) - [Misbehavior Arbitration](node/utility/misbehavior-arbitration.md) - [Peer Set Manager](node/utility/peer-set-manager.md) - [Runtime API Requests](node/utility/runtime-api.md) diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/gossip-support.md b/polkadot/roadmap/implementers-guide/src/node/utility/gossip-support.md new file mode 100644 index 0000000000..55fafd7ca2 --- /dev/null +++ b/polkadot/roadmap/implementers-guide/src/node/utility/gossip-support.md @@ -0,0 +1,11 @@ +# Gossip Support + +The Gossip Support Subsystem is responsible for keeping track of session changes +and issuing a connection request to all validators in the next, current and a few past sessions +if we are a validator in these sessions. +The request will add all validators to a reserved PeerSet, meaning we will not reject a connection request +from any validator in that set. + +Gossiping subsystems will be notified when a new peer connects or disconnects by network bridge. +It is their responsibility to limit the amount of outgoing gossip messages. +At the moment we enforce a cap of `max(sqrt(peers.len()), 25)` message recipients at a time in each gossiping subsystem.