more resilient subsystems (#1908)

* backing: extract log target

* bitfield-signing: extract log target

* utils: fix a typo

* provisioner: extract log target

* candidate selection: remove unused error variant

* bitfield-distribution: change the return type of run

* pov-distribution: extract log target

* collator-protocol: simplify runtime request

* collation-generation: do not exit early on error

* collation-generation: do not exit on double init

* collator-protocol: do not exit on errors and rename LOG_TARGET

* collator-protocol: a workaround for unused imports warning

* Update node/network/bitfield-distribution/src/lib.rs

* collation-generation: elevate warn! to error!

* collator-protocol: fix imports

* post merge fix

* fix compilation
This commit is contained in:
Andronik Ordian
2020-11-05 14:22:41 +01:00
committed by GitHub
parent c418758ebc
commit 2cde7732da
11 changed files with 122 additions and 105 deletions
@@ -23,9 +23,9 @@
#![deny(unused_crate_dependencies)]
use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use futures::{channel::oneshot, FutureExt};
use log::{trace, warn};
use log::{debug, trace, warn};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
@@ -33,7 +33,6 @@ use polkadot_subsystem::{
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange};
use polkadot_subsystem::SubsystemError;
use std::collections::{HashMap, HashSet};
const COST_SIGNATURE_INVALID: ReputationChange =
@@ -131,7 +130,7 @@ impl PerRelayParentData {
}
}
const TARGET: &'static str = "bitd";
const LOG_TARGET: &str = "bitfield_distribution";
/// The bitfield distribution subsystem.
pub struct BitfieldDistribution {
@@ -145,65 +144,82 @@ impl BitfieldDistribution {
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()>
async fn run<Context>(self, mut ctx: Context)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
// work: process incoming messages from the overseer and process accordingly.
let mut state = ProtocolState::default();
loop {
let message = ctx.recv().await?;
let message = match ctx.recv().await {
Ok(message) => message,
Err(e) => {
debug!(target: LOG_TARGET, "Failed to receive a message from Overseer: {}, exiting", e);
return;
},
};
match message {
FromOverseer::Communication {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => {
trace!(target: TARGET, "Processing DistributeBitfield");
handle_bitfield_distribution(&mut ctx, &mut state, &self.metrics, hash, signed_availability)
.await?;
trace!(target: LOG_TARGET, "Processing DistributeBitfield");
if let Err(err) = handle_bitfield_distribution(
&mut ctx,
&mut state,
&self.metrics,
hash,
signed_availability,
).await {
warn!(target: LOG_TARGET, "Failed to reply to `DistributeBitfield` message: {}", err);
}
}
FromOverseer::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
trace!(target: TARGET, "Processing NetworkMessage");
trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
warn!(target: TARGET, "Failed to handle incoming network messages: {:?}", e);
warn!(target: LOG_TARGET, "Failed to handle incoming network messages: {:?}", e);
}
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
for relay_parent in activated {
trace!(target: TARGET, "Start {:?}", relay_parent);
trace!(target: LOG_TARGET, "Start {:?}", relay_parent);
// query basic system parameters once
if let Some((validator_set, signing_context)) =
query_basics(&mut ctx, relay_parent).await?
{
// If our runtime API fails, we don't take down the node,
// but we might alter peers' reputations erroneously as a result
// of not having the correct bookkeeping. If we have lost a race
// with state pruning, it is unlikely that peers will be sending
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
);
match query_basics(&mut ctx, relay_parent).await {
Ok(Some((validator_set, signing_context))) => {
// If our runtime API fails, we don't take down the node,
// but we might alter peers' reputations erroneously as a result
// of not having the correct bookkeeping. If we have lost a race
// with state pruning, it is unlikely that peers will be sending
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
);
}
Err(e) => {
warn!(target: LOG_TARGET, "query_basics has failed: {}", e);
}
_ => {},
}
}
for relay_parent in deactivated {
trace!(target: TARGET, "Stop {:?}", relay_parent);
trace!(target: LOG_TARGET, "Stop {:?}", relay_parent);
// defer the cleanup to the view change
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
trace!(target: TARGET, "Block finalized {:?}", hash);
trace!(target: LOG_TARGET, "Block finalized {:?}", hash);
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
trace!(target: TARGET, "Conclude");
return Ok(());
trace!(target: LOG_TARGET, "Conclude");
return;
}
}
}
@@ -219,7 +235,7 @@ async fn modify_reputation<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
trace!(target: TARGET, "Reputation change of {:?} for peer {:?}", rep, peer);
trace!(target: LOG_TARGET, "Reputation change of {:?} for peer {:?}", rep, peer);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
@@ -245,7 +261,7 @@ where
job_data
} else {
trace!(
target: TARGET,
target: LOG_TARGET,
"Not supposed to work on relay parent {} related data",
relay_parent
);
@@ -254,7 +270,7 @@ where
};
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
trace!(target: TARGET, "Validator set for {:?} is empty", relay_parent);
trace!(target: LOG_TARGET, "Validator set for {:?} is empty", relay_parent);
return Ok(());
}
@@ -262,7 +278,7 @@ where
let validator = if let Some(validator) = validator_set.get(validator_index) {
validator.clone()
} else {
trace!(target: TARGET, "Could not find a validator for index {}", validator_index);
trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
return Ok(());
};
@@ -327,7 +343,7 @@ where
if interested_peers.is_empty() {
trace!(
target: TARGET,
target: LOG_TARGET,
"No peers are interested in gossip for relay parent {:?}",
message.relay_parent
);
@@ -370,7 +386,7 @@ where
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
trace!(
target: TARGET,
target: LOG_TARGET,
"Validator set for relay parent {:?} is empty",
&message.relay_parent
);
@@ -412,7 +428,7 @@ where
// only relay_message a message of a validator once
if one_per_validator.get(&validator).is_some() {
trace!(
target: TARGET,
target: LOG_TARGET,
"Already received a message for validator at index {}",
validator_index
);
@@ -458,7 +474,7 @@ where
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
trace!(target: TARGET, "Received bitfield gossip from peer {:?}", &remote);
trace!(target: LOG_TARGET, "Received bitfield gossip from peer {:?}", &remote);
let gossiped_bitfield = BitfieldGossipMessage {
relay_parent,
signed_availability: bitfield,
@@ -478,7 +494,7 @@ fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemRes
for added in state.view.difference(&old_view) {
if !state.per_relay_parent.contains_key(&added) {
warn!(
target: TARGET,
target: LOG_TARGET,
"Our view contains {} but the overseer never told use we should work on this",
&added
);
@@ -583,9 +599,7 @@ where
{
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = self.run(ctx)
.map_err(|e| {
SubsystemError::with_origin("bitfield-distribution", e)
})
.map(|_| Ok(()))
.boxed();
SpawnedSubsystem {
@@ -625,7 +639,7 @@ where
SigningContext { parent_hash: relay_parent, session_index: s },
))),
(Err(e), _) | (_, Err(e)) => {
warn!(target: TARGET, "Failed to fetch basics from runtime API: {:?}", e);
warn!(target: LOG_TARGET, "Failed to fetch basics from runtime API: {:?}", e);
Ok(None)
}
}