diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index c2ea6aa3ce..d145a10a08 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6451,6 +6451,8 @@ dependencies = [ "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", + "rand 0.8.5", + "rand_chacha 0.3.1", "sp-application-crypto", "sp-core", "sp-keyring", @@ -7106,6 +7108,7 @@ dependencies = [ "polkadot-node-primitives", "polkadot-primitives", "rand 0.8.5", + "rand_chacha 0.3.1", "sc-authority-discovery", "sc-network", "strum 0.24.0", diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 03705bb173..fe73271c5c 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -31,8 +31,8 @@ use polkadot_node_primitives::approval::{ }; use polkadot_node_subsystem::{ messages::{ - network_bridge_event, ApprovalCheckResult, ApprovalDistributionMessage, - ApprovalVotingMessage, AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeMessage, + ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage, + AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeMessage, }, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError, @@ -145,29 +145,6 @@ impl Default for AggressionConfig { } } -struct ApprovalGridTopology(SessionGridTopology); - -impl From for ApprovalGridTopology { - fn from(topology: network_bridge_event::NewGossipTopology) -> Self { - let peers_x = - topology.our_neighbors_x.values().flat_map(|p| &p.peer_ids).cloned().collect(); - let peers_y = - topology.our_neighbors_y.values().flat_map(|p| &p.peer_ids).cloned().collect(); - - let validator_indices_x = - topology.our_neighbors_x.values().map(|p| p.validator_index.clone()).collect(); - let validator_indices_y = - topology.our_neighbors_y.values().map(|p| p.validator_index.clone()).collect(); - - ApprovalGridTopology(SessionGridTopology { - peers_x, - peers_y, - validator_indices_x, - validator_indices_y, - }) - } -} - #[derive(PartialEq)] enum Resend { Yes, @@ -368,12 +345,8 @@ impl State { }, NetworkBridgeEvent::NewGossipTopology(topology) => { let session = topology.session; - self.handle_new_session_topology( - ctx, - session, - ApprovalGridTopology::from(topology), - ) - .await; + self.handle_new_session_topology(ctx, session, SessionGridTopology::from(topology)) + .await; }, NetworkBridgeEvent::PeerViewChange(peer_id, view) => { self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await; @@ -528,9 +501,9 @@ impl State { ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), session: SessionIndex, - topology: ApprovalGridTopology, + topology: SessionGridTopology, ) { - self.topologies.insert_topology(session, topology.0); + self.topologies.insert_topology(session, topology); let topology = self.topologies.get_topology(session).expect("just inserted above; qed"); adjust_required_routing_and_propagate( diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index b1953eca81..2693cbd8bc 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -21,7 +21,7 @@ use polkadot_node_network_protocol::{our_view, view, ObservedRole}; use polkadot_node_primitives::approval::{ AssignmentCertKind, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT, }; -use polkadot_node_subsystem::messages::{AllMessages, ApprovalCheckError}; +use polkadot_node_subsystem::messages::{network_bridge_event, AllMessages, ApprovalCheckError}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt as _; use polkadot_primitives::v2::{AuthorityDiscoveryId, BlakeTwo256, HashT}; diff --git a/polkadot/node/network/bitfield-distribution/Cargo.toml b/polkadot/node/network/bitfield-distribution/Cargo.toml index 6418b27718..d9565b1a65 100644 --- a/polkadot/node/network/bitfield-distribution/Cargo.toml +++ b/polkadot/node/network/bitfield-distribution/Cargo.toml @@ -11,6 +11,7 @@ polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-network-protocol = { path = "../../network/protocol" } +rand = "0.8" [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } @@ -23,3 +24,4 @@ maplit = "1.0.2" log = "0.4.16" env_logger = "0.9.0" assert_matches = "1.4.0" +rand_chacha = "0.3.1" diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 4dea02d3d2..74245483e3 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -25,15 +25,19 @@ use futures::{channel::oneshot, FutureExt}; use polkadot_node_network_protocol::{ - self as net_protocol, v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, - Versioned, View, + self as net_protocol, + grid_topology::{RandomRouting, RequiredRouting, SessionGridTopology}, + v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View, +}; +use polkadot_node_subsystem_util::{self as util}; +use polkadot_primitives::v2::{ + Hash, SessionIndex, SignedAvailabilityBitfield, SigningContext, ValidatorId, }; -use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS}; -use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use polkadot_subsystem::{ jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem, SubsystemContext, SubsystemError, SubsystemResult, }; +use rand::{CryptoRng, Rng, SeedableRng}; use std::collections::{HashMap, HashSet}; use self::metrics::Metrics; @@ -76,6 +80,44 @@ impl BitfieldGossipMessage { } } +/// A simple storage for a topology and the corresponding session index +#[derive(Default, Debug)] +struct GridTopologySessionBound(SessionGridTopology, SessionIndex); + +/// A storage for the current and maybe previous topology +#[derive(Default, Debug)] +struct BitfieldGridTopologyStorage { + current_topology: GridTopologySessionBound, + prev_topology: Option, +} + +impl BitfieldGridTopologyStorage { + /// Return a grid topology based on the session index: + /// If we need a previous session and it is registered in the storage, then return that session. + /// Otherwise, return a current session to have some grid topology in any case + fn get_topology(&self, idx: SessionIndex) -> &SessionGridTopology { + if let Some(prev_topology) = &self.prev_topology { + if idx == prev_topology.1 { + return &prev_topology.0 + } + } + // Return the current topology by default + &self.current_topology.0 + } + + /// Update the current topology preserving the previous one + fn update_topology(&mut self, idx: SessionIndex, topology: SessionGridTopology) { + let old_current = + std::mem::replace(&mut self.current_topology, GridTopologySessionBound(topology, idx)); + self.prev_topology.replace(old_current); + } + + /// Returns a current grid topology + fn get_current_topology(&self) -> &SessionGridTopology { + &self.current_topology.0 + } +} + /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. #[derive(Default, Debug)] @@ -84,9 +126,8 @@ struct ProtocolState { /// to determine what is relevant to them. peer_views: HashMap, - /// Track all our neighbors in the current gossip topology. - /// We're not necessarily connected to all of them. - gossip_peers: HashSet, + /// The current and previous gossip topologies + topologies: BitfieldGridTopologyStorage, /// Our current view. view: OurView, @@ -170,13 +211,27 @@ impl BitfieldDistribution { } /// Start processing work as passed on from the Overseer. - async fn run(self, mut ctx: Context) + async fn run(self, ctx: Context) where Context: SubsystemContext, Context: overseer::SubsystemContext, { - // work: process incoming messages from the overseer and process accordingly. let mut state = ProtocolState::default(); + let mut rng = rand::rngs::StdRng::from_entropy(); + self.run_inner(ctx, &mut state, &mut rng).await + } + + async fn run_inner( + self, + mut ctx: Context, + state: &mut ProtocolState, + rng: &mut (impl CryptoRng + Rng), + ) where + Context: SubsystemContext, + Context: overseer::SubsystemContext, + { + // work: process incoming messages from the overseer and process accordingly. + loop { let message = match ctx.recv().await { Ok(message) => message, @@ -200,10 +255,11 @@ impl BitfieldDistribution { gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield"); handle_bitfield_distribution( &mut ctx, - &mut state, + state, &self.metrics, relay_parent, signed_availability, + rng, ) .await; }, @@ -212,7 +268,7 @@ impl BitfieldDistribution { } => { gum::trace!(target: LOG_TARGET, "Processing NetworkMessage"); // a network message was received - handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await; + handle_network_msg(&mut ctx, state, &self.metrics, event, rng).await; }, FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, @@ -268,7 +324,6 @@ where ctx.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await } - /// Distribute a given valid and signature checked bitfield message. /// /// For this variant the source is this node. @@ -278,6 +333,7 @@ async fn handle_bitfield_distribution( metrics: &Metrics, relay_parent: Hash, signed_availability: SignedAvailabilityBitfield, + rng: &mut (impl CryptoRng + Rng), ) where Context: SubsystemContext, { @@ -296,25 +352,36 @@ async fn handle_bitfield_distribution( return }; + + let session_idx = job_data.signing_context.session_index; let validator_set = &job_data.validator_set; if validator_set.is_empty() { gum::debug!(target: LOG_TARGET, ?relay_parent, "validator set is empty"); return } - let validator_index = signed_availability.validator_index().0 as usize; - let validator = if let Some(validator) = validator_set.get(validator_index) { + let validator_index = signed_availability.validator_index(); + let validator = if let Some(validator) = validator_set.get(*&validator_index.0 as usize) { validator.clone() } else { - gum::debug!(target: LOG_TARGET, validator_index, "Could not find a validator for index"); + gum::debug!(target: LOG_TARGET, validator_index = ?validator_index.0, "Could not find a validator for index"); return }; let msg = BitfieldGossipMessage { relay_parent, signed_availability }; - - let gossip_peers = &state.gossip_peers; - let peer_views = &mut state.peer_views; - relay_message(ctx, job_data, gossip_peers, peer_views, validator, msg).await; + let topology = state.topologies.get_topology(session_idx); + let required_routing = topology.required_routing_for(validator_index, true); + relay_message( + ctx, + job_data, + topology, + &mut state.peer_views, + validator, + msg, + required_routing, + rng, + ) + .await; metrics.on_own_bitfield_sent(); } @@ -325,10 +392,12 @@ async fn handle_bitfield_distribution( async fn relay_message( ctx: &mut Context, job_data: &mut PerRelayParentData, - gossip_peers: &HashSet, + topology: &SessionGridTopology, peer_views: &mut HashMap, validator: ValidatorId, message: BitfieldGossipMessage, + required_routing: RequiredRouting, + rng: &mut (impl CryptoRng + Rng), ) where Context: SubsystemContext, { @@ -344,10 +413,12 @@ async fn relay_message( .await; drop(_span); + let total_peers = peer_views.len(); + let mut random_routing: RandomRouting = Default::default(); let _span = span.child("interested-peers"); // pass on the bitfield distribution to all interested peers - let mut interested_peers = peer_views + let interested_peers = peer_views .iter() .filter_map(|(peer, view)| { // check interest in the peer in this message's relay parent @@ -355,7 +426,21 @@ async fn relay_message( let message_needed = job_data.message_from_validator_needed_by_peer(&peer, &validator); if message_needed { - Some(peer.clone()) + let in_topology = topology.route_to_peer(required_routing, &peer); + let need_routing = in_topology || { + let route_random = random_routing.sample(total_peers, rng); + if route_random { + random_routing.inc_sent(); + } + + route_random + }; + + if need_routing { + Some(peer.clone()) + } else { + None + } } else { None } @@ -364,11 +449,7 @@ async fn relay_message( } }) .collect::>(); - util::choose_random_subset( - |e| gossip_peers.contains(e), - &mut interested_peers, - MIN_GOSSIP_PEERS, - ); + interested_peers.iter().for_each(|peer| { // track the message as sent for this peer job_data @@ -403,6 +484,7 @@ async fn process_incoming_peer_message( metrics: &Metrics, origin: PeerId, message: protocol_v1::BitfieldDistributionMessage, + rng: &mut (impl CryptoRng + Rng), ) where Context: SubsystemContext, { @@ -492,11 +574,23 @@ async fn process_incoming_peer_message( let message = BitfieldGossipMessage { relay_parent, signed_availability }; + let topology = state.topologies.get_topology(job_data.signing_context.session_index); + let required_routing = topology.required_routing_for(validator_index, false); + metrics.on_bitfield_received(); one_per_validator.insert(validator.clone(), message.clone()); - relay_message(ctx, job_data, &state.gossip_peers, &mut state.peer_views, validator, message) - .await; + relay_message( + ctx, + job_data, + topology, + &mut state.peer_views, + validator, + message, + required_routing, + rng, + ) + .await; modify_reputation(ctx, relay_parent, origin, BENEFIT_VALID_MESSAGE_FIRST).await } @@ -508,6 +602,7 @@ async fn handle_network_msg( state: &mut ProtocolState, metrics: &Metrics, bridge_message: NetworkBridgeEvent, + rng: &mut (impl CryptoRng + Rng), ) where Context: SubsystemContext, { @@ -524,35 +619,37 @@ async fn handle_network_msg( // get rid of superfluous data state.peer_views.remove(&peer); }, - NetworkBridgeEvent::NewGossipTopology(topology) => { - // Combine all peers in the x & y direction as we don't make any distinction. - let peers: HashSet = topology - .our_neighbors_x - .values() - .chain(topology.our_neighbors_y.values()) - .flat_map(|peer_info| peer_info.peer_ids.iter().cloned()) - .collect(); - let newly_added: Vec = peers.difference(&state.gossip_peers).cloned().collect(); - state.gossip_peers = peers; + NetworkBridgeEvent::NewGossipTopology(gossip_topology) => { + let session_index = gossip_topology.session; + let new_topology = SessionGridTopology::from(gossip_topology); + let newly_added = new_topology.peers_diff(&new_topology); + state.topologies.update_topology(session_index, new_topology); + gum::debug!( + target: LOG_TARGET, + ?session_index, + "New gossip topology received {} unseen peers", + newly_added.len() + ); + for new_peer in newly_added { // in case we already knew that peer in the past // it might have had an existing view, we use to initialize // and minimize the delta on `PeerViewChange` to be sent if let Some(old_view) = state.peer_views.remove(&new_peer) { - handle_peer_view_change(ctx, state, new_peer, old_view).await; + handle_peer_view_change(ctx, state, new_peer, old_view, rng).await; } } }, NetworkBridgeEvent::PeerViewChange(peerid, new_view) => { gum::trace!(target: LOG_TARGET, ?peerid, ?new_view, "Peer view change"); - handle_peer_view_change(ctx, state, peerid, new_view).await; + handle_peer_view_change(ctx, state, peerid, new_view, rng).await; }, NetworkBridgeEvent::OurViewChange(new_view) => { gum::trace!(target: LOG_TARGET, ?new_view, "Our view change"); handle_our_view_change(state, new_view); }, NetworkBridgeEvent::PeerMessage(remote, Versioned::V1(message)) => - process_incoming_peer_message(ctx, state, metrics, remote, message).await, + process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await, } } @@ -585,6 +682,7 @@ async fn handle_peer_view_change( state: &mut ProtocolState, origin: PeerId, view: View, + rng: &mut (impl CryptoRng + Rng), ) where Context: SubsystemContext, { @@ -596,11 +694,13 @@ async fn handle_peer_view_change( .cloned() .collect::>(); - let is_gossip_peer = state.gossip_peers.contains(&origin); + let topology = state.topologies.get_current_topology(); + let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin); let lucky = is_gossip_peer || - util::gen_ratio( - util::MIN_GOSSIP_PEERS.saturating_sub(state.gossip_peers.len()), + util::gen_ratio_rng( + util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()), util::MIN_GOSSIP_PEERS, + rng, ); if !lucky { diff --git a/polkadot/node/network/bitfield-distribution/src/tests.rs b/polkadot/node/network/bitfield-distribution/src/tests.rs index ac6c211846..6a8363c887 100644 --- a/polkadot/node/network/bitfield-distribution/src/tests.rs +++ b/polkadot/node/network/bitfield-distribution/src/tests.rs @@ -27,6 +27,7 @@ use polkadot_subsystem::{ jaeger, jaeger::{PerLeafSpan, Span}, }; +use rand_chacha::ChaCha12Rng; use sp_application_crypto::AppKey; use sp_core::Pair as PairT; use sp_keyring::Sr25519Keyring; @@ -42,6 +43,11 @@ macro_rules! launch { }; } +/// Pre-seeded `crypto` random numbers generator for testing purposes +fn dummy_rng() -> ChaCha12Rng { + rand_chacha::ChaCha12Rng::seed_from_u64(12345) +} + /// A very limited state, only interested in the relay parent of the /// given message, which must be signed by `validator` and a set of peers /// which are also only interested in that relay parent. @@ -52,6 +58,10 @@ fn prewarmed_state( peers: Vec, ) -> ProtocolState { let relay_parent = known_message.relay_parent.clone(); + let mut topology: SessionGridTopology = Default::default(); + topology.peers_x = peers.iter().cloned().collect(); + let mut topologies: BitfieldGridTopologyStorage = Default::default(); + topologies.update_topology(0_u32, topology); ProtocolState { per_relay_parent: hashmap! { relay_parent.clone() => @@ -67,7 +77,7 @@ fn prewarmed_state( }, }, peer_views: peers.iter().cloned().map(|peer| (peer, view!(relay_parent))).collect(), - gossip_peers: peers.into_iter().collect(), + topologies, view: our_view!(relay_parent), } } @@ -191,6 +201,7 @@ fn receive_invalid_signature() { .unwrap() .validator_set .push(validator_1.into()); + let mut rng = dummy_rng(); executor::block_on(async move { launch!(handle_network_msg( @@ -198,6 +209,7 @@ fn receive_invalid_signature() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), invalid_msg.into_network_message()), + &mut rng, )); // reputation doesn't change due to one_job_per_validator check @@ -208,6 +220,7 @@ fn receive_invalid_signature() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), invalid_msg_2.into_network_message()), + &mut rng, )); // reputation change due to invalid signature assert_matches!( @@ -259,6 +272,7 @@ fn receive_invalid_validator_index() { let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::(pool); + let mut rng = dummy_rng(); executor::block_on(async move { launch!(handle_network_msg( @@ -266,6 +280,7 @@ fn receive_invalid_validator_index() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), + &mut rng, )); // reputation change due to invalid validator index @@ -319,6 +334,7 @@ fn receive_duplicate_messages() { let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::(pool); + let mut rng = dummy_rng(); executor::block_on(async move { // send a first message @@ -327,6 +343,7 @@ fn receive_duplicate_messages() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.clone().into_network_message(),), + &mut rng, )); // none of our peers has any interest in any messages @@ -359,6 +376,7 @@ fn receive_duplicate_messages() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_a.clone(), msg.clone().into_network_message(),), + &mut rng, )); assert_matches!( @@ -377,6 +395,7 @@ fn receive_duplicate_messages() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.clone().into_network_message(),), + &mut rng, )); assert_matches!( @@ -431,9 +450,13 @@ fn do_not_relay_message_twice() { let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::(pool); + let mut rng = dummy_rng(); executor::block_on(async move { - let gossip_peers = HashSet::from_iter(vec![peer_a.clone(), peer_b.clone()].into_iter()); + let gossip_peers = SessionGridTopology { + peers_x: HashSet::from_iter(vec![peer_a.clone(), peer_b.clone()].into_iter()), + ..Default::default() + }; relay_message( &mut ctx, state.per_relay_parent.get_mut(&hash).unwrap(), @@ -441,6 +464,8 @@ fn do_not_relay_message_twice() { &mut state.peer_views, validator.clone(), msg.clone(), + RequiredRouting::GridXY, + &mut rng, ) .await; @@ -475,6 +500,8 @@ fn do_not_relay_message_twice() { &mut state.peer_views, validator.clone(), msg.clone(), + RequiredRouting::GridXY, + &mut rng, ) .await; @@ -532,6 +559,7 @@ fn changing_view() { let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::(pool); + let mut rng = dummy_rng(); executor::block_on(async move { launch!(handle_network_msg( @@ -539,6 +567,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), + &mut rng, )); // make peer b interested @@ -547,6 +576,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a, hash_b]), + &mut rng, )); assert!(state.peer_views.contains_key(&peer_b)); @@ -557,6 +587,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.clone().into_network_message(),), + &mut rng, )); // gossip to the overseer @@ -587,6 +618,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![]), + &mut rng, )); assert!(state.peer_views.contains_key(&peer_b)); @@ -599,6 +631,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.clone().into_network_message(),), + &mut rng, )); // reputation change for peer B @@ -617,6 +650,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), + &mut rng, )); // we are not interested in any peers at all anymore @@ -629,6 +663,7 @@ fn changing_view() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_a.clone(), msg.clone().into_network_message(),), + &mut rng, )); // reputation change for peer B @@ -683,6 +718,7 @@ fn do_not_send_message_back_to_origin() { let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::(pool); + let mut rng = dummy_rng(); executor::block_on(async move { // send a first message @@ -691,6 +727,7 @@ fn do_not_send_message_back_to_origin() { &mut state, &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.clone().into_network_message(),), + &mut rng, )); assert_matches!( @@ -727,6 +764,117 @@ fn do_not_send_message_back_to_origin() { }); } +#[test] +fn topology_test() { + let _ = env_logger::builder() + .filter(None, log::LevelFilter::Trace) + .is_test(true) + .try_init(); + + let hash: Hash = [0; 32].into(); + let peers_x = (0..25).map(|_| PeerId::random()).collect::>(); + let peers_y = (0..25).map(|_| PeerId::random()).collect::>(); + + // ensure all unique + assert_eq!( + peers_x.iter().chain(peers_y.iter()).collect::>().len(), + peers_x.len() + peers_y.len() + ); + + // validator 0 key pair + let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash); + + // Create a simple grid + let mut topology: SessionGridTopology = Default::default(); + topology.peers_x = peers_x.iter().cloned().collect::>(); + topology.validator_indices_x = peers_x + .iter() + .enumerate() + .map(|(idx, _)| ValidatorIndex(idx as u32)) + .collect::>(); + topology.peers_y = peers_y.iter().cloned().collect::>(); + topology.validator_indices_y = peers_y + .iter() + .enumerate() + .map(|(idx, _)| ValidatorIndex((idx + peers_x.len()) as u32)) + .collect::>(); + state.topologies.update_topology(0_u32, topology); + + // create a signed message by validator 0 + let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]); + let signed_bitfield = executor::block_on(Signed::::sign( + &keystore, + payload, + &signing_context, + ValidatorIndex(0), + &validator, + )) + .ok() + .flatten() + .expect("should be signed"); + + peers_x.iter().chain(peers_y.iter()).for_each(|peer| { + state.peer_views.insert(peer.clone(), view![hash]); + }); + + let msg = BitfieldGossipMessage { + relay_parent: hash.clone(), + signed_availability: signed_bitfield.clone(), + }; + + let pool = sp_core::testing::TaskExecutor::new(); + let (mut ctx, mut handle) = make_subsystem_context::(pool); + let mut rng = dummy_rng(); + + executor::block_on(async move { + // send a first message + launch!(handle_network_msg( + &mut ctx, + &mut state, + &Default::default(), + NetworkBridgeEvent::PeerMessage(peers_x[0].clone(), msg.clone().into_network_message(),), + &mut rng, + )); + + assert_matches!( + handle.recv().await, + AllMessages::Provisioner(ProvisionerMessage::ProvisionableData( + _, + ProvisionableData::Bitfield(hash, signed) + )) => { + assert_eq!(hash, hash); + assert_eq!(signed, signed_bitfield) + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage(peers, send_msg), + ) => { + let topology = state.topologies.get_current_topology(); + // It should send message to all peers in y direction and to 4 random peers in x direction + assert_eq!(peers_y.len() + 4, peers.len()); + assert!(topology.peers_y.iter().all(|peer| peers.contains(&peer))); + assert!(topology.peers_x.iter().filter(|peer| peers.contains(&peer)).count() == 4); + // Must never include originator + assert!(!peers.contains(&peers_x[0])); + assert_eq!(send_msg, msg.clone().into_validation_protocol()); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peers_x[0]); + assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) + } + ); + }); +} + #[test] fn need_message_works() { let validators = vec![Sr25519Keyring::Alice.pair(), Sr25519Keyring::Bob.pair()]; diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml index 2275b36f8b..313e76a910 100644 --- a/polkadot/node/network/protocol/Cargo.toml +++ b/polkadot/node/network/protocol/Cargo.toml @@ -18,4 +18,7 @@ futures = "0.3.21" thiserror = "1.0.31" fatality = "0.0.6" rand = "0.8" -derive_more = "0.99" \ No newline at end of file +derive_more = "0.99" + +[dev-dependencies] +rand_chacha = "0.3.1" \ No newline at end of file diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index f1823b345b..c6b4b8d4da 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -47,6 +47,7 @@ pub const DEFAULT_RANDOM_SAMPLE_RATE: usize = crate::MIN_GOSSIP_PEERS; pub const DEFAULT_RANDOM_CIRCULATION: usize = 4; /// Topology representation +#[derive(Default, Clone, Debug)] pub struct SessionGridTopology { /// Represent peers in the X axis pub peers_x: HashSet, @@ -89,7 +90,23 @@ impl SessionGridTopology { RequiredRouting::None | RequiredRouting::PendingTopology => false, } } + + /// Returns the difference between this and the `other` topology as a vector of peers + pub fn peers_diff(&self, other: &SessionGridTopology) -> Vec { + self.peers_x + .iter() + .chain(self.peers_y.iter()) + .filter(|peer_id| !(other.peers_x.contains(peer_id) || other.peers_y.contains(peer_id))) + .cloned() + .collect::>() + } + + /// A convenience method that returns total number of peers in the topology + pub fn len(&self) -> usize { + self.peers_x.len().saturating_add(self.peers_y.len()) + } } + /// A set of topologies indexed by session #[derive(Default)] pub struct SessionGridTopologies { @@ -193,3 +210,75 @@ impl RequiredRouting { } } } + +#[cfg(test)] +mod tests { + use super::*; + use rand::SeedableRng; + use rand_chacha::ChaCha12Rng; + + fn dummy_rng() -> ChaCha12Rng { + rand_chacha::ChaCha12Rng::seed_from_u64(12345) + } + + #[test] + fn test_random_routing_sample() { + // This test is fragile as it relies on a specific ChaCha12Rng + // sequence that might be implementation defined even for a static seed + let mut rng = dummy_rng(); + let mut random_routing = RandomRouting { target: 4, sent: 0, sample_rate: 8 }; + + assert_eq!(random_routing.sample(16, &mut rng), true); + random_routing.inc_sent(); + assert_eq!(random_routing.sample(16, &mut rng), false); + assert_eq!(random_routing.sample(16, &mut rng), false); + assert_eq!(random_routing.sample(16, &mut rng), true); + random_routing.inc_sent(); + assert_eq!(random_routing.sample(16, &mut rng), true); + random_routing.inc_sent(); + assert_eq!(random_routing.sample(16, &mut rng), false); + assert_eq!(random_routing.sample(16, &mut rng), false); + assert_eq!(random_routing.sample(16, &mut rng), false); + assert_eq!(random_routing.sample(16, &mut rng), true); + random_routing.inc_sent(); + + for _ in 0..16 { + assert_eq!(random_routing.sample(16, &mut rng), false); + } + } + + fn run_random_routing( + random_routing: &mut RandomRouting, + rng: &mut (impl CryptoRng + Rng), + npeers: usize, + iters: usize, + ) -> usize { + let mut ret = 0_usize; + + for _ in 0..iters { + if random_routing.sample(npeers, rng) { + random_routing.inc_sent(); + ret += 1; + } + } + + ret + } + + #[test] + fn test_random_routing_distribution() { + let mut rng = dummy_rng(); + + let mut random_routing = RandomRouting { target: 4, sent: 0, sample_rate: 8 }; + assert_eq!(run_random_routing(&mut random_routing, &mut rng, 100, 10000), 4); + + let mut random_routing = RandomRouting { target: 8, sent: 0, sample_rate: 100 }; + assert_eq!(run_random_routing(&mut random_routing, &mut rng, 100, 10000), 8); + + let mut random_routing = RandomRouting { target: 0, sent: 0, sample_rate: 100 }; + assert_eq!(run_random_routing(&mut random_routing, &mut rng, 100, 10000), 0); + + let mut random_routing = RandomRouting { target: 10, sent: 0, sample_rate: 10 }; + assert_eq!(run_random_routing(&mut random_routing, &mut rng, 10, 100), 10); + } +} diff --git a/polkadot/node/subsystem-types/src/messages/network_bridge_event.rs b/polkadot/node/subsystem-types/src/messages/network_bridge_event.rs index 97c71b7c48..4cf2bed6ef 100644 --- a/polkadot/node/subsystem-types/src/messages/network_bridge_event.rs +++ b/polkadot/node/subsystem-types/src/messages/network_bridge_event.rs @@ -21,7 +21,9 @@ use std::{ pub use sc_network::{PeerId, ReputationChange}; -use polkadot_node_network_protocol::{ObservedRole, OurView, ProtocolVersion, View, WrongVariant}; +use polkadot_node_network_protocol::{ + grid_topology::SessionGridTopology, ObservedRole, OurView, ProtocolVersion, View, WrongVariant, +}; use polkadot_primitives::v2::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex}; /// Information about a peer in the gossip topology for a session. @@ -119,3 +121,19 @@ impl NetworkBridgeEvent { }) } } + +impl From for SessionGridTopology { + fn from(topology: NewGossipTopology) -> Self { + let peers_x = + topology.our_neighbors_x.values().flat_map(|p| &p.peer_ids).cloned().collect(); + let peers_y = + topology.our_neighbors_y.values().flat_map(|p| &p.peer_ids).cloned().collect(); + + let validator_indices_x = + topology.our_neighbors_x.values().map(|p| p.validator_index.clone()).collect(); + let validator_indices_y = + topology.our_neighbors_y.values().map(|p| p.validator_index.clone()).collect(); + + SessionGridTopology { peers_x, peers_y, validator_indices_x, validator_indices_y } + } +}