introduce errors with info (#1834)

This commit is contained in:
Bernhard Schuster
2020-10-27 08:10:03 +01:00
committed by GitHub
parent 40ea09389c
commit f345123748
58 changed files with 1983 additions and 2030 deletions
@@ -22,56 +22,99 @@
//! peers. Verified in this context means that the merkle proof contained in the
//! erasure chunk is checked.
#![deny(unused_crate_dependencies, unused_qualifications)]
use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use log::{trace, warn};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
PARACHAIN_KEY_TYPE_ID,
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk,
Hash as Hash, HashT, Id as ParaId,
ValidatorId, ValidatorIndex, SessionIndex,
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage,
RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage,
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, ReputationChange as Rep, PeerId,
NetworkBridgeEvent,
};
use std::collections::{HashMap, HashSet};
use std::io;
use std::iter;
use thiserror::Error;
/// Log target passed to the `log` macros for messages emitted by this subsystem.
const TARGET: &str = "avad";
#[derive(Debug, derive_more::From)]
#[derive(Debug, Error)]
enum Error {
#[from]
Erasure(polkadot_erasure_coding::Error),
#[from]
Io(io::Error),
#[from]
Oneshot(oneshot::Canceled),
#[from]
Subsystem(SubsystemError),
#[from]
RuntimeApi(RuntimeApiError),
#[from]
ChainApi(ChainApiError),
#[error("Sending PendingAvailability query failed")]
QueryPendingAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Sending StoreChunk query failed")]
StoreChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryChunk query failed")]
QueryChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryAncestors query failed")]
QueryAncestorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),
#[error("Sending QuerySession query failed")]
QuerySessionSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),
#[error("Sending QueryValidators query failed")]
QueryValidatorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
AvailabilityCoresSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
QueryAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("Sending out a peer report message")]
ReportPeerMessageSend(#[source] SubsystemError),
#[error("Sending a gossip message")]
TrackedGossipMessage(#[source] SubsystemError),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}
/// Shorthand for a `std::result::Result` whose error type is this module's `Error`.
type Result<T> = std::result::Result<T, Error>;
@@ -199,22 +242,18 @@ impl ProtocolState {
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let candidates =
query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?;
let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?;
// register the relation of relay_parent to candidate..
// ..and the reverse association.
for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() {
self
.reverse
self.reverse
.insert(receipt_hash.clone(), relay_parent_or_ancestor.clone());
let per_candidate = self.per_candidate.entry(receipt_hash.clone())
.or_default();
let per_candidate = self.per_candidate.entry(receipt_hash.clone()).or_default();
per_candidate.validator_index = validator_index.clone();
per_candidate.validators = validators.clone();
self
.receipts
self.receipts
.entry(relay_parent_or_ancestor)
.or_default()
.insert((receipt_hash, receipt));
@@ -240,8 +279,7 @@ impl ProtocolState {
.insert(relay_parent);
}
self
.per_relay_parent
self.per_relay_parent
.entry(relay_parent)
.or_default()
.ancestors = ancestors;
@@ -314,17 +352,21 @@ where
}
NetworkBridgeEvent::PeerMessage(remote, msg) => {
let gossiped_availability = match msg {
protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) =>
AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk }
protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) => {
AvailabilityGossipMessage {
candidate_hash,
erasure_chunk: chunk,
}
}
};
process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics).await?;
process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics)
.await?;
}
}
Ok(())
}
/// Handle the changes necessary when our view changes.
async fn handle_our_view_change<Context>(
ctx: &mut Context,
@@ -346,19 +388,15 @@ where
for added in added.iter() {
let added = **added;
let validators = query_validators(ctx, added).await?;
let validator_index = obtain_our_validator_index(
&validators,
keystore.clone(),
).await;
state.add_relay_parent(ctx, added, validators, validator_index).await?;
let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
state
.add_relay_parent(ctx, added, validators, validator_index)
.await?;
}
// handle all candidates
for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) {
let per_candidate = state
.per_candidate
.entry(candidate_hash)
.or_default();
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// assure the node has the validator role
if per_candidate.validator_index.is_none() {
@@ -388,19 +426,18 @@ where
// distribute all erasure messages to interested peers
for chunk_index in 0u32..(validator_count as u32) {
// only the peers which did not receive this particular erasure chunk
let per_candidate = state
.per_candidate
.entry(candidate_hash)
.or_default();
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the chunk from the cache; if it is not cached, fall back
// to querying the availability store
let message_id = (candidate_hash, chunk_index);
let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index)
{
message.erasure_chunk.clone()
} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
} else if let Some(erasure_chunk) =
query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await?
{
erasure_chunk
} else {
continue;
@@ -415,9 +452,7 @@ where
!per_candidate
.sent_messages
.get(*peer)
.filter(|set| {
set.contains(&message_id)
})
.filter(|set| set.contains(&message_id))
.is_some()
})
.map(|peer| peer.clone())
@@ -427,7 +462,8 @@ where
erasure_chunk,
};
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await?;
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message)
.await?;
}
}
@@ -450,7 +486,8 @@ async fn send_tracked_gossip_message_to_peers<Context>(
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message))
.await
}
#[inline(always)]
@@ -464,7 +501,8 @@ async fn send_tracked_gossip_messages_to_peer<Context>(
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter)
.await
}
async fn send_tracked_gossip_messages_to_peers<Context>(
@@ -478,7 +516,7 @@ where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
if peers.is_empty() {
return Ok(())
return Ok(());
}
for message in message_iter {
for peer in peers.iter() {
@@ -506,7 +544,7 @@ where
),
))
.await
.map_err::<Error, _>(Into::into)?;
.map_err(|e| Error::TrackedGossipMessage(e))?;
metrics.on_chunk_distributed();
}
@@ -543,8 +581,7 @@ where
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the relevant chunk indices not sent yet
let messages = ((0 as ValidatorIndex)
..(per_candidate.validators.len() as ValidatorIndex))
let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
.into_iter()
.filter_map(|erasure_chunk_index: ValidatorIndex| {
let message_id = (candidate_hash, erasure_chunk_index);
@@ -567,7 +604,8 @@ where
.cloned()
.collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await?;
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages)
.await?;
}
Ok(())
}
@@ -580,8 +618,13 @@ async fn obtain_our_validator_index(
keystore: SyncCryptoStorePtr,
) -> Option<ValidatorIndex> {
for (idx, validator) in validators.iter().enumerate() {
if CryptoStore::has_keys(&*keystore, &[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)]).await {
return Some(idx as ValidatorIndex)
if CryptoStore::has_keys(
&*keystore,
&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)],
)
.await
{
return Some(idx as ValidatorIndex);
}
}
None
@@ -664,8 +707,13 @@ where
live_candidate.descriptor.relay_parent.clone(),
message.erasure_chunk.index,
message.erasure_chunk.clone(),
).await? {
warn!(target: TARGET, "Failed to store erasure chunk to availability store");
)
.await?
{
warn!(
target: TARGET,
"Failed to store erasure chunk to availability store"
);
}
}
}
@@ -729,7 +777,10 @@ impl AvailabilityDistributionSubsystem {
// work: process incoming messages from the overseer.
let mut state = ProtocolState::default();
loop {
let message = ctx.recv().await.map_err::<Error, _>(Into::into)?;
let message = ctx
.recv()
.await
.map_err(|e| Error::IncomingMessageChannel(e))?;
match message {
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
@@ -740,7 +791,9 @@ impl AvailabilityDistributionSubsystem {
&mut state,
&self.metrics,
event,
).await {
)
.await
{
warn!(
target: TARGET,
"Failed to handle incomming network messages: {:?}", e
@@ -767,9 +820,15 @@ where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
.map(|_| ())
.boxed();
SpawnedSubsystem {
name: "availability-distribution-subsystem",
future: Box::pin(async move { self.run(ctx) }.map(|_| ())),
future,
}
}
}
@@ -816,7 +875,6 @@ where
HashMap::<Hash, (Hash, CommittedCandidateReceipt)>::with_capacity(capacity);
for relay_parent in iter {
// register one of relay parents (not the ancestors)
let mut ancestors = query_up_to_k_ancestors_in_same_session(
ctx,
@@ -827,7 +885,6 @@ where
ancestors.push(relay_parent);
// ancestors might overlap, so check the cache too
let unknown = ancestors
.into_iter()
@@ -841,10 +898,7 @@ where
// directly extend the live_candidates with the cached value
live_candidates.extend(receipts.into_iter().map(
|(receipt_hash, receipt)| {
(
relay_parent,
(receipt_hash.clone(), receipt.clone()),
)
(relay_parent, (receipt_hash.clone(), receipt.clone()))
},
));
Some(())
@@ -877,10 +931,12 @@ where
RuntimeApiRequest::AvailabilityCores(tx),
)))
.await
.map_err::<Error, _>(Into::into)?;
.map_err(|e| Error::AvailabilityCoresSendQuery(e))?;
let all_para_ids: Vec<_> = rx
.await??;
.await
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
.map_err(|e| Error::AvailabilityCores(e))?;
let occupied_para_ids = all_para_ids
.into_iter()
@@ -910,14 +966,11 @@ where
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.map_err::<Error, _>(Into::into)
.map_err(|e| Error::ReportPeerMessageSend(e))
}
/// Query the availability store for the data availability of a particular candidate hash.
async fn query_data_availability<Context>(
ctx: &mut Context,
candidate_hash: Hash,
) -> Result<bool>
async fn query_data_availability<Context>(ctx: &mut Context, candidate_hash: Hash) -> Result<bool>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
@@ -925,11 +978,12 @@ where
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
))
.await?;
rx.await.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryAvailabilitySendQuery(e))?;
rx.await
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
async fn query_chunk<Context>(
ctx: &mut Context,
candidate_hash: Hash,
@@ -940,13 +994,13 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
))
.await?;
rx.await.map_err::<Error, _>(Into::into)
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
))
.await
.map_err(|e| Error::QueryChunkSendQuery(e))?;
rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
}
async fn store_chunk<Context>(
ctx: &mut Context,
candidate_hash: Hash,
@@ -958,16 +1012,19 @@ where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await?;
rx.await.map_err::<Error, _>(Into::into)
ctx.send_message(
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await
.map_err(|e| Error::StoreChunkSendQuery(e))?;
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}
/// Request the candidate pending availability for a particular para.
@@ -981,12 +1038,15 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
)))
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
)))
.await
.map_err(|e| Error::QueryPendingAvailabilitySendQuery(e))?;
rx.await
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
.map_err(|e| Error::QueryPendingAvailability(e))
}
/// Query the validator set.
@@ -1004,9 +1064,11 @@ where
));
ctx.send_message(query_validators)
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryValidatorsSendQuery(e))?;
rx.await
.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
.map_err(|e| Error::QueryValidators(e))
}
/// Query the hash of the `K` ancestors
@@ -1026,9 +1088,11 @@ where
});
ctx.send_message(query_ancestors)
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryAncestorsSendQuery(e))?;
rx.await
.map_err(|e| Error::QueryAncestorsResponseChannel(e))?
.map_err(|e| Error::QueryAncestors(e))
}
/// Query the session index of a relay parent
@@ -1046,9 +1110,11 @@ where
));
ctx.send_message(query_session_idx_for_child)
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QuerySessionSendQuery(e))?;
rx.await
.map_err(|e| Error::QuerySessionResponseChannel(e))?
.map_err(|e| Error::QuerySession(e))
}
/// Queries up to k ancestors, with the constraint that they belong to the same session
@@ -1089,7 +1155,6 @@ where
Ok(acc)
}
#[derive(Clone)]
struct MetricsInner {
gossipped_availability_chunks: prometheus::Counter<prometheus::U64>,
@@ -1108,12 +1173,14 @@ impl Metrics {
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
gossipped_availability_chunks: prometheus::register(
prometheus::Counter::new(
"parachain_gossipped_availability_chunks_total",
"Number of availability chunks gossipped to other peers."
"Number of availability chunks gossipped to other peers.",
)?,
registry,
)?,
@@ -17,22 +17,21 @@
use super::*;
use assert_matches::assert_matches;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_network_protocol::ObservedRole;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
GroupRotationInfo, HeadData, PersistedValidationData, OccupiedCore,
PoV, ScheduledCore,
GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, PoV, ScheduledCore,
};
use polkadot_subsystem_testhelpers::{self as test_helpers};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_network_protocol::ObservedRole;
use futures::{executor, future, Future};
use futures_timer::Delay;
use smallvec::smallvec;
use std::{sync::Arc, time::Duration};
use sc_keystore::LocalKeystore;
use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore};
use smallvec::smallvec;
use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use std::{sync::Arc, time::Duration};
macro_rules! view {
( $( $hash:expr ),* $(,)? ) => [
@@ -46,9 +45,9 @@ macro_rules! delay {
};
}
fn chunk_protocol_message(message: AvailabilityGossipMessage)
-> protocol_v1::AvailabilityDistributionMessage
{
fn chunk_protocol_message(
message: AvailabilityGossipMessage,
) -> protocol_v1::AvailabilityDistributionMessage {
protocol_v1::AvailabilityDistributionMessage::Chunk(
message.candidate_hash,
message.erasure_chunk,
@@ -175,8 +174,12 @@ impl Default for TestState {
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some(&validators[0].to_seed()))
.expect("Insert key into keystore");
SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&validators[0].to_seed()),
)
.expect("Insert key into keystore");
let validator_public = validator_pubkeys(&validators);
@@ -867,10 +870,7 @@ fn reputation_verification() {
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
chunk_protocol_message(valid2),
),
NetworkBridgeEvent::PeerMessage(peer_a.clone(), chunk_protocol_message(valid2)),
),
)
.await;