Additional logging for polkadot network protocols (#2684)

* Additional logging for polkadot network protocols

* Additional log

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>

* Update node/network/availability-distribution/src/responder.rs

* Added additional chunk info

* Added additional peer info

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
This commit is contained in:
Arkadiy Paronyan
2021-03-24 12:55:50 +01:00
committed by GitHub
parent 1a3e67f41b
commit 5929d1ef15
9 changed files with 509 additions and 31 deletions
@@ -150,11 +150,22 @@ impl State {
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, _role) => {
NetworkBridgeEvent::PeerConnected(peer_id, role) => {
// insert a blank view if none already present
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?role,
"Peer connected",
);
self.peer_views.entry(peer_id).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
"Peer disconnected",
);
self.peer_views.remove(&peer_id);
self.blocks.iter_mut().for_each(|(_hash, entry)| {
entry.known_by.remove(&peer_id);
@@ -164,6 +175,11 @@ impl State {
self.handle_peer_view_change(ctx, peer_id, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Own view change",
);
for head in view.iter() {
if !self.blocks.contains_key(head) {
self.pending_known.entry(*head).or_default();
@@ -329,6 +345,11 @@ impl State {
peer_id: PeerId,
view: View,
) {
tracing::trace!(
target: LOG_TARGET,
?view,
"Peer view change",
);
Self::unify_with_peer(&mut self.blocks, ctx, peer_id.clone(), view.clone()).await;
let finalized_number = view.finalized_number;
let old_view = self.peer_views.insert(peer_id.clone(), view);
@@ -469,7 +490,7 @@ impl State {
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
tracing::info!(
target: LOG_TARGET,
peer = ?peer_id,
?peer_id,
"Got a bad assignment from peer",
);
return;
@@ -635,7 +656,7 @@ impl State {
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
tracing::info!(
target: LOG_TARGET,
peer = ?peer_id,
?peer_id,
"Got a bad approval from peer",
);
return;
@@ -705,7 +726,7 @@ impl State {
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending approval (block={}, index={})to {} peers",
"Sending approval (block={}, index={}) to {} peers",
block_hash,
candidate_index,
peers.len(),
@@ -881,7 +902,6 @@ impl ApprovalDistribution {
FromOverseer::Communication {
msg: ApprovalDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
tracing::debug!(target: LOG_TARGET, "Processing network message");
state.handle_network_msg(&mut ctx, &self.metrics, event).await;
}
FromOverseer::Communication {
@@ -101,6 +101,11 @@ impl Requester {
where
Context: SubsystemContext,
{
tracing::trace!(
target: LOG_TARGET,
?update,
"Update fetching heads"
);
let ActiveLeavesUpdate {
activated,
deactivated,
@@ -126,6 +131,11 @@ impl Requester {
Err(err) => return Ok(Some(err)),
Ok(cores) => cores,
};
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
if let Some(err) = self.add_cores(ctx, leaf, cores).await? {
return Ok(Some(err));
}
@@ -74,6 +74,15 @@ where
let result = chunk.is_some();
tracing::trace!(
target: LOG_TARGET,
hash = ?req.payload.candidate_hash,
index = ?req.payload.index,
peer = ?req.peer,
has_data = ?chunk.is_some(),
"Serving chunk",
);
let response = match chunk {
None => v1::AvailabilityFetchingResponse::NoSuchChunk,
Some(chunk) => v1::AvailabilityFetchingResponse::Chunk(chunk.into()),
@@ -99,5 +108,14 @@ where
))
.await;
rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
rx.await.map_err(|e| {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
?candidate_hash,
error = ?e,
"Error retrieving chunk",
);
Error::QueryChunkResponseChannel(e)
})
}
@@ -228,6 +228,12 @@ impl RequestFromBackersPhase {
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>
) -> Result<bool, mpsc::SendError> {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Requesting from backers",
);
loop {
// Pop the next backer, and proceed to next phase if we're out.
let validator_index = match self.shuffled_backers.pop() {
@@ -252,8 +258,19 @@ impl RequestFromBackersPhase {
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
).await?;
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Received full data",
);
return Ok(true);
} else {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator = ?peer_id,
"Invalid data response",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_INVALID_AVAILABLE_DATA,
@@ -264,12 +281,14 @@ impl RequestFromBackersPhase {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
"A response channel was cancelled while waiting for full data",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
"A full data request has timed out",
);
}
@@ -298,7 +317,13 @@ impl RequestChunksPhase {
while self.requesting_chunks.len() < N_PARALLEL {
if let Some(validator_index) = self.shuffling.pop() {
let (tx, rx) = oneshot::channel();
tracing::trace!(
target: LOG_TARGET,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
?validator_index,
candidate_hash = ?params.candidate_hash,
"Requesting chunk",
);
to_state.send(FromInteraction::MakeChunkRequest(
params.validator_authority_keys[validator_index.0 as usize].clone(),
params.candidate_hash.clone(),
@@ -335,6 +360,13 @@ impl RequestChunksPhase {
// We need to check that the validator index matches the chunk index and
// not blindly trust the data from an untrusted peer.
if validator_index != chunk.index {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
chunk_index = ?chunk.index,
"Index mismatch",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
@@ -352,14 +384,32 @@ impl RequestChunksPhase {
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Merkle proof mismatch",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
} else {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Received valid Merkle proof",
);
self.received_chunks.insert(validator_index, chunk);
}
} else {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Invalid Merkle proof",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
@@ -397,6 +447,15 @@ impl RequestChunksPhase {
self.shuffling.len(),
params.threshold,
) {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
received = %self.received_chunks.len(),
requesting = %self.requesting_chunks.len(),
n_validators = %self.shuffling.len(),
"Data recovery is not possible",
);
to_state.send(FromInteraction::Concluded(
params.candidate_hash,
Err(RecoveryError::Unavailable),
@@ -419,18 +478,39 @@ impl RequestChunksPhase {
) {
Ok(data) => {
if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery complete",
);
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
} else {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery - root mismatch",
);
FromInteraction::Concluded(
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
}
}
Err(_) => FromInteraction::Concluded(
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
),
Err(err) => {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
?err,
"Data recovery error ",
);
FromInteraction::Concluded(
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
},
};
to_state.send(concluded).await?;
@@ -871,6 +951,11 @@ async fn handle_network_update(
protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
match state.live_requests.remove(&request_id) {
None => {
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected chunk response",
);
// If there doesn't exist one, report the peer and return.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
@@ -898,6 +983,11 @@ async fn handle_network_update(
}
}
Some(a) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected chunk response",
);
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
state.live_requests.insert(request_id, a);
@@ -940,6 +1030,11 @@ async fn handle_network_update(
match state.live_requests.remove(&request_id) {
None => {
// If there doesn't exist one, report the peer and return.
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected full data response",
);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
@@ -965,6 +1060,11 @@ async fn handle_network_update(
Some(a) => {
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected full data response",
);
state.live_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
@@ -1049,6 +1149,12 @@ async fn handle_validator_connected(
authority_id: AuthorityDiscoveryId,
peer_id: PeerId,
) -> error::Result<()> {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?authority_id,
"Validator connected",
);
if let Some(discovering) = state.discovering_validators.remove(&authority_id) {
for awaited in discovering {
issue_request(state, ctx, peer_id.clone(), awaited).await?;
@@ -169,7 +169,11 @@ impl BitfieldDistribution {
FromOverseer::Communication {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => {
tracing::trace!(target: LOG_TARGET, "Processing DistributeBitfield");
tracing::trace!(
target: LOG_TARGET,
?hash,
"Processing DistributeBitfield"
);
handle_bitfield_distribution(
&mut ctx,
&mut state,
@@ -235,7 +239,7 @@ async fn modify_reputation<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
tracing::trace!(target: LOG_TARGET, rep = ?rep, peer_id = %peer, "reputation change");
tracing::trace!(target: LOG_TARGET, ?rep, peer_id = %peer, "reputation change");
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
@@ -410,6 +414,7 @@ where
tracing::trace!(
target: LOG_TARGET,
relay_parent = %message.relay_parent,
?origin,
"Validator set is empty",
);
modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
@@ -438,6 +443,12 @@ where
if !received_set.contains(&validator) {
received_set.insert(validator.clone());
} else {
tracing::trace!(
target: LOG_TARGET,
validator_index,
?origin,
"Duplicate message",
);
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return;
};
@@ -485,24 +496,51 @@ where
let _timer = metrics.time_handle_network_msg();
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, _role) => {
NetworkBridgeEvent::PeerConnected(peerid, role) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
?role,
"Peer connected",
);
// insert if none already present
state.peer_views.entry(peerid).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peerid) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
"Peer disconnected",
);
// get rid of superfluous data
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
?view,
"Peer view change",
);
handle_peer_view_change(ctx, state, peerid, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Our view change",
);
handle_our_view_change(state, view);
}
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer");
tracing::trace!(
target: LOG_TARGET,
peer_id = %remote,
?relay_parent,
"received bitfield gossip from peer"
);
let gossiped_bitfield = BitfieldGossipMessage {
relay_parent,
signed_availability: bitfield,
@@ -601,6 +639,13 @@ where
};
let _span = job_data.span.child("gossip");
tracing::trace!(
target: LOG_TARGET,
?dest,
?validator,
relay_parent = ?message.relay_parent,
"Sending gossip message"
);
job_data.message_sent_to_peer
.entry(dest.clone())
@@ -320,6 +320,17 @@ async fn distribute_collation(
return Ok(());
}
tracing::debug!(
target: LOG_TARGET,
para_id = %id,
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
?next_validators,
"Accepted collation, connecting to validators."
);
// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(
ctx,
@@ -626,11 +637,20 @@ async fn send_collation(
pov: PoV,
) {
let pov = match CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Ok(compressed) => {
tracing::trace!(
target: LOG_TARGET,
size = %pov.block_data.0.len(),
compressed = %compressed.len(),
peer_id = ?request.peer,
"Sending collation."
);
compressed
},
Err(error) => {
tracing::error!(
target: LOG_TARGET,
error = ?error,
?error,
"Failed to create `CompressedPov`",
);
return
@@ -659,12 +679,14 @@ async fn handle_incoming_peer_message(
Declare(_) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
"Declare message is not expected on the collator side of the protocol",
);
}
AdvertiseCollation(_, _) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
}
@@ -672,10 +694,17 @@ async fn handle_incoming_peer_message(
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::warn!(
target: LOG_TARGET,
statement = ?statement,
?statement,
?origin,
"Collation seconded message received with none-seconded statement.",
);
} else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) {
tracing::trace!(
target: LOG_TARGET,
?statement,
?origin,
"received a `CollationSeconded`",
);
let _ = sender.send(statement);
}
}
@@ -744,18 +773,40 @@ async fn handle_network_msg(
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(_peer_id, _observed_role) => {
PeerConnected(peer_id, observed_role) => {
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?observed_role,
"Peer connected",
);
}
PeerViewChange(peer_id, view) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?view,
"Peer view change",
);
handle_peer_view_change(ctx, state, peer_id, view).await;
}
PeerDisconnected(peer_id) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
"Peer disconnected",
);
state.peer_views.remove(&peer_id);
state.declared_at.remove(&peer_id);
}
OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Own view change",
);
handle_our_view_change(state, view).await?;
}
PeerMessage(remote, msg) => {
@@ -1078,7 +1129,7 @@ mod tests {
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
msg: CollatorProtocolMessage,
) {
tracing::trace!(msg = ?msg, "sending message");
tracing::trace!(?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
@@ -1093,7 +1144,7 @@ mod tests {
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
tracing::trace!(msg = ?msg, "received message");
tracing::trace!(?msg, "received message");
msg
}
@@ -230,6 +230,13 @@ async fn notify_all_we_are_awaiting(
let payload = awaiting_message(relay_parent, vec![pov_hash]);
tracing::trace!(
target: LOG_TARGET,
peers = ?peers_to_send,
?relay_parent,
?pov_hash,
"Sending awaiting message",
);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
@@ -255,6 +262,13 @@ async fn notify_one_we_are_awaiting_many(
return;
}
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?awaiting_hashes,
"Sending awaiting message",
);
let payload = awaiting_message(relay_parent, awaiting_hashes);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
@@ -286,6 +300,13 @@ async fn distribute_to_awaiting(
}))
.collect();
tracing::trace!(
target: LOG_TARGET,
peers = ?peers_to_send,
?relay_parent,
?pov_hash,
"Sending PoV message",
);
if peers_to_send.is_empty() { return; }
let payload = send_pov_message(relay_parent, pov_hash, pov);
@@ -313,7 +334,12 @@ async fn connect_to_relevant_validators(
// so here we take this shortcut to avoid calling `connect_to_validators`
// more than once.
if !connection_requests.contains_request(&relay_parent, para_id) {
tracing::debug!(target: LOG_TARGET, validators=?relevant_validators, "connecting to validators");
tracing::debug!(
target: LOG_TARGET,
validators=?relevant_validators,
?relay_parent,
"connecting to validators"
);
match validator_discovery::connect_to_validators(
ctx,
relay_parent,
@@ -441,6 +467,7 @@ async fn handle_fetch(
if relay_parent_state.fetching.len() > 2 * relay_parent_state.n_validators {
tracing::warn!(
target = LOG_TARGET,
relay_parent_state.fetching.len = relay_parent_state.fetching.len(),
"other subsystems have requested PoV distribution to fetch more PoVs than reasonably expected",
);
@@ -528,13 +555,32 @@ async fn handle_awaiting(
pov_hashes: Vec<Hash>,
) {
if !state.our_view.contains(&relay_parent) {
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hashes,
"Received awaiting message for unknown block",
);
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await;
return;
}
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hashes,
"Received awaiting message",
);
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
tracing::warn!("PoV Distribution relay parent state out-of-sync with our view");
tracing::warn!(
target: LOG_TARGET,
?peer,
?relay_parent,
"PoV Distribution relay parent state out-of-sync with our view"
);
return;
}
Some(s) => s,
@@ -556,6 +602,13 @@ async fn handle_awaiting(
// For all requested PoV hashes, if we have it, we complete the request immediately.
// Otherwise, we note that the peer is awaiting the PoV.
if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) {
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hash,
"Sending awaited PoV message",
);
let payload = send_pov_message(relay_parent, pov_hash, pov);
ctx.send_message(AllMessages::NetworkBridge(
@@ -566,6 +619,12 @@ async fn handle_awaiting(
}
}
} else {
tracing::debug!(
target: LOG_TARGET,
?peer,
?relay_parent,
"Too many PoV requests",
);
report_peer(ctx, peer, COST_APPARENT_FLOOD).await;
}
}
@@ -584,6 +643,13 @@ async fn handle_incoming_pov(
) {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hash,
"Unexpected PoV",
);
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
},
@@ -606,6 +672,13 @@ async fn handle_incoming_pov(
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
None => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hash,
"Unexpected PoV",
);
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
}
@@ -614,6 +687,14 @@ async fn handle_incoming_pov(
let hash = pov.hash();
if hash != pov_hash {
tracing::debug!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hash,
?hash,
"Mismatched PoV",
);
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
}
@@ -636,6 +717,13 @@ async fn handle_incoming_pov(
pov
};
tracing::debug!(
target: LOG_TARGET,
?peer,
?relay_parent,
?pov_hash,
"Received PoV",
);
// make sure we don't consider this peer as awaiting that PoV anymore.
if let Some(peer_state) = state.peer_state.get_mut(&peer) {
peer_state.awaited.remove(&pov_hash);
@@ -669,13 +757,30 @@ async fn handle_network_update(
let _timer = state.metrics.time_handle_network_update();
match update {
NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
NetworkBridgeEvent::PeerConnected(peer, role) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?role,
"Peer connected",
);
handle_validator_connected(state, peer);
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
"Peer disconnected",
);
state.peer_state.remove(&peer);
}
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?view,
"Peer view change",
);
if let Some(peer_state) = state.peer_state.get_mut(&peer_id) {
// prune anything not in the new view.
peer_state.awaited.retain(|relay_parent, _| view.contains(&relay_parent));
@@ -719,6 +824,10 @@ async fn handle_network_update(
}
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
"Own view change",
);
state.our_view = view;
}
}
@@ -109,8 +109,12 @@ impl VcPerPeerTracker {
/// based on a message that we have sent it from our local pool.
fn note_local(&mut self, h: CandidateHash) {
if !note_hash(&mut self.local_observed, h) {
tracing::warn!("Statement distribution is erroneously attempting to distribute more \
than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD);
tracing::warn!(
target: LOG_TARGET,
"Statement distribution is erroneously attempting to distribute more \
than {} candidate(s) per validator index. Ignoring",
VC_THRESHOLD,
);
}
}
@@ -440,6 +444,12 @@ impl ActiveHeadData {
CompactStatement::Seconded(h) => {
let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0);
if *seconded_so_far >= VC_THRESHOLD {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?stored.statement,
"Extra statement is ignored"
);
return NotedStatement::NotUseful;
}
@@ -447,23 +457,55 @@ impl ActiveHeadData {
if self.statements.insert(stored) {
*seconded_so_far += 1;
tracing::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?self.statements.last().expect("Just inserted").statement,
"Noted new statement"
);
// This will always return `Some` because it was just inserted.
NotedStatement::Fresh(self.statements.get(&comparator)
.expect("Statement was just inserted; qed"))
} else {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?self.statements.get(&comparator)
.expect("Existence was just checked; qed").statement,
"Known statement"
);
NotedStatement::UsefulButKnown
}
}
CompactStatement::Valid(h) => {
if !self.candidates.contains(&h) {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?stored.statement,
"Statement for unknown candidate"
);
return NotedStatement::NotUseful;
}
if self.statements.insert(stored) {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?self.statements.last().expect("Just inserted").statement,
"Noted new statement"
);
// This will always return `Some` because it was just inserted.
NotedStatement::Fresh(self.statements.get(&comparator)
.expect("Statement was just inserted; qed"))
} else {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?self.statements.get(&comparator)
.expect("Existence was just checked; qed").statement,
"Known statement"
);
NotedStatement::UsefulButKnown
}
}
@@ -525,10 +567,13 @@ async fn circulate_statement_and_dependents(
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
NotedStatement::Fresh(stored) =>
{
Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
))
},
_ => None,
}
};
@@ -586,6 +631,13 @@ async fn circulate_statement(
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
let payload = statement_message(relay_parent, stored.statement.clone());
tracing::trace!(
target: LOG_TARGET,
?peers_to_send,
?relay_parent,
statement = ?stored.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send.keys().cloned().collect(),
payload,
@@ -617,6 +669,14 @@ async fn send_statements_about(
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?candidate_hash,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
@@ -643,6 +703,13 @@ async fn send_statements(
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
@@ -682,6 +749,12 @@ async fn handle_incoming_message<'a>(
};
if !our_view.contains(&relay_parent) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?statement,
"Unexpected statement"
);
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
return None;
}
@@ -692,6 +765,7 @@ async fn handle_incoming_message<'a>(
// This should never be out-of-sync with our view if the view updates
// correspond to actual `StartWork` messages. So we just log and ignore.
tracing::warn!(
target: LOG_TARGET,
requested_relay_parent = %relay_parent,
"our view out-of-sync with active heads; head not found",
);
@@ -706,6 +780,12 @@ async fn handle_incoming_message<'a>(
// check the signature on the statement.
if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?statement,
"Invalid statement signature"
);
report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
return None;
}
@@ -718,10 +798,23 @@ async fn handle_incoming_message<'a>(
let max_message_count = active_head.validators.len() * 2;
match peer_data.receive(&relay_parent, &fingerprint, max_message_count) {
Err(rep) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?statement,
?rep,
"Error inserting received statement"
);
report_peer(ctx, peer, rep).await;
return None;
}
Ok(true) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?statement,
"Statement accepted"
);
// Send the peer all statements concerning the candidate that we have,
// since it appears to have just learned about the candidate.
send_statements_about(
@@ -808,13 +901,24 @@ async fn handle_network_update(
metrics: &Metrics,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
NetworkBridgeEvent::PeerConnected(peer, role) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?role,
"Peer connected",
);
peers.insert(peer, PeerData {
view: Default::default(),
view_knowledge: Default::default(),
});
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
"Peer disconnected",
);
peers.remove(&peer);
}
NetworkBridgeEvent::PeerMessage(peer, message) => {
@@ -851,6 +955,12 @@ async fn handle_network_update(
}
}
NetworkBridgeEvent::PeerViewChange(peer, view) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?view,
"Peer view change",
);
match peers.get_mut(&peer) {
Some(data) => {
update_peer_view_and_send_unlocked(
@@ -866,6 +976,10 @@ async fn handle_network_update(
}
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
"Own view change",
);
let old_view = std::mem::replace(our_view, view);
active_heads.retain(|head, _| our_view.contains(head));
+5
View File
@@ -515,6 +515,11 @@ impl CompressedPoV {
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}
/// Get compressed data size.
pub fn len(&self) -> usize {
self.0.len()
}
}
#[cfg(feature = "std")]