diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index f8a92c8836..2f4d624100 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6831,6 +6831,7 @@ dependencies = [ "sc-network", "strum 0.24.0", "thiserror", + "tracing-gum", ] [[package]] diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index a4f19b741a..39a5c75420 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -511,7 +511,7 @@ impl State { |block_entry| block_entry.session == session, |required_routing, local, validator_index| { if *required_routing == RequiredRouting::PendingTopology { - *required_routing = topology.required_routing_for(*validator_index, local); + *required_routing = topology.required_routing_by_index(*validator_index, local); } }, ) @@ -861,7 +861,7 @@ impl State { let local = source == MessageSource::Local; let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| { - t.required_routing_for(validator_index, local) + t.required_routing_by_index(validator_index, local) }); let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) { diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index e3ed0e6f89..feba334446 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -26,7 +26,9 @@ use futures::{channel::oneshot, FutureExt}; use polkadot_node_network_protocol::{ self as net_protocol, - grid_topology::{RandomRouting, RequiredRouting, SessionGridTopology}, + grid_topology::{ + RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology, + }, v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View, }; use polkadot_node_subsystem::{ @@ -34,9 +36,8 @@ use polkadot_node_subsystem::{ SpawnedSubsystem, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{self as util}; -use polkadot_primitives::v2::{ - Hash, SessionIndex, SignedAvailabilityBitfield, SigningContext, ValidatorId, -}; + +use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use rand::{CryptoRng, Rng, SeedableRng}; use std::collections::{HashMap, HashSet}; @@ -80,44 +81,6 @@ impl BitfieldGossipMessage { } } -/// A simple storage for a topology and the corresponding session index -#[derive(Default, Debug)] -struct GridTopologySessionBound(SessionGridTopology, SessionIndex); - -/// A storage for the current and maybe previous topology -#[derive(Default, Debug)] -struct BitfieldGridTopologyStorage { - current_topology: GridTopologySessionBound, - prev_topology: Option, -} - -impl BitfieldGridTopologyStorage { - /// Return a grid topology based on the session index: - /// If we need a previous session and it is registered in the storage, then return that session. - /// Otherwise, return a current session to have some grid topology in any case - fn get_topology(&self, idx: SessionIndex) -> &SessionGridTopology { - if let Some(prev_topology) = &self.prev_topology { - if idx == prev_topology.1 { - return &prev_topology.0 - } - } - // Return the current topology by default - &self.current_topology.0 - } - - /// Update the current topology preserving the previous one - fn update_topology(&mut self, idx: SessionIndex, topology: SessionGridTopology) { - let old_current = - std::mem::replace(&mut self.current_topology, GridTopologySessionBound(topology, idx)); - self.prev_topology.replace(old_current); - } - - /// Returns a current grid topology - fn get_current_topology(&self) -> &SessionGridTopology { - &self.current_topology.0 - } -} - /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. #[derive(Default, Debug)] @@ -127,7 +90,7 @@ struct ProtocolState { peer_views: HashMap, /// The current and previous gossip topologies - topologies: BitfieldGridTopologyStorage, + topologies: SessionBoundGridTopologyStorage, /// Our current view. view: OurView, @@ -364,8 +327,9 @@ async fn handle_bitfield_distribution( }; let msg = BitfieldGossipMessage { relay_parent, signed_availability }; - let topology = state.topologies.get_topology(session_idx); - let required_routing = topology.required_routing_for(validator_index, true); + let topology = state.topologies.get_topology_or_fallback(session_idx); + let required_routing = topology.required_routing_by_index(validator_index, true); + relay_message( ctx, job_data, @@ -567,8 +531,10 @@ async fn process_incoming_peer_message( let message = BitfieldGossipMessage { relay_parent, signed_availability }; - let topology = state.topologies.get_topology(job_data.signing_context.session_index); - let required_routing = topology.required_routing_for(validator_index, false); + let topology = state + .topologies + .get_topology_or_fallback(job_data.signing_context.session_index); + let required_routing = topology.required_routing_by_index(validator_index, false); metrics.on_bitfield_received(); one_per_validator.insert(validator.clone(), message.clone()); diff --git a/polkadot/node/network/bitfield-distribution/src/tests.rs b/polkadot/node/network/bitfield-distribution/src/tests.rs index 16c7656052..9bd17c542f 100644 --- a/polkadot/node/network/bitfield-distribution/src/tests.rs +++ b/polkadot/node/network/bitfield-distribution/src/tests.rs @@ -19,7 +19,9 @@ use assert_matches::assert_matches; use bitvec::bitvec; use futures::executor; use maplit::hashmap; -use polkadot_node_network_protocol::{our_view, view, ObservedRole}; +use polkadot_node_network_protocol::{ + grid_topology::SessionBoundGridTopologyStorage, our_view, view, ObservedRole, +}; use polkadot_node_subsystem::{ jaeger, jaeger::{PerLeafSpan, Span}, @@ -60,7 +62,7 @@ fn prewarmed_state( let relay_parent = known_message.relay_parent.clone(); let mut topology: SessionGridTopology = Default::default(); topology.peers_x = peers.iter().cloned().collect(); - let mut topologies: BitfieldGridTopologyStorage = Default::default(); + let mut topologies = SessionBoundGridTopologyStorage::default(); topologies.update_topology(0_u32, topology); ProtocolState { per_relay_parent: hashmap! { diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml index 780b6a27d3..f3a2dae4f0 100644 --- a/polkadot/node/network/protocol/Cargo.toml +++ b/polkadot/node/network/protocol/Cargo.toml @@ -19,6 +19,7 @@ thiserror = "1.0.31" fatality = "0.0.6" rand = "0.8" derive_more = "0.99" +gum = { package = "tracing-gum", path = "../../gum" } [dev-dependencies] rand_chacha = "0.3.1" diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index c6b4b8d4da..73de9cfc25 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -37,6 +37,8 @@ use std::{ fmt::Debug, }; +const LOG_TARGET: &str = "parachain::grid-topology"; + /// The sample rate for randomly propagating messages. This /// reduces the left tail of the binomial distribution but also /// introduces a bias towards peers who we sample before others @@ -60,9 +62,13 @@ pub struct SessionGridTopology { } impl SessionGridTopology { - /// Given the originator of a message, indicates the part of the topology + /// Given the originator of a message as a validator index, indicates the part of the topology /// we're meant to send the message to. - pub fn required_routing_for(&self, originator: ValidatorIndex, local: bool) -> RequiredRouting { + pub fn required_routing_by_index( + &self, + originator: ValidatorIndex, + local: bool, + ) -> RequiredRouting { if local { return RequiredRouting::GridXY } @@ -78,6 +84,31 @@ impl SessionGridTopology { } } + /// Given the originator of a message as a peer index, indicates the part of the topology + /// we're meant to send the message to. + pub fn required_routing_by_peer_id(&self, originator: PeerId, local: bool) -> RequiredRouting { + if local { + return RequiredRouting::GridXY + } + + let grid_x = self.peers_x.contains(&originator); + let grid_y = self.peers_y.contains(&originator); + + match (grid_x, grid_y) { + (false, false) => RequiredRouting::None, + (true, false) => RequiredRouting::GridY, // messages from X go to Y + (false, true) => RequiredRouting::GridX, // messages from Y go to X + (true, true) => { + gum::debug!( + target: LOG_TARGET, + ?originator, + "Grid topology is unexpected, play it safe and send to X AND Y" + ); + RequiredRouting::GridXY + }, // if the grid works as expected, this shouldn't happen. + } + } + /// Get a filter function based on this topology and the required routing /// which returns `true` for peers that are within the required routing set /// and false otherwise. @@ -142,6 +173,59 @@ impl SessionGridTopologies { } } } + +/// A simple storage for a topology and the corresponding session index +#[derive(Default, Debug)] +pub struct GridTopologySessionBound { + topology: SessionGridTopology, + session_index: SessionIndex, +} + +/// A storage for the current and maybe previous topology +#[derive(Default, Debug)] +pub struct SessionBoundGridTopologyStorage { + current_topology: GridTopologySessionBound, + prev_topology: Option, +} + +impl SessionBoundGridTopologyStorage { + /// Return a grid topology based on the session index: + /// If we need a previous session and it is registered in the storage, then return that session. + /// Otherwise, return a current session to have some grid topology in any case + pub fn get_topology_or_fallback(&self, idx: SessionIndex) -> &SessionGridTopology { + self.get_topology(idx).unwrap_or(&self.current_topology.topology) + } + + /// Return the grid topology for the specific session index, if no such a session is stored + /// returns `None`. + pub fn get_topology(&self, idx: SessionIndex) -> Option<&SessionGridTopology> { + if let Some(prev_topology) = &self.prev_topology { + if idx == prev_topology.session_index { + return Some(&prev_topology.topology) + } + } + if self.current_topology.session_index == idx { + return Some(&self.current_topology.topology) + } + + None + } + + /// Update the current topology preserving the previous one + pub fn update_topology(&mut self, session_index: SessionIndex, topology: SessionGridTopology) { + let old_current = std::mem::replace( + &mut self.current_topology, + GridTopologySessionBound { topology, session_index }, + ); + self.prev_topology.replace(old_current); + } + + /// Returns a current grid topology + pub fn get_current_topology(&self) -> &SessionGridTopology { + &self.current_topology.topology + } +} + /// A representation of routing based on sample #[derive(Debug, Clone, Copy)] pub struct RandomRouting { diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index cfd19713a8..00759fd9e9 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -27,6 +27,7 @@ use parity_scale_codec::Encode; use polkadot_node_network_protocol::{ self as net_protocol, + grid_topology::{RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology}, peer_set::{IsAuthority, PeerSet}, request_response::{v1 as request_v1, IncomingRequestReceiver}, v1::{self as protocol_v1, StatementMetadata}, @@ -888,7 +889,7 @@ fn check_statement_signature( /// them but now can. #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn circulate_statement_and_dependents( - gossip_peers: &HashSet, + topology_store: &SessionBoundGridTopologyStorage, peers: &mut HashMap, active_heads: &mut HashMap, ctx: &mut Context, @@ -909,6 +910,7 @@ async fn circulate_statement_and_dependents( .with_candidate(statement.payload().candidate_hash()) .with_stage(jaeger::Stage::StatementDistribution); + let topology = topology_store.get_topology_or_fallback(active_head.session_index); // First circulate the statement directly to all peers needing it. // The borrow of `active_head` needs to encompass only this (Rust) statement. let outputs: Option<(CandidateHash, Vec)> = { @@ -916,7 +918,8 @@ async fn circulate_statement_and_dependents( NotedStatement::Fresh(stored) => Some(( *stored.compact().candidate_hash(), circulate_statement( - gossip_peers, + RequiredRouting::GridXY, + topology, peers, ctx, relay_parent, @@ -1005,7 +1008,8 @@ fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option) /// an iterator over peers who need to have dependent statements sent. #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn circulate_statement<'a, Context>( - gossip_peers: &HashSet, + required_routing: RequiredRouting, + topology: &SessionGridTopology, peers: &mut HashMap, ctx: &mut Context, relay_parent: Hash, @@ -1036,7 +1040,7 @@ async fn circulate_statement<'a, Context>( peers_to_send.retain(|p| !priority_set.contains(p)); util::choose_random_subset_with_rng( - |e| gossip_peers.contains(e), + |e| topology.route_to_peer(required_routing, e), &mut peers_to_send, rng, MIN_GOSSIP_PEERS, @@ -1300,7 +1304,7 @@ async fn launch_request( #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn handle_incoming_message_and_circulate<'a, Context, R>( peer: PeerId, - gossip_peers: &HashSet, + topology_storage: &SessionBoundGridTopologyStorage, peers: &mut HashMap, active_heads: &'a mut HashMap, recent_outdated_heads: &RecentOutdatedHeads, @@ -1308,6 +1312,7 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>( message: protocol_v1::StatementDistributionMessage, req_sender: &mpsc::Sender, metrics: &Metrics, + runtime: &mut RuntimeInfo, rng: &mut R, ) where R: rand::Rng, @@ -1338,8 +1343,27 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>( // that require dependents. Thus, if this is a `Seconded` statement for a candidate we // were not aware of before, we cannot have any dependent statements from the candidate. let _ = metrics.time_network_bridge_update_v1("circulate_statement"); + + let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await; + let topology = match session_index { + Ok(session_index) => topology_storage.get_topology_or_fallback(session_index), + Err(e) => { + gum::debug!( + target: LOG_TARGET, + %relay_parent, + "cannot get session index for the specific relay parent: {:?}", + e + ); + + topology_storage.get_current_topology() + }, + }; + let required_routing = + topology.required_routing_by_index(statement.statement.validator_index(), false); + let _ = circulate_statement( - gossip_peers, + required_routing, + topology, peers, ctx, relay_parent, @@ -1558,7 +1582,7 @@ async fn handle_incoming_message<'a, Context>( #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn update_peer_view_and_maybe_send_unlocked( peer: PeerId, - gossip_peers: &HashSet, + topology: &SessionGridTopology, peer_data: &mut PeerData, ctx: &mut Context, active_heads: &HashMap, @@ -1575,10 +1599,11 @@ async fn update_peer_view_and_maybe_send_unlocked( let _ = peer_data.view_knowledge.remove(removed); } - let is_gossip_peer = gossip_peers.contains(&peer); + // Use both grid directions + let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &peer); let lucky = is_gossip_peer || util::gen_ratio_rng( - util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()), + util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()), util::MIN_GOSSIP_PEERS, rng, ); @@ -1600,7 +1625,7 @@ async fn update_peer_view_and_maybe_send_unlocked( #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn handle_network_update( peers: &mut HashMap, - gossip_peers: &mut HashSet, + topology_storage: &mut SessionBoundGridTopologyStorage, authorities: &mut HashMap, active_heads: &mut HashMap, recent_outdated_heads: &RecentOutdatedHeads, @@ -1608,6 +1633,7 @@ async fn handle_network_update( req_sender: &mpsc::Sender, update: NetworkBridgeEvent, metrics: &Metrics, + runtime: &mut RuntimeInfo, rng: &mut R, ) where R: rand::Rng, @@ -1638,22 +1664,19 @@ async fn handle_network_update( } }, NetworkBridgeEvent::NewGossipTopology(topology) => { - // Combine all peers in the x & y direction as we don't make any distinction. - let new_peers: HashSet = topology - .our_neighbors_x - .values() - .chain(topology.our_neighbors_y.values()) - .flat_map(|peer_info| peer_info.peer_ids.iter().cloned()) - .collect(); let _ = metrics.time_network_bridge_update_v1("new_gossip_topology"); - let newly_added: Vec = new_peers.difference(gossip_peers).cloned().collect(); - *gossip_peers = new_peers; + + let new_session_index = topology.session; + let new_topology: SessionGridTopology = topology.into(); + let old_topology = topology_storage.get_current_topology(); + let newly_added = new_topology.peers_diff(old_topology); + topology_storage.update_topology(new_session_index, new_topology); for peer in newly_added { if let Some(data) = peers.get_mut(&peer) { let view = std::mem::take(&mut data.view); update_peer_view_and_maybe_send_unlocked( peer, - gossip_peers, + topology_storage.get_current_topology(), data, ctx, &*active_heads, @@ -1668,7 +1691,7 @@ async fn handle_network_update( NetworkBridgeEvent::PeerMessage(peer, Versioned::V1(message)) => { handle_incoming_message_and_circulate( peer, - gossip_peers, + topology_storage, peers, active_heads, &*recent_outdated_heads, @@ -1676,6 +1699,7 @@ async fn handle_network_update( message, req_sender, metrics, + runtime, rng, ) .await; @@ -1687,7 +1711,7 @@ async fn handle_network_update( Some(data) => update_peer_view_and_maybe_send_unlocked( peer, - gossip_peers, + topology_storage.get_current_topology(), data, ctx, &*active_heads, @@ -1719,7 +1743,7 @@ impl StatementDistributionSubsystem { async fn run(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> { let mut peers: HashMap = HashMap::new(); - let mut gossip_peers: HashSet = HashSet::new(); + let mut topology_storage: SessionBoundGridTopologyStorage = Default::default(); let mut authorities: HashMap = HashMap::new(); let mut active_heads: HashMap = HashMap::new(); let mut recent_outdated_heads = RecentOutdatedHeads::default(); @@ -1751,7 +1775,7 @@ impl StatementDistributionSubsystem { &mut ctx, &mut runtime, &mut peers, - &mut gossip_peers, + &mut topology_storage, &mut authorities, &mut active_heads, &mut recent_outdated_heads, @@ -1769,11 +1793,12 @@ impl StatementDistributionSubsystem { let result = self .handle_requester_message( &mut ctx, - &gossip_peers, + &topology_storage, &mut peers, &mut active_heads, &recent_outdated_heads, &req_sender, + &mut runtime, result.ok_or(FatalError::RequesterReceiverFinished)?, ) .await; @@ -1836,11 +1861,12 @@ impl StatementDistributionSubsystem { async fn handle_requester_message( &mut self, ctx: &mut Context, - gossip_peers: &HashSet, + topology_storage: &SessionBoundGridTopologyStorage, peers: &mut HashMap, active_heads: &mut HashMap, recent_outdated_heads: &RecentOutdatedHeads, req_sender: &mpsc::Sender, + runtime: &mut RuntimeInfo, message: RequesterMessage, ) -> JfyiErrorResult<()> { match message { @@ -1884,7 +1910,7 @@ impl StatementDistributionSubsystem { for message in messages { handle_incoming_message_and_circulate( peer, - gossip_peers, + topology_storage, peers, active_heads, recent_outdated_heads, @@ -1892,6 +1918,7 @@ impl StatementDistributionSubsystem { message, req_sender, &self.metrics, + runtime, &mut self.rng, ) .await; @@ -1946,7 +1973,7 @@ impl StatementDistributionSubsystem { ctx: &mut Context, runtime: &mut RuntimeInfo, peers: &mut HashMap, - gossip_peers: &mut HashSet, + topology_storage: &mut SessionBoundGridTopologyStorage, authorities: &mut HashMap, active_heads: &mut HashMap, recent_outdated_heads: &mut RecentOutdatedHeads, @@ -2046,7 +2073,7 @@ impl StatementDistributionSubsystem { } }; circulate_statement_and_dependents( - gossip_peers, + topology_storage, peers, active_heads, ctx, @@ -2061,7 +2088,7 @@ impl StatementDistributionSubsystem { StatementDistributionMessage::NetworkBridgeUpdate(event) => { handle_network_update( peers, - gossip_peers, + topology_storage, authorities, active_heads, &*recent_outdated_heads, @@ -2069,6 +2096,7 @@ impl StatementDistributionSubsystem { req_sender, event, metrics, + runtime, &mut self.rng, ) .await; diff --git a/polkadot/node/network/statement-distribution/src/tests.rs b/polkadot/node/network/statement-distribution/src/tests.rs index a0342fea2e..a7405d3299 100644 --- a/polkadot/node/network/statement-distribution/src/tests.rs +++ b/polkadot/node/network/statement-distribution/src/tests.rs @@ -505,10 +505,11 @@ fn peer_view_update_sends_messages() { let peer = PeerId::random(); executor::block_on(async move { - let gossip_peers = HashSet::from_iter(vec![peer.clone()].into_iter()); + let mut topology: SessionGridTopology = Default::default(); + topology.peers_x = HashSet::from_iter(vec![peer.clone()].into_iter()); update_peer_view_and_maybe_send_unlocked( peer.clone(), - &gossip_peers, + &topology, &mut peer_data, &mut ctx, &active_heads, @@ -634,10 +635,12 @@ fn circulated_statement_goes_to_all_peers_with_view() { }; let statement = StoredStatement { comparator: &comparator, statement: &statement }; - let gossip_peers = + let mut topology: SessionGridTopology = Default::default(); + topology.peers_x = HashSet::from_iter(vec![peer_a.clone(), peer_b.clone(), peer_c.clone()].into_iter()); let needs_dependents = circulate_statement( - &gossip_peers, + RequiredRouting::GridXY, + &topology, &mut peer_data, &mut ctx, hash_b, @@ -2004,19 +2007,31 @@ fn handle_multiple_seconded_statements() { our_neighbors_y: HashMap::new(), }; - // This is relying on the fact that statement distribution - // just extracts the peer IDs from this struct and does nothing else - // with it. + // Create a topology to ensure that we send messages not to `peer_a`/`peer_b` for (i, peer) in lucky_peers.iter().enumerate() { let authority_id = AuthorityPair::generate().0.public(); - t.our_neighbors_x.insert( + t.our_neighbors_y.insert( authority_id, network_bridge_event::TopologyPeerInfo { peer_ids: vec![peer.clone()], - validator_index: (i as u32).into(), + validator_index: (i as u32 + 2_u32).into(), }, ); } + t.our_neighbors_x.insert( + AuthorityPair::generate().0.public(), + network_bridge_event::TopologyPeerInfo { + peer_ids: vec![peer_a.clone()], + validator_index: 0_u32.into(), + }, + ); + t.our_neighbors_x.insert( + AuthorityPair::generate().0.public(), + network_bridge_event::TopologyPeerInfo { + peer_ids: vec![peer_b.clone()], + validator_index: 1_u32.into(), + }, + ); t };