refactor grid topology to expose more info to subsystems (#6140)

* refactor grid topology to expose more info to subsystems

* fix grid_topology test

* fix overseer test

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Andronik <write@reusable.software>

* fix bug in populating topology

* fmt

Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>
Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
asynchronous rob
2022-10-12 18:30:12 -05:00
committed by GitHub
parent bccffcad12
commit a7780e0797
17 changed files with 614 additions and 328 deletions
@@ -27,7 +27,7 @@ use futures::{channel::oneshot, FutureExt};
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{
RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology,
GridNeighbors, RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage,
},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
@@ -327,7 +327,7 @@ async fn handle_bitfield_distribution<Context>(
};
let msg = BitfieldGossipMessage { relay_parent, signed_availability };
let topology = state.topologies.get_topology_or_fallback(session_idx);
let topology = state.topologies.get_topology_or_fallback(session_idx).local_grid_neighbors();
let required_routing = topology.required_routing_by_index(validator_index, true);
relay_message(
@@ -352,7 +352,7 @@ async fn handle_bitfield_distribution<Context>(
async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
topology: &SessionGridTopology,
topology_neighbors: &GridNeighbors,
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
@@ -384,7 +384,7 @@ async fn relay_message<Context>(
let message_needed =
job_data.message_from_validator_needed_by_peer(&peer, &validator);
if message_needed {
let in_topology = topology.route_to_peer(required_routing, &peer);
let in_topology = topology_neighbors.route_to_peer(required_routing, &peer);
let need_routing = in_topology || {
let route_random = random_routing.sample(total_peers, rng);
if route_random {
@@ -533,7 +533,8 @@ async fn process_incoming_peer_message<Context>(
let topology = state
.topologies
.get_topology_or_fallback(job_data.signing_context.session_index);
.get_topology_or_fallback(job_data.signing_context.session_index)
.local_grid_neighbors();
let required_routing = topology.required_routing_by_index(validator_index, false);
metrics.on_bitfield_received();
@@ -579,14 +580,24 @@ async fn handle_network_msg<Context>(
},
NetworkBridgeEvent::NewGossipTopology(gossip_topology) => {
let session_index = gossip_topology.session;
let new_topology = SessionGridTopology::from(gossip_topology);
let newly_added = new_topology.peers_diff(&new_topology);
state.topologies.update_topology(session_index, new_topology);
let new_topology = gossip_topology.topology;
let prev_neighbors =
state.topologies.get_current_topology().local_grid_neighbors().clone();
state.topologies.update_topology(
session_index,
new_topology,
gossip_topology.local_index,
);
let current_topology = state.topologies.get_current_topology();
let newly_added = current_topology.local_grid_neighbors().peers_diff(&prev_neighbors);
gum::debug!(
target: LOG_TARGET,
?session_index,
"New gossip topology received {} unseen peers",
newly_added.len()
newly_added_peers = ?newly_added.len(),
"New gossip topology received",
);
for new_peer in newly_added {
@@ -651,7 +662,7 @@ async fn handle_peer_view_change<Context>(
.cloned()
.collect::<Vec<_>>();
let topology = state.topologies.get_current_topology();
let topology = state.topologies.get_current_topology().local_grid_neighbors();
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin);
let lucky = is_gossip_peer ||
util::gen_ratio_rng(
@@ -20,8 +20,10 @@ use bitvec::bitvec;
use futures::executor;
use maplit::hashmap;
use polkadot_node_network_protocol::{
grid_topology::SessionBoundGridTopologyStorage, our_view, peer_set::ValidationVersion, view,
ObservedRole,
grid_topology::{SessionBoundGridTopologyStorage, SessionGridTopology, TopologyPeerInfo},
our_view,
peer_set::ValidationVersion,
view, ObservedRole,
};
use polkadot_node_subsystem::{
jaeger,
@@ -32,6 +34,7 @@ use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v2::{AvailabilityBitfield, Signed, ValidatorIndex};
use rand_chacha::ChaCha12Rng;
use sp_application_crypto::AppKey;
use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair;
use sp_core::Pair as PairT;
use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
@@ -61,10 +64,11 @@ fn prewarmed_state(
peers: Vec<PeerId>,
) -> ProtocolState {
let relay_parent = known_message.relay_parent.clone();
let mut topology: SessionGridTopology = Default::default();
topology.peers_x = peers.iter().cloned().collect();
let mut topologies = SessionBoundGridTopologyStorage::default();
topologies.update_topology(0_u32, topology);
topologies.update_topology(0_u32, SessionGridTopology::new(Vec::new(), Vec::new()), None);
topologies.get_current_topology_mut().local_grid_neighbors_mut().peers_x =
peers.iter().cloned().collect();
ProtocolState {
per_relay_parent: hashmap! {
relay_parent.clone() =>
@@ -456,10 +460,9 @@ fn do_not_relay_message_twice() {
let mut rng = dummy_rng();
executor::block_on(async move {
let gossip_peers = SessionGridTopology {
peers_x: HashSet::from_iter(vec![peer_a.clone(), peer_b.clone()].into_iter()),
..Default::default()
};
let mut gossip_peers = GridNeighbors::empty();
gossip_peers.peers_x = HashSet::from_iter(vec![peer_a.clone(), peer_b.clone()].into_iter());
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
@@ -780,33 +783,43 @@ fn topology_test() {
.try_init();
let hash: Hash = [0; 32].into();
let peers_x = (0..25).map(|_| PeerId::random()).collect::<Vec<_>>();
let peers_y = (0..25).map(|_| PeerId::random()).collect::<Vec<_>>();
// ensure all unique
assert_eq!(
peers_x.iter().chain(peers_y.iter()).collect::<HashSet<_>>().len(),
peers_x.len() + peers_y.len()
);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash);
// Create a simple grid
let mut topology: SessionGridTopology = Default::default();
topology.peers_x = peers_x.iter().cloned().collect::<HashSet<_>>();
topology.validator_indices_x = peers_x
// Create a simple grid without any shuffling. We occupy position 1.
let topology_peer_info: Vec<_> = (0..49)
.map(|i| TopologyPeerInfo {
peer_ids: vec![PeerId::random()],
validator_index: ValidatorIndex(i as _),
discovery_id: AuthorityDiscoveryPair::generate().0.public(),
})
.collect();
let topology = SessionGridTopology::new((0usize..49).collect(), topology_peer_info.clone());
state.topologies.update_topology(0_u32, topology, Some(ValidatorIndex(1)));
let peers_x: Vec<_> = [0, 2, 3, 4, 5, 6]
.iter()
.enumerate()
.map(|(idx, _)| ValidatorIndex(idx as u32))
.collect::<HashSet<_>>();
topology.peers_y = peers_y.iter().cloned().collect::<HashSet<_>>();
topology.validator_indices_y = peers_y
.cloned()
.map(|i| topology_peer_info[i].peer_ids[0].clone())
.collect();
let peers_y: Vec<_> = [8, 15, 22, 29, 36, 43]
.iter()
.enumerate()
.map(|(idx, _)| ValidatorIndex((idx + peers_x.len()) as u32))
.collect::<HashSet<_>>();
state.topologies.update_topology(0_u32, topology);
.cloned()
.map(|i| topology_peer_info[i].peer_ids[0].clone())
.collect();
{
let t = state.topologies.get_current_topology().local_grid_neighbors();
for p_x in &peers_x {
assert!(t.peers_x.contains(p_x));
}
for p_y in &peers_y {
assert!(t.peers_y.contains(p_y));
}
}
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
@@ -860,7 +873,7 @@ fn topology_test() {
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::SendValidationMessage(peers, send_msg),
) => {
let topology = state.topologies.get_current_topology();
let topology = state.topologies.get_current_topology().local_grid_neighbors();
// It should send message to all peers in y direction and to 4 random peers in x direction
assert_eq!(peers_y.len() + 4, peers.len());
assert!(topology.peers_y.iter().all(|peer| peers.contains(&peer)));