From e761c99852e514dc01d55276d2aced49a4c1e2f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 26 Nov 2020 16:13:44 +0100 Subject: [PATCH] Make sure we don't send messages multiple times in the availability distribution subsystem (#2015) --- .../availability-distribution/src/lib.rs | 76 ++++------- .../availability-distribution/src/tests.rs | 125 +++++++++++++++++- .../network/statement-distribution/src/lib.rs | 4 +- 3 files changed, 145 insertions(+), 60 deletions(-) diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 53b98826d0..493d5bfbfc 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -52,6 +52,9 @@ use std::collections::{HashMap, HashSet}; use std::iter; use thiserror::Error; +#[cfg(test)] +mod tests; + const LOG_TARGET: &'static str = "availability_distribution"; #[derive(Debug, Error)] @@ -166,6 +169,14 @@ struct PerCandidate { validator_index: Option, } +impl PerCandidate { + /// Returns `true` iff the given `message` is required by the given `peer`. + fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool { + self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) + && self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true) + } +} + #[derive(Debug, Clone, Default)] struct PerRelayParent { /// Set of `K` ancestors for this relay parent. @@ -364,24 +375,22 @@ where { let _timer = metrics.time_handle_our_view_change(); - let old_view = std::mem::replace(&mut (state.view), view); + let old_view = std::mem::replace(&mut state.view, view); // needed due to borrow rules let view = state.view.clone(); - let added = view.difference(&old_view).collect::>(); // add all the relay parents and fill the cache - for added in added.iter() { - let added = **added; - let validators = query_validators(ctx, added).await?; + for added in view.difference(&old_view) { + let validators = query_validators(ctx, *added).await?; let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await; state - .add_relay_parent(ctx, added, validators, validator_index) + .add_relay_parent(ctx, *added, validators, validator_index) .await?; } // handle all candidates - for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) { + for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(view.difference(&old_view)) { let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); // assure the node has the validator role @@ -418,12 +427,9 @@ where // obtain the chunks from the cache, if not fallback // and query the availability store let message_id = (candidate_hash, chunk_index); - let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) - { + let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { message.erasure_chunk.clone() - } else if let Some(erasure_chunk) = - query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? - { + } else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? { erasure_chunk } else { continue; @@ -433,23 +439,15 @@ where let peers = peers .iter() - .filter(|peer| { - // only pick those which were not sent before - !per_candidate - .sent_messages - .get(*peer) - .filter(|set| set.contains(&message_id)) - .is_some() - }) - .map(|peer| peer.clone()) + .filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) + .cloned() .collect::>(); let message = AvailabilityGossipMessage { candidate_hash, erasure_chunk, }; - send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message) - .await; + send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; } } @@ -472,8 +470,7 @@ async fn send_tracked_gossip_message_to_peers( where Context: SubsystemContext, { - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)) - .await + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await } #[inline(always)] @@ -487,8 +484,7 @@ async fn send_tracked_gossip_messages_to_peer( where Context: SubsystemContext, { - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter) - .await + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await } #[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))] @@ -576,21 +572,12 @@ where per_candidate .message_vault .get(&erasure_chunk_index) - .filter(|_| { - // check if that erasure chunk was already sent before - if let Some(sent_set) = per_candidate.sent_messages.get(&origin) { - if sent_set.contains(&message_id) { - return false; - } - } - true - }) + .filter(|_| per_candidate.message_required_by_peer(&origin, &message_id)) }) .cloned() .collect::>(); - send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages) - .await; + send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await; } } @@ -727,15 +714,7 @@ where let peers = peers .into_iter() - .filter(|peer| { - let peer: PeerId = peer.clone(); - // avoid sending duplicate messages - per_candidate - .sent_messages - .entry(peer) - .or_default() - .contains(&message_id) - }) + .filter(|peer| per_candidate.message_required_by_peer(peer, &message_id)) .collect::>(); // gossip that message to interested peers @@ -1210,6 +1189,3 @@ impl metrics::Metrics for Metrics { Ok(Metrics(Some(metrics))) } } - -#[cfg(test)] -mod tests; diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index c4212521a6..a7309043b0 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -314,7 +314,7 @@ fn helper_integrity() { let candidate = TestCandidateBuilder { para_id: test_state.chain_ids[0], relay_parent: test_state.relay_parent, - pov_hash: pov_hash, + pov_hash, erasure_root: make_erasure_root(&test_state, pov_block.clone()), ..Default::default() } @@ -395,7 +395,7 @@ fn reputation_verification() { } .build(), TestCandidateBuilder { - para_id: test_state.chain_ids[0], + para_id: test_state.chain_ids[1], relay_parent: test_state.relay_parent, pov_hash: pov_hash_b, erasure_root: make_erasure_root(&test_state, pov_block_b.clone()), @@ -643,10 +643,8 @@ fn reputation_verification() { ) ) => { let index = candidates2.iter().enumerate().find(|x| { x.1.hash() == candidate_hash }).map(|x| x.0).unwrap(); - expected = dbg!(candidates2.swap_remove(index).hash()); - tx.send( - i == 0 - ).unwrap(); + expected = candidates2.swap_remove(index).hash(); + tx.send(i == 0).unwrap(); } ); @@ -762,6 +760,23 @@ fn reputation_verification() { ) .await; + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::AvailabilityDistribution( + protocol_v1::AvailabilityDistributionMessage::Chunk(hash, chunk), + ), + ) + ) => { + assert_eq!(1, peers.len()); + assert_eq!(peers[0], peer_a); + assert_eq!(candidates[0].hash(), hash); + assert_eq!(valid.erasure_chunk, chunk); + } + ); + assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::NetworkBridge( @@ -861,7 +876,7 @@ fn reputation_verification() { { // send another message - let valid2: AvailabilityGossipMessage = make_valid_availability_gossip( + let valid2 = make_valid_availability_gossip( &test_state, candidates[2].hash(), 1, @@ -890,6 +905,102 @@ fn reputation_verification() { } ); } + + { + // send another message + let valid = make_valid_availability_gossip( + &test_state, + candidates[1].hash(), + 2, + pov_block_b.clone(), + ); + + // Make peer a and b listen on `current` + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), + ), + ) + .await; + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current]), + ), + ) + .await; + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + chunk_protocol_message(valid.clone()), + ), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + peers, + protocol_v1::ValidationProtocol::AvailabilityDistribution( + protocol_v1::AvailabilityDistributionMessage::Chunk(hash, chunk), + ), + ) + ) => { + assert_eq!(1, peers.len()); + assert_eq!(peers[0], peer_b); + assert_eq!(candidates[1].hash(), hash); + assert_eq!(valid.erasure_chunk, chunk); + } + ); + + // Let B send the same message + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + chunk_protocol_message(valid.clone()), + ), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, BENEFIT_VALID_MESSAGE); + } + ); + + // There shouldn't be any other message. + assert!(virtual_overseer.recv().timeout(TIMEOUT).await.is_none()); + } }); } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 04517477a3..5e6a58f948 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -30,9 +30,7 @@ use polkadot_subsystem::{ RuntimeApiMessage, RuntimeApiRequest, }, }; -use polkadot_node_subsystem_util::{ - metrics::{self, prometheus}, -}; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; use node_primitives::SignedFullStatement; use polkadot_primitives::v1::{ Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash,