Log info about low connectivity and unreachable validators (#3916)

* Attempt to add log stats to gossip-support.

* WIP: Keep track of connected validators.

* Clarify metric.

* WIP: Make gossip support report connectivity.

* WIP: Fixing tests.

* Fix network bridge + integrate in overseer.

* Consistent naming.

* Fix logic error

* cargo fmt

* Pretty logs.

* cargo fmt

* Use `Delay` to trigger periodic checks.

* fmt

* Fix warning for authority set size of 1.

* More correct ratio report if there are no resolved validators.

* Prettier rendering of empty set.

* Fix typo.

* Another typo.

* Don't check on every leaf update.

* Make compatible with older rustc.

* Fix tests.

* Demote warning.
This commit is contained in:
Robert Klotzner
2021-09-27 16:30:02 +02:00
committed by GitHub
parent fdebbbf4b3
commit 7c3b3c4a59
11 changed files with 635 additions and 315 deletions
+5
View File
@@ -6063,7 +6063,10 @@ name = "polkadot-gossip-support"
version = "0.9.9"
dependencies = [
"assert_matches",
"async-trait",
"futures 0.3.17",
"futures-timer 3.0.2",
"lazy_static",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
@@ -6071,11 +6074,13 @@ dependencies = [
"polkadot-primitives",
"rand 0.8.4",
"rand_chacha 0.3.1",
"sc-network",
"sp-application-crypto",
"sp-consensus-babe",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-tracing",
"tracing",
]
+22 -1
View File
@@ -198,7 +198,7 @@ impl metrics::Metrics for Metrics {
prometheus::GaugeVec::new(
prometheus::Opts::new(
"parachain_desired_peer_count",
"The number of peers that the local node is expected to connect to on a parachain-related peer-set",
"The number of peers that the local node is expected to connect to on a parachain-related peer-set (either including or not including unresolvable authorities, depending on whether `ConnectToValidators` or `ConnectToValidatorsResolved` was used.)",
),
&["protocol"]
)?,
@@ -552,6 +552,27 @@ where
network_service = ns;
authority_discovery_service = ads;
}
NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
peer_set,
} => {
tracing::trace!(
target: LOG_TARGET,
action = "ConnectToPeers",
peer_set = ?peer_set,
?validator_addrs,
"Received a resolved validator connection request",
);
metrics.note_desired_peer_count(peer_set, validator_addrs.len());
let all_addrs = validator_addrs.into_iter().flatten().collect();
network_service = validator_discovery.on_resolved_request(
all_addrs,
peer_set,
network_service,
).await;
}
NetworkBridgeMessage::NewGossipTopology {
our_neighbors,
} => {
+13 -3
View File
@@ -37,7 +37,8 @@ use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_subsystem::{
jaeger,
messages::{
ApprovalDistributionMessage, BitfieldDistributionMessage, StatementDistributionMessage,
ApprovalDistributionMessage, BitfieldDistributionMessage, GossipSupportMessage,
StatementDistributionMessage,
},
ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal,
};
@@ -337,6 +338,13 @@ async fn assert_sends_validation_event_to_all(
ApprovalDistributionMessage::NetworkBridgeUpdateV1(e)
) if e == event.focus().expect("could not focus message")
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::GossipSupport(
GossipSupportMessage::NetworkBridgeUpdateV1(e)
) if e == event.focus().expect("could not focus message")
);
}
async fn assert_sends_collation_event_to_all(
@@ -1189,7 +1197,7 @@ fn send_messages_to_peers() {
fn spread_event_to_subsystems_is_up_to_date() {
// Number of subsystems expected to be interested in a network event,
// and hence the network event broadcasted to.
const EXPECTED_COUNT: usize = 3;
const EXPECTED_COUNT: usize = 4;
let mut cnt = 0_usize;
for msg in AllMessages::dispatch_iter(NetworkBridgeEvent::PeerDisconnected(PeerId::random())) {
@@ -1219,7 +1227,9 @@ fn spread_event_to_subsystems_is_up_to_date() {
AllMessages::ApprovalDistribution(_) => {
cnt += 1;
},
AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"),
AllMessages::GossipSupport(_) => {
cnt += 1;
},
AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeParticipation(_) =>
unreachable!("Not interested in network events"),
@@ -47,6 +47,44 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
Self { state: Default::default(), _phantom: PhantomData }
}
/// Connect to already resolved addresses:
pub async fn on_resolved_request(
&mut self,
newly_requested: HashSet<Multiaddr>,
peer_set: PeerSet,
mut network_service: N,
) -> N {
let state = &mut self.state[peer_set];
// clean up revoked requests
let multiaddr_to_remove: HashSet<_> =
state.previously_requested.difference(&newly_requested).cloned().collect();
let multiaddr_to_add: HashSet<_> =
newly_requested.difference(&state.previously_requested).cloned().collect();
state.previously_requested = newly_requested;
tracing::debug!(
target: LOG_TARGET,
?peer_set,
added = multiaddr_to_add.len(),
removed = multiaddr_to_remove.len(),
"New ConnectToValidators resolved request",
);
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service
.add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service
.remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove)
.await;
network_service
}
/// On a new connection request, a peer set update will be issued.
/// It will ask the network to connect to the validators and not disconnect
/// from them at least until the next request is issued for the same peer set.
@@ -59,7 +97,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
validator_ids: Vec<AuthorityDiscoveryId>,
peer_set: PeerSet,
failed: oneshot::Sender<usize>,
mut network_service: N,
network_service: N,
mut authority_discovery_service: AD,
) -> (N, AD) {
// collect multiaddress of validators
@@ -82,39 +120,19 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
}
let state = &mut self.state[peer_set];
// clean up revoked requests
let multiaddr_to_remove: HashSet<_> =
state.previously_requested.difference(&newly_requested).cloned().collect();
let multiaddr_to_add: HashSet<_> =
newly_requested.difference(&state.previously_requested).cloned().collect();
state.previously_requested = newly_requested;
tracing::debug!(
target: LOG_TARGET,
?peer_set,
?requested,
added = multiaddr_to_add.len(),
removed = multiaddr_to_remove.len(),
?failed_to_resolve,
"New ConnectToValidators request",
);
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service
.add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service
.remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove)
.await;
let r = self.on_resolved_request(newly_requested, peer_set, network_service).await;
let _ = failed.send(failed_to_resolve);
(network_service, authority_discovery_service)
(r, authority_discovery_service)
}
}
@@ -8,6 +8,7 @@ edition = "2018"
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem = { path = "../../subsystem" }
@@ -15,6 +16,7 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
futures = "0.3.17"
futures-timer = "3.0.2"
rand = { version = "0.8.3", default-features = false }
rand_chacha = { version = "0.3.1", default-features = false }
tracing = "0.1.28"
@@ -22,7 +24,10 @@ tracing = "0.1.28"
[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
assert_matches = "1.4.0"
async-trait = "0.1.51"
lazy_static = "1.4.0"
+290 -168
View File
@@ -24,20 +24,35 @@
//! in this graph will be forwarded to the network bridge with
//! the `NetworkBridgeMessage::NewGossipTopology` message.
use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_network_protocol::peer_set::PeerSet;
use std::{
collections::HashMap,
fmt,
time::{Duration, Instant},
};
use futures::{channel::oneshot, select, FutureExt as _};
use futures_timer::Delay;
use rand::{seq::SliceRandom as _, SeedableRng};
use rand_chacha::ChaCha20Rng;
use sc_network::Multiaddr;
use sp_application_crypto::{AppKey, Public};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_network_protocol::{
authority_discovery::AuthorityDiscovery, peer_set::PeerSet, v1::GossipSuppportNetworkMessage,
PeerId,
};
use polkadot_node_subsystem::{
messages::{GossipSupportMessage, NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest},
messages::{
GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage,
RuntimeApiRequest,
},
overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
SubsystemError,
};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Hash, SessionIndex};
use rand::{seq::SliceRandom as _, SeedableRng};
use rand_chacha::ChaCha20Rng;
use sp_application_crypto::{AppKey, Public};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use std::time::{Duration, Instant};
#[cfg(test)]
mod tests;
@@ -56,13 +71,13 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
/// https://github.com/paritytech/substrate/blob/fc49802f263529160635471c8a17888846035f5d/client/authority-discovery/src/lib.rs#L88
const LOW_CONNECTIVITY_WARN_DELAY: Duration = Duration::from_secs(600);
/// The Gossip Support subsystem.
pub struct GossipSupport {
keystore: SyncCryptoStorePtr,
}
/// If connectivity is lower than this in percent, issue warning in logs.
const LOW_CONNECTIVITY_WARN_THRESHOLD: usize = 90;
/// The Gossip Support subsystem.
pub struct GossipSupport<AD> {
keystore: SyncCryptoStorePtr,
#[derive(Default)]
struct State {
last_session_index: Option<SessionIndex>,
// Some(timestamp) if we failed to resolve
// at least a third of authorities the last time.
@@ -75,43 +90,73 @@ struct State {
/// potential sequence of failed attempts. It will be cleared once we reached >2/3
/// connectivity.
failure_start: Option<Instant>,
/// Successfully resolved connections
///
/// waiting for actual connection.
resolved_authorities: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
/// Actually connected authorities.
connected_authorities: HashMap<AuthorityDiscoveryId, PeerId>,
/// By `PeerId`.
///
/// Needed for efficient handling of disconnect events.
connected_authorities_by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
/// Authority discovery service.
authority_discovery: AD,
}
impl GossipSupport {
impl<AD> GossipSupport<AD>
where
AD: AuthorityDiscovery,
{
/// Create a new instance of the [`GossipSupport`] subsystem.
pub fn new(keystore: SyncCryptoStorePtr) -> Self {
Self { keystore }
pub fn new(keystore: SyncCryptoStorePtr, authority_discovery: AD) -> Self {
Self {
keystore,
last_session_index: None,
last_failure: None,
failure_start: None,
resolved_authorities: HashMap::new(),
connected_authorities: HashMap::new(),
connected_authorities_by_peer_id: HashMap::new(),
authority_discovery,
}
}
async fn run<Context>(self, ctx: Context)
async fn run<Context>(mut self, mut ctx: Context) -> Self
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
let mut state = State::default();
self.run_inner(ctx, &mut state).await;
}
async fn run_inner<Context>(self, mut ctx: Context, state: &mut State)
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
let Self { keystore } = self;
fn get_connectivity_check_delay() -> Delay {
Delay::new(LOW_CONNECTIVITY_WARN_DELAY)
}
let mut next_connectivity_check = get_connectivity_check_delay().fuse();
loop {
let message = match ctx.recv().await {
Ok(message) => message,
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to receive a message from Overseer, exiting",
);
return
},
};
let message = select!(
_ = next_connectivity_check => {
self.check_connectivity();
next_connectivity_check = get_connectivity_check_delay().fuse();
continue
}
result = ctx.recv().fuse() =>
match result {
Ok(message) => message,
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to receive a message from Overseer, exiting",
);
return self
},
}
);
match message {
FromOverseer::Communication { .. } => {},
FromOverseer::Communication {
msg: GossipSupportMessage::NetworkBridgeUpdateV1(ev),
} => self.handle_connect_disconnect(ev),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
..
@@ -119,15 +164,191 @@ impl GossipSupport {
tracing::trace!(target: LOG_TARGET, "active leaves signal");
let leaves = activated.into_iter().map(|a| a.hash);
if let Err(e) = state.handle_active_leaves(&mut ctx, &keystore, leaves).await {
if let Err(e) = self.handle_active_leaves(&mut ctx, leaves).await {
tracing::debug!(target: LOG_TARGET, error = ?e);
}
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return,
FromOverseer::Signal(OverseerSignal::Conclude) => return self,
}
}
}
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
/// and issue a connection request.
async fn handle_active_leaves<Context>(
&mut self,
ctx: &mut Context,
leaves: impl Iterator<Item = Hash>,
) -> Result<(), util::Error>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
for leaf in leaves {
let current_index =
util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let force_request = since_failure >= BACKOFF_DURATION;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
_ => leaf_session,
};
let maybe_issue_connection =
if force_request { leaf_session } else { maybe_new_session };
if let Some((session_index, relay_parent)) = maybe_issue_connection {
let is_new_session = maybe_new_session.is_some();
if is_new_session {
tracing::debug!(
target: LOG_TARGET,
%session_index,
"New session detected",
);
}
let all_authorities = determine_relevant_authorities(ctx, relay_parent).await?;
let our_index = ensure_i_am_an_authority(&self.keystore, &all_authorities).await?;
let other_authorities = {
let mut authorities = all_authorities.clone();
authorities.swap_remove(our_index);
authorities
};
self.issue_connection_request(ctx, other_authorities).await?;
if is_new_session {
self.last_session_index = Some(session_index);
update_gossip_topology(ctx, our_index, all_authorities, relay_parent).await?;
}
}
}
Ok(())
}
async fn issue_connection_request<Context>(
&mut self,
ctx: &mut Context,
authorities: Vec<AuthorityDiscoveryId>,
) -> Result<(), util::Error>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
let num = authorities.len();
let mut validator_addrs = Vec::with_capacity(authorities.len());
let mut failures = 0;
let mut resolved = HashMap::with_capacity(authorities.len());
for authority in authorities {
if let Some(addrs) =
self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
{
validator_addrs.push(addrs.clone());
resolved.insert(authority, addrs);
} else {
failures += 1;
tracing::debug!(
target: LOG_TARGET,
"Couldn't resolve addresses of authority: {:?}",
authority
);
}
}
self.resolved_authorities = resolved;
tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
ctx.send_message(NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
peer_set: PeerSet::Validation,
})
.await;
// issue another request for the same session
// if at least a third of the authorities were not resolved.
if 3 * failures >= num {
let timestamp = Instant::now();
match self.failure_start {
None => self.failure_start = Some(timestamp),
Some(first) if first.elapsed() >= LOW_CONNECTIVITY_WARN_DELAY => {
tracing::warn!(
target: LOG_TARGET,
connected = ?(num - failures),
target = ?num,
"Low connectivity - authority lookup failed for too many validators."
);
},
Some(_) => {
tracing::debug!(
target: LOG_TARGET,
connected = ?(num - failures),
target = ?num,
"Low connectivity (due to authority lookup failures) - expected on startup."
);
},
}
self.last_failure = Some(timestamp);
} else {
self.last_failure = None;
self.failure_start = None;
};
Ok(())
}
fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSuppportNetworkMessage>) {
match ev {
NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => {
if let Some(authority) = o_authority {
self.connected_authorities.insert(authority.clone(), peer_id);
self.connected_authorities_by_peer_id.insert(peer_id, authority);
}
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
if let Some(authority) = self.connected_authorities_by_peer_id.remove(&peer_id) {
self.connected_authorities.remove(&authority);
}
},
NetworkBridgeEvent::OurViewChange(_) => {},
NetworkBridgeEvent::PeerViewChange(_, _) => {},
NetworkBridgeEvent::NewGossipTopology(_) => {},
NetworkBridgeEvent::PeerMessage(_, v) => {
match v {};
},
}
}
/// Check connectivity and report on it in logs.
fn check_connectivity(&mut self) {
let absolute_connected = self.connected_authorities.len();
let absolute_resolved = self.resolved_authorities.len();
let connected_ratio =
(100 * absolute_connected).checked_div(absolute_resolved).unwrap_or(100);
let unconnected_authorities = self
.resolved_authorities
.iter()
.filter(|(a, _)| !self.connected_authorities.contains_key(a));
// TODO: Make that warning once connectivity issues are fixed (no point in warning, if
// we already know it is broken.
// https://github.com/paritytech/polkadot/issues/3921
if connected_ratio <= LOW_CONNECTIVITY_WARN_THRESHOLD {
tracing::debug!(
target: LOG_TARGET,
"Connectivity seems low, we are only connected to {}% of available validators (see debug logs for details)", connected_ratio
);
}
tracing::debug!(
target: LOG_TARGET,
?connected_ratio,
?absolute_connected,
?absolute_resolved,
unconnected_authorities = %PrettyAuthorities(unconnected_authorities),
"Connectivity Report"
);
}
}
async fn determine_relevant_authorities<Context>(
@@ -161,22 +382,6 @@ async fn ensure_i_am_an_authority(
Err(util::Error::NotAValidator)
}
/// A helper function for making a `ConnectToValidators` request.
async fn connect_to_authorities<Context>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
peer_set: PeerSet,
) -> oneshot::Receiver<usize>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
let (failed, failed_rx) = oneshot::channel();
ctx.send_message(NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, failed })
.await;
failed_rx
}
/// We partition the list of all sorted `authorities` into `sqrt(len)` groups of `sqrt(len)` size
/// and form a matrix where each validator is connected to all validators in its row and column.
/// This is similar to `[web3]` research proposed topology, except for the groups are not parachain
@@ -253,119 +458,11 @@ fn matrix_neighbors(our_index: usize, len: usize) -> impl Iterator<Item = usize>
row_neighbors.chain(column_neighbors).filter(move |i| *i != our_index)
}
impl State {
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
/// and issue a connection request.
async fn handle_active_leaves<Context>(
&mut self,
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
leaves: impl Iterator<Item = Hash>,
) -> Result<(), util::Error>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
for leaf in leaves {
let current_index =
util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let force_request = since_failure >= BACKOFF_DURATION;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
_ => leaf_session,
};
let maybe_issue_connection =
if force_request { leaf_session } else { maybe_new_session };
if let Some((session_index, relay_parent)) = maybe_issue_connection {
let is_new_session = maybe_new_session.is_some();
if is_new_session {
tracing::debug!(
target: LOG_TARGET,
%session_index,
"New session detected",
);
}
let all_authorities = determine_relevant_authorities(ctx, relay_parent).await?;
let our_index = ensure_i_am_an_authority(keystore, &all_authorities).await?;
let other_authorities = {
let mut authorities = all_authorities.clone();
authorities.swap_remove(our_index);
authorities
};
self.issue_connection_request(ctx, other_authorities).await?;
if is_new_session {
self.last_session_index = Some(session_index);
update_gossip_topology(ctx, our_index, all_authorities, relay_parent).await?;
}
}
}
Ok(())
}
async fn issue_connection_request<Context>(
&mut self,
ctx: &mut Context,
authorities: Vec<AuthorityDiscoveryId>,
) -> Result<(), util::Error>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
let num = authorities.len();
tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
let failures = connect_to_authorities(ctx, authorities, PeerSet::Validation).await;
// we await for the request to be processed
// this is fine, it should take much less time than one session
let failures = failures.await.unwrap_or(num);
// issue another request for the same session
// if at least a third of the authorities were not resolved
if failures >= num / 3 {
let timestamp = Instant::now();
match self.failure_start {
None => self.failure_start = Some(timestamp),
Some(first) if first.elapsed() >= LOW_CONNECTIVITY_WARN_DELAY => {
tracing::warn!(
target: LOG_TARGET,
connected = ?(num - failures),
target = ?num,
"Low connectivity - authority lookup failed for too many validators."
);
},
Some(_) => {
tracing::debug!(
target: LOG_TARGET,
connected = ?(num - failures),
target = ?num,
"Low connectivity (due to authority lookup failures) - expected on startup."
);
},
}
self.last_failure = Some(timestamp);
} else {
self.last_failure = None;
self.failure_start = None;
};
Ok(())
}
}
impl<Context> overseer::Subsystem<Context, SubsystemError> for GossipSupport
impl<Context, AD> overseer::Subsystem<Context, SubsystemError> for GossipSupport<AD>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
AD: AuthorityDiscovery + Clone,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
@@ -373,3 +470,28 @@ where
SpawnedSubsystem { name: "gossip-support-subsystem", future }
}
}
/// Helper struct to get a nice rendering of unreachable authorities.
struct PrettyAuthorities<I>(I);
impl<'a, I> fmt::Display for PrettyAuthorities<I>
where
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a Vec<Multiaddr>)> + Clone,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut authorities = self.0.clone().peekable();
if authorities.peek().is_none() {
write!(f, "None")?;
} else {
write!(f, "\n")?;
}
for (authority, addrs) in authorities {
write!(f, "{}:\n", authority)?;
for addr in addrs {
write!(f, " {}\n", addr)?;
}
write!(f, "\n")?;
}
Ok(())
}
}
+216 -114
View File
@@ -16,7 +16,17 @@
//! Unit tests for Gossip Support Subsystem.
use super::*;
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use async_trait::async_trait;
use futures::{executor, future, Future};
use lazy_static::lazy_static;
use sc_network::multiaddr::Protocol;
use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch};
use sp_keyring::Sr25519Keyring;
use polkadot_node_subsystem::{
jaeger,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest},
@@ -24,47 +34,124 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt as _;
use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch};
use sp_keyring::Sr25519Keyring;
use test_helpers::mock::make_ferdie_keystore;
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use std::{sync::Arc, time::Duration};
use super::*;
lazy_static! {
static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new();
static ref AUTHORITIES: Vec<AuthorityDiscoveryId> = {
let mut authorities = OTHER_AUTHORITIES.clone();
authorities.push(Sr25519Keyring::Ferdie.public().into());
authorities
};
static ref OTHER_AUTHORITIES: Vec<AuthorityDiscoveryId> = vec![
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Bob.public().into(),
Sr25519Keyring::Charlie.public().into(),
Sr25519Keyring::Eve.public().into(),
Sr25519Keyring::One.public().into(),
Sr25519Keyring::Two.public().into(),
];
static ref NEIGHBORS: Vec<AuthorityDiscoveryId> = vec![
Sr25519Keyring::Two.public().into(),
Sr25519Keyring::Charlie.public().into(),
Sr25519Keyring::Eve.public().into(),
];
}
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<GossipSupportMessage>;
fn test_harness<T: Future<Output = VirtualOverseer>>(
mut state: State,
#[derive(Debug, Clone)]
struct MockAuthorityDiscovery {
addrs: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
authorities: HashMap<PeerId, AuthorityDiscoveryId>,
}
impl MockAuthorityDiscovery {
fn new() -> Self {
let authorities: HashMap<_, _> =
AUTHORITIES.clone().into_iter().map(|a| (PeerId::random(), a)).collect();
let addrs = authorities
.clone()
.into_iter()
.map(|(p, a)| {
let multiaddr = Multiaddr::empty().with(Protocol::P2p(p.into()));
(a, vec![multiaddr])
})
.collect();
Self { addrs, authorities }
}
}
#[async_trait]
impl AuthorityDiscovery for MockAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
authority: polkadot_primitives::v1::AuthorityDiscoveryId,
) -> Option<Vec<sc_network::Multiaddr>> {
self.addrs.get(&authority).cloned()
}
async fn get_authority_id_by_peer_id(
&mut self,
peer_id: polkadot_node_network_protocol::PeerId,
) -> Option<polkadot_primitives::v1::AuthorityDiscoveryId> {
self.authorities.get(&peer_id).cloned()
}
}
async fn get_other_authorities_addrs() -> Vec<Vec<Multiaddr>> {
let mut addrs = Vec::with_capacity(OTHER_AUTHORITIES.len());
let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone();
for authority in OTHER_AUTHORITIES.iter().cloned() {
if let Some(addr) = discovery.get_addresses_by_authority_id(authority).await {
addrs.push(addr);
}
}
addrs
}
async fn get_other_authorities_addrs_map() -> HashMap<AuthorityDiscoveryId, Vec<Multiaddr>> {
let mut addrs = HashMap::with_capacity(OTHER_AUTHORITIES.len());
let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone();
for authority in OTHER_AUTHORITIES.iter().cloned() {
if let Some(addr) = discovery.get_addresses_by_authority_id(authority.clone()).await {
addrs.insert(authority, addr);
}
}
addrs
}
fn make_subsystem() -> GossipSupport<MockAuthorityDiscovery> {
GossipSupport::new(make_ferdie_keystore(), MOCK_AUTHORITY_DISCOVERY.clone())
}
fn test_harness<T: Future<Output = VirtualOverseer>, AD: AuthorityDiscovery>(
subsystem: GossipSupport<AD>,
test_fn: impl FnOnce(VirtualOverseer) -> T,
) -> State {
) -> GossipSupport<AD> {
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let keystore = make_ferdie_keystore();
let subsystem = GossipSupport::new(keystore);
{
let subsystem = subsystem.run_inner(context, &mut state);
let subsystem = subsystem.run(context);
let test_fut = test_fn(virtual_overseer);
let test_fut = test_fn(virtual_overseer);
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::join(
async move {
let mut overseer = test_fut.await;
overseer
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.timeout(TIMEOUT)
.await
.expect("Conclude send timeout");
},
subsystem,
));
}
state
let (_, subsystem) = executor::block_on(future::join(
async move {
let mut overseer = test_fut.await;
overseer
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.timeout(TIMEOUT)
.await
.expect("Conclude send timeout");
},
subsystem,
));
subsystem
}
const TIMEOUT: Duration = Duration::from_millis(100);
@@ -91,32 +178,6 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages {
msg
}
fn authorities() -> Vec<AuthorityDiscoveryId> {
let mut authorities = other_authorities();
authorities.push(Sr25519Keyring::Ferdie.public().into());
authorities
}
// Authorities other than ourselves:
fn other_authorities() -> Vec<AuthorityDiscoveryId> {
vec![
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Bob.public().into(),
Sr25519Keyring::Charlie.public().into(),
Sr25519Keyring::Eve.public().into(),
Sr25519Keyring::One.public().into(),
Sr25519Keyring::Two.public().into(),
]
}
fn neighbors() -> Vec<AuthorityDiscoveryId> {
vec![
Sr25519Keyring::Two.public().into(),
Sr25519Keyring::Charlie.public().into(),
Sr25519Keyring::Eve.public().into(),
]
}
async fn test_neighbors(overseer: &mut VirtualOverseer) {
assert_matches!(
overseer_recv(overseer).await,
@@ -145,7 +206,7 @@ async fn test_neighbors(overseer: &mut VirtualOverseer) {
}) => {
let mut got: Vec<_> = our_neighbors.into_iter().collect();
got.sort();
assert_eq!(got, neighbors());
assert_eq!(got, NEIGHBORS.clone());
}
);
}
@@ -153,7 +214,7 @@ async fn test_neighbors(overseer: &mut VirtualOverseer) {
#[test]
fn issues_a_connection_request_on_new_session() {
let hash = Hash::repeat_byte(0xAA);
let state = test_harness(State::default(), |mut virtual_overseer| async move {
let state = test_harness(make_subsystem(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
overseer_signal_active_leaves(overseer, hash).await;
assert_matches!(
@@ -173,20 +234,18 @@ fn issues_a_connection_request_on_new_session() {
RuntimeApiRequest::Authorities(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(authorities())).unwrap();
tx.send(Ok(AUTHORITIES.clone())).unwrap();
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
peer_set,
failed,
}) => {
assert_eq!(validator_ids, other_authorities());
assert_eq!(validator_addrs, get_other_authorities_addrs().await);
assert_eq!(peer_set, PeerSet::Validation);
failed.send(0).unwrap();
}
);
@@ -241,20 +300,18 @@ fn issues_a_connection_request_on_new_session() {
RuntimeApiRequest::Authorities(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(authorities())).unwrap();
tx.send(Ok(AUTHORITIES.clone())).unwrap();
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
peer_set,
failed,
}) => {
assert_eq!(validator_ids, other_authorities());
assert_eq!(validator_addrs, get_other_authorities_addrs().await);
assert_eq!(peer_set, PeerSet::Validation);
failed.send(0).unwrap();
}
);
@@ -266,54 +323,96 @@ fn issues_a_connection_request_on_new_session() {
assert!(state.last_failure.is_none());
}
#[test]
fn test_log_output() {
sp_tracing::try_init_simple();
let alice: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into();
let bob = Sr25519Keyring::Bob.public().into();
let unconnected_authorities = {
let mut m = HashMap::new();
let peer_id = PeerId::random();
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
let addrs = vec![addr.clone(), addr];
m.insert(alice, addrs);
let peer_id = PeerId::random();
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
let addrs = vec![addr.clone(), addr];
m.insert(bob, addrs);
m
};
tracing::debug!(
target: LOG_TARGET,
unconnected_authorities = %PrettyAuthorities(unconnected_authorities.iter()),
"Connectivity Report"
);
}
#[test]
fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
let hash = Hash::repeat_byte(0xAA);
let mut state = test_harness(State::default(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
overseer_signal_active_leaves(overseer, hash).await;
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(1)).unwrap();
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Authorities(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(authorities())).unwrap();
}
);
let mut state = make_subsystem();
// There will be two lookup failures:
let alice = Sr25519Keyring::Alice.public().into();
let bob = Sr25519Keyring::Bob.public().into();
let alice_addr = state.authority_discovery.addrs.remove(&alice);
state.authority_discovery.addrs.remove(&bob);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
peer_set,
failed,
}) => {
assert_eq!(validator_ids, other_authorities());
assert_eq!(peer_set, PeerSet::Validation);
failed.send(2).unwrap();
}
);
let mut state = {
let alice = alice.clone();
let bob = bob.clone();
test_neighbors(overseer).await;
test_harness(state, |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
overseer_signal_active_leaves(overseer, hash).await;
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(1)).unwrap();
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Authorities(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(AUTHORITIES.clone())).unwrap();
}
);
virtual_overseer
});
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators {
mut validator_addrs,
peer_set,
}) => {
let mut expected = get_other_authorities_addrs_map().await;
expected.remove(&alice);
expected.remove(&bob);
let mut expected: Vec<Vec<Multiaddr>> = expected.into_iter().map(|(_,v)| v).collect();
validator_addrs.sort();
expected.sort();
assert_eq!(validator_addrs, expected);
assert_eq!(peer_set, PeerSet::Validation);
}
);
test_neighbors(overseer).await;
virtual_overseer
})
};
assert_eq!(state.last_session_index, Some(1));
assert!(state.last_failure.is_some());
state.last_failure = state.last_failure.and_then(|i| i.checked_sub(BACKOFF_DURATION));
// One error less:
state.authority_discovery.addrs.insert(alice, alice_addr.unwrap());
let hash = Hash::repeat_byte(0xBB);
let state = test_harness(state, |mut virtual_overseer| async move {
@@ -336,20 +435,23 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
RuntimeApiRequest::Authorities(tx),
)) => {
assert_eq!(relay_parent, hash);
tx.send(Ok(authorities())).unwrap();
tx.send(Ok(AUTHORITIES.clone())).unwrap();
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators {
mut validator_addrs,
peer_set,
failed,
}) => {
assert_eq!(validator_ids, other_authorities());
let mut expected = get_other_authorities_addrs_map().await;
expected.remove(&bob);
let mut expected: Vec<Vec<Multiaddr>> = expected.into_iter().map(|(_,v)| v).collect();
expected.sort();
validator_addrs.sort();
assert_eq!(validator_addrs, expected);
assert_eq!(peer_set, PeerSet::Validation);
failed.send(1).unwrap();
}
);
+20
View File
@@ -294,6 +294,8 @@ pub mod v1 {
UncheckedSignedFullStatement,
};
use crate::WrongVariant;
/// Network messages used by the bitfield distribution subsystem.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub enum BitfieldDistributionMessage {
@@ -386,6 +388,10 @@ pub mod v1 {
Approvals(Vec<IndirectSignedApprovalVote>),
}
/// Dummy network message type, so we will receive connect/disconnect events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GossipSuppportNetworkMessage {}
/// Network messages used by the collator protocol subsystem
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub enum CollatorProtocolMessage {
@@ -420,6 +426,20 @@ pub mod v1 {
impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage);
impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage);
impl TryFrom<ValidationProtocol> for GossipSuppportNetworkMessage {
type Error = WrongVariant;
fn try_from(_: ValidationProtocol) -> Result<Self, Self::Error> {
Err(WrongVariant)
}
}
impl<'a> TryFrom<&'a ValidationProtocol> for &'a GossipSuppportNetworkMessage {
type Error = WrongVariant;
fn try_from(_: &'a ValidationProtocol) -> Result<Self, Self::Error> {
Err(WrongVariant)
}
}
/// All network messages on the collation peer-set.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub enum CollationProtocol {
+1 -1
View File
@@ -403,7 +403,7 @@ pub struct Overseer<SupportsParachains> {
#[subsystem(no_dispatch, ApprovalVotingMessage)]
approval_voting: ApprovalVoting,
#[subsystem(no_dispatch, GossipSupportMessage)]
#[subsystem(GossipSupportMessage)]
gossip_support: GossipSupport,
#[subsystem(no_dispatch, DisputeCoordinatorMessage)]
+5 -2
View File
@@ -147,7 +147,7 @@ pub fn create_default_subsystems<'a, Spawner, RuntimeClient>(
CollatorProtocolSubsystem,
ApprovalDistributionSubsystem,
ApprovalVotingSubsystem,
GossipSupportSubsystem,
GossipSupportSubsystem<AuthorityDiscoveryService>,
DisputeCoordinatorSubsystem,
DisputeParticipationSubsystem,
DisputeDistributionSubsystem<AuthorityDiscoveryService>,
@@ -236,7 +236,10 @@ where
Box::new(network_service.clone()),
Metrics::register(registry)?,
),
gossip_support: GossipSupportSubsystem::new(keystore.clone()),
gossip_support: GossipSupportSubsystem::new(
keystore.clone(),
authority_discovery_service.clone(),
),
dispute_coordinator: DisputeCoordinatorSubsystem::new(
parachains_db.clone(),
dispute_coordinator_config,
+16 -2
View File
@@ -23,6 +23,7 @@
//! Subsystems' APIs are defined separately from their implementation, leading to easier mocking.
use futures::channel::oneshot;
use sc_network::Multiaddr;
use thiserror::Error;
pub use sc_network::IfDisconnected;
@@ -345,6 +346,14 @@ pub enum NetworkBridgeMessage {
/// authority discovery has failed to resolve.
failed: oneshot::Sender<usize>,
},
/// Alternative to `ConnectToValidators` in case you already know the `Multiaddrs` you want to be
/// connected to.
ConnectToResolvedValidators {
/// Each entry corresponds to the addresses of an already resolved validator.
validator_addrs: Vec<Vec<Multiaddr>>,
/// The peer set we want the connection on.
peer_set: PeerSet,
},
/// Inform the distribution subsystems about the new
/// gossip network topology formed.
NewGossipTopology {
@@ -365,6 +374,7 @@ impl NetworkBridgeMessage {
Self::SendValidationMessages(_) => None,
Self::SendCollationMessages(_) => None,
Self::ConnectToValidators { .. } => None,
Self::ConnectToResolvedValidators { .. } => None,
Self::SendRequests { .. } => None,
Self::NewGossipTopology { .. } => None,
}
@@ -850,5 +860,9 @@ pub enum ApprovalDistributionMessage {
}
/// Message to the Gossip Support subsystem.
#[derive(Debug)]
pub enum GossipSupportMessage {}
#[derive(Debug, derive_more::From)]
pub enum GossipSupportMessage {
/// Dummy constructor, so we can receive networking events.
#[from]
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::GossipSuppportNetworkMessage>),
}