Make sure we don't send messages multiple times in the availability distribution subsystem (#2015)

Bastian Köcher
2020-11-26 16:13:44 +01:00
committed by GitHub
parent 119eb34fa8
commit e761c99852
3 changed files with 145 additions and 60 deletions
@@ -52,6 +52,9 @@ use std::collections::{HashMap, HashSet};
 use std::iter;
 use thiserror::Error;
+#[cfg(test)]
+mod tests;
 const LOG_TARGET: &'static str = "availability_distribution";
 #[derive(Debug, Error)]
@@ -166,6 +169,14 @@ struct PerCandidate {
 	validator_index: Option<ValidatorIndex>,
 }
+impl PerCandidate {
+	/// Returns `true` iff the given `message` is required by the given `peer`.
+	fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool {
+		self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
+			&& self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
+	}
+}
 #[derive(Debug, Clone, Default)]
 struct PerRelayParent {
 	/// Set of `K` ancestors for this relay parent.
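For reference, a minimal, self-contained sketch of the deduplication check introduced here. The types below (`PeerId`, `CandidateHash`, `ValidatorIndex`) are simple stand-ins, not the real subsystem definitions:

use std::collections::{HashMap, HashSet};

// Stand-ins for the real subsystem types.
type PeerId = u32;
type CandidateHash = u64;
type ValidatorIndex = u32;
type MessageId = (CandidateHash, ValidatorIndex);

#[derive(Default)]
struct PerCandidate {
	// Chunk messages already received from a given peer.
	received_messages: HashMap<PeerId, HashSet<MessageId>>,
	// Chunk messages already sent to a given peer.
	sent_messages: HashMap<PeerId, HashSet<MessageId>>,
}

impl PerCandidate {
	// A peer still needs the message only if we neither received it from
	// them nor sent it to them before; unknown peers default to "required".
	fn message_required_by_peer(&self, peer: &PeerId, message: &MessageId) -> bool {
		self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
			&& self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
	}
}

fn main() {
	let mut state = PerCandidate::default();
	let (peer, msg) = (1, (0xdead_beef, 2));
	assert!(state.message_required_by_peer(&peer, &msg));

	// Once the chunk is recorded as sent, the same peer no longer needs it.
	state.sent_messages.entry(peer).or_default().insert(msg);
	assert!(!state.message_required_by_peer(&peer, &msg));
}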
@@ -364,24 +375,22 @@ where
 {
 	let _timer = metrics.time_handle_our_view_change();
-	let old_view = std::mem::replace(&mut (state.view), view);
+	let old_view = std::mem::replace(&mut state.view, view);
 	// needed due to borrow rules
 	let view = state.view.clone();
-	let added = view.difference(&old_view).collect::<Vec<&'_ Hash>>();
 	// add all the relay parents and fill the cache
-	for added in added.iter() {
-		let added = **added;
-		let validators = query_validators(ctx, added).await?;
+	for added in view.difference(&old_view) {
+		let validators = query_validators(ctx, *added).await?;
 		let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
 		state
-			.add_relay_parent(ctx, added, validators, validator_index)
+			.add_relay_parent(ctx, *added, validators, validator_index)
 			.await?;
 	}
 	// handle all candidates
-	for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) {
+	for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(view.difference(&old_view)) {
 		let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
 		// assure the node has the validator role
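The loop above now walks `view.difference(&old_view)` directly instead of first collecting the difference into a `Vec<&Hash>`. Roughly, with plain integers standing in for relay-parent hashes, the pattern looks like this:

use std::collections::HashSet;

fn main() {
	let old_view: HashSet<u32> = [1, 2, 3].iter().copied().collect();
	let view: HashSet<u32> = [2, 3, 4, 5].iter().copied().collect();

	// `difference` yields references, so `*added` copies the value out,
	// which is why the calls above pass `*added` rather than `added`.
	for added in view.difference(&old_view) {
		println!("newly added relay parent: {}", *added);
	}
}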
@@ -418,12 +427,9 @@ where
 		// obtain the chunks from the cache, if not fallback
 		// and query the availability store
 		let message_id = (candidate_hash, chunk_index);
-		let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index)
-		{
+		let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
 			message.erasure_chunk.clone()
-		} else if let Some(erasure_chunk) =
-			query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await?
-		{
+		} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
 			erasure_chunk
 		} else {
 			continue;
@@ -433,23 +439,15 @@ where
 		let peers = peers
 			.iter()
-			.filter(|peer| {
-				// only pick those which were not sent before
-				!per_candidate
-					.sent_messages
-					.get(*peer)
-					.filter(|set| set.contains(&message_id))
-					.is_some()
-			})
-			.map(|peer| peer.clone())
+			.filter(|peer| per_candidate.message_required_by_peer(peer, &message_id))
+			.cloned()
 			.collect::<Vec<_>>();
 		let message = AvailabilityGossipMessage {
 			candidate_hash,
 			erasure_chunk,
 		};
-		send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message)
-			.await;
+		send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await;
 	}
 }
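The `send_tracked_gossip_message_to_peers` call at the end is presumably what keeps `sent_messages` up to date, so the filter above stays accurate on later rounds. A rough, hypothetical sketch of that bookkeeping, with simplified types and no real networking:

use std::collections::{HashMap, HashSet};

type PeerId = u32;
type MessageId = (u64, u32);

// Hypothetical: "send" a message to each peer and remember it in
// `sent_messages`, so later duplicate checks skip these peers.
fn send_tracked(
	sent_messages: &mut HashMap<PeerId, HashSet<MessageId>>,
	peers: &[PeerId],
	message_id: MessageId,
) {
	for peer in peers {
		// The real code would hand the chunk to the network layer here.
		sent_messages.entry(*peer).or_default().insert(message_id);
	}
}

fn main() {
	let mut sent = HashMap::new();
	send_tracked(&mut sent, &[1, 2], (42, 0));
	assert!(sent[&1].contains(&(42, 0)));
}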
@@ -472,8 +470,7 @@ async fn send_tracked_gossip_message_to_peers<Context>(
 where
 	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
 {
-	send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message))
-		.await
+	send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await
 }
 #[inline(always)]
@@ -487,8 +484,7 @@ async fn send_tracked_gossip_messages_to_peer<Context>(
 where
 	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
 {
-	send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter)
-		.await
+	send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await
 }
 #[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))]
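Both wrappers above just delegate to the general many-messages/many-peers function, lifting the single item with `std::iter::once` or a one-element `vec!`. A toy version of that delegation pattern (names are illustrative only):

use std::iter;

fn send_many<I: IntoIterator<Item = String>>(peers: Vec<u32>, messages: I) {
	for message in messages {
		for peer in &peers {
			println!("sending {:?} to peer {}", message, peer);
		}
	}
}

// Single-message and single-peer conveniences built on the general function,
// mirroring how the subsystem's wrappers defer to the plural variant.
fn send_one_message(peers: Vec<u32>, message: String) {
	send_many(peers, iter::once(message))
}

fn send_to_one_peer<I: IntoIterator<Item = String>>(peer: u32, messages: I) {
	send_many(vec![peer], messages)
}

fn main() {
	send_one_message(vec![1, 2], "chunk A".to_string());
	send_to_one_peer(3, iter::once("chunk B".to_string()));
}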
@@ -576,21 +572,12 @@ where
 			per_candidate
 				.message_vault
 				.get(&erasure_chunk_index)
-				.filter(|_| {
-					// check if that erasure chunk was already sent before
-					if let Some(sent_set) = per_candidate.sent_messages.get(&origin) {
-						if sent_set.contains(&message_id) {
-							return false;
-						}
-					}
-					true
-				})
+				.filter(|_| per_candidate.message_required_by_peer(&origin, &message_id))
 		})
 		.cloned()
 		.collect::<HashSet<_>>();
-	send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages)
-		.await;
+	send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await;
 	}
 }
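Compared with the removed closure, which only consulted `sent_messages`, the helper also checks `received_messages`, so (assuming the incoming message was recorded there earlier in this handler) a chunk is no longer gossiped straight back to the peer it arrived from. A small self-contained illustration of that difference, with simplified types:

use std::collections::{HashMap, HashSet};

type PeerId = u32;
type MessageId = (u64, u32);
type SeenMap = HashMap<PeerId, HashSet<MessageId>>;

// The per-map check used by both the old and the new logic.
fn required(seen: &SeenMap, peer: &PeerId, message: &MessageId) -> bool {
	seen.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
}

fn main() {
	let origin: PeerId = 7;
	let message_id: MessageId = (1, 0);

	// Pretend `origin` just gossiped this chunk to us.
	let mut received: SeenMap = HashMap::new();
	received.entry(origin).or_default().insert(message_id);
	let sent: SeenMap = HashMap::new();

	// Old check (sent only): the chunk would still be forwarded back to `origin`.
	assert!(required(&sent, &origin, &message_id));
	// Combined check, as in `message_required_by_peer`: it is not.
	assert!(!(required(&sent, &origin, &message_id) && required(&received, &origin, &message_id)));
}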
@@ -727,15 +714,7 @@ where
 	let peers = peers
 		.into_iter()
-		.filter(|peer| {
-			let peer: PeerId = peer.clone();
-			// avoid sending duplicate messages
-			per_candidate
-				.sent_messages
-				.entry(peer)
-				.or_default()
-				.contains(&message_id)
-		})
+		.filter(|peer| per_candidate.message_required_by_peer(peer, &message_id))
 		.collect::<Vec<_>>();
 	// gossip that message to interested peers
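This hunk appears to be the duplicate-send bug the commit title refers to: the removed predicate had no negation, so `.contains(&message_id)` selected exactly the peers that had already been sent the chunk, while peers that still needed it were filtered out. A tiny self-contained contrast of the two predicates, with simplified types:

use std::collections::{HashMap, HashSet};

fn main() {
	// Two peers; the chunk was already sent to peer 1.
	let mut sent: HashMap<u32, HashSet<(u64, u32)>> = HashMap::new();
	let message_id = (9, 0);
	sent.entry(1).or_default().insert(message_id);
	let peers = vec![1u32, 2u32];

	// Removed logic (no negation): picks peer 1, the one that already has it.
	let duplicates: Vec<_> = peers
		.iter()
		.filter(|p| sent.get(*p).map(|s| s.contains(&message_id)).unwrap_or(false))
		.collect();
	assert_eq!(duplicates, vec![&1u32]);

	// Fixed logic: picks peer 2, the one that still needs it.
	let needed: Vec<_> = peers
		.iter()
		.filter(|p| sent.get(*p).map(|s| !s.contains(&message_id)).unwrap_or(true))
		.collect();
	assert_eq!(needed, vec![&2u32]);
}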
@@ -1210,6 +1189,3 @@ impl metrics::Metrics for Metrics {
 		Ok(Metrics(Some(metrics)))
 	}
 }
-#[cfg(test)]
-mod tests;