Implement grid topology routing for the statement distribution subsystem (#5476)

* Move NewGossipTopology -> SessionGridTopology outside as this implementation is shared

* Add method to return peers difference between topologies

* Implement basic grid topology usage for the bitfield distribution

* Fix tests

* Oops, fix tests

* Add some tests for random routing

* Add a unit test for topology distribution

* Store the current and the previous topology to match sessions boundaries

* Update tests

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Add some debug

* Fix tests as HashSet order is undefined

* Move session bounded topology to the common code part

* Fix tests

* Allow selecting routing by peer index

* Implement grid topology in the statement distribution subsystem

* Fix tests compilation

* Fix test

* Refactor API slightly

* Address review comments

* Reduce runtime error logging severity

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Update node/network/bitfield-distribution/src/tests.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Fmt run

* Use named struct

* Fix logging stuff

* Fix one more instance of accidental fmt damage

* Increase active queue size and add metrics

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Revert "Increase active queue size and add metrics"

This reverts commit c4f48e8bded6dfeb9c62814ba2f8d815c34b04cf.

* Use validator index to choose the routing strategy

Noted by: @rphmeier

* Fix test after distribution logic fix

Co-authored-by: Andronik <write@reusable.software>
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
Co-authored-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Vsevolod Stakhov
2022-05-18 13:27:06 +01:00
committed by GitHub
parent 28cae2ef45
commit 195b901cc9
8 changed files with 189 additions and 92 deletions
@@ -27,6 +27,7 @@ use parity_scale_codec::Encode;
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology},
peer_set::{IsAuthority, PeerSet},
request_response::{v1 as request_v1, IncomingRequestReceiver},
v1::{self as protocol_v1, StatementMetadata},
@@ -888,7 +889,7 @@ fn check_statement_signature(
/// them but now can.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement_and_dependents<Context>(
gossip_peers: &HashSet<PeerId>,
topology_store: &SessionBoundGridTopologyStorage,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut Context,
@@ -909,6 +910,7 @@ async fn circulate_statement_and_dependents<Context>(
.with_candidate(statement.payload().candidate_hash())
.with_stage(jaeger::Stage::StatementDistribution);
let topology = topology_store.get_topology_or_fallback(active_head.session_index);
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
@@ -916,7 +918,8 @@ async fn circulate_statement_and_dependents<Context>(
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(
gossip_peers,
RequiredRouting::GridXY,
topology,
peers,
ctx,
relay_parent,
@@ -1005,7 +1008,8 @@ fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option<usize>)
/// an iterator over peers who need to have dependent statements sent.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement<'a, Context>(
gossip_peers: &HashSet<PeerId>,
required_routing: RequiredRouting,
topology: &SessionGridTopology,
peers: &mut HashMap<PeerId, PeerData>,
ctx: &mut Context,
relay_parent: Hash,
@@ -1036,7 +1040,7 @@ async fn circulate_statement<'a, Context>(
peers_to_send.retain(|p| !priority_set.contains(p));
util::choose_random_subset_with_rng(
|e| gossip_peers.contains(e),
|e| topology.route_to_peer(required_routing, e),
&mut peers_to_send,
rng,
MIN_GOSSIP_PEERS,
@@ -1300,7 +1304,7 @@ async fn launch_request<Context>(
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_message_and_circulate<'a, Context, R>(
peer: PeerId,
gossip_peers: &HashSet<PeerId>,
topology_storage: &SessionBoundGridTopologyStorage,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
@@ -1308,6 +1312,7 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>(
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
runtime: &mut RuntimeInfo,
rng: &mut R,
) where
R: rand::Rng,
@@ -1338,8 +1343,27 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>(
// that require dependents. Thus, if this is a `Seconded` statement for a candidate we
// were not aware of before, we cannot have any dependent statements from the candidate.
let _ = metrics.time_network_bridge_update_v1("circulate_statement");
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await;
let topology = match session_index {
Ok(session_index) => topology_storage.get_topology_or_fallback(session_index),
Err(e) => {
gum::debug!(
target: LOG_TARGET,
%relay_parent,
"cannot get session index for the specific relay parent: {:?}",
e
);
topology_storage.get_current_topology()
},
};
let required_routing =
topology.required_routing_by_index(statement.statement.validator_index(), false);
let _ = circulate_statement(
gossip_peers,
required_routing,
topology,
peers,
ctx,
relay_parent,
@@ -1558,7 +1582,7 @@ async fn handle_incoming_message<'a, Context>(
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn update_peer_view_and_maybe_send_unlocked<Context, R>(
peer: PeerId,
gossip_peers: &HashSet<PeerId>,
topology: &SessionGridTopology,
peer_data: &mut PeerData,
ctx: &mut Context,
active_heads: &HashMap<Hash, ActiveHeadData>,
@@ -1575,10 +1599,11 @@ async fn update_peer_view_and_maybe_send_unlocked<Context, R>(
let _ = peer_data.view_knowledge.remove(removed);
}
let is_gossip_peer = gossip_peers.contains(&peer);
// Use both grid directions
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &peer);
let lucky = is_gossip_peer ||
util::gen_ratio_rng(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()),
util::MIN_GOSSIP_PEERS,
rng,
);
@@ -1600,7 +1625,7 @@ async fn update_peer_view_and_maybe_send_unlocked<Context, R>(
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_network_update<Context, R>(
peers: &mut HashMap<PeerId, PeerData>,
gossip_peers: &mut HashSet<PeerId>,
topology_storage: &mut SessionBoundGridTopologyStorage,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
@@ -1608,6 +1633,7 @@ async fn handle_network_update<Context, R>(
req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<net_protocol::StatementDistributionMessage>,
metrics: &Metrics,
runtime: &mut RuntimeInfo,
rng: &mut R,
) where
R: rand::Rng,
@@ -1638,22 +1664,19 @@ async fn handle_network_update<Context, R>(
}
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
// Combine all peers in the x & y direction as we don't make any distinction.
let new_peers: HashSet<PeerId> = topology
.our_neighbors_x
.values()
.chain(topology.our_neighbors_y.values())
.flat_map(|peer_info| peer_info.peer_ids.iter().cloned())
.collect();
let _ = metrics.time_network_bridge_update_v1("new_gossip_topology");
let newly_added: Vec<PeerId> = new_peers.difference(gossip_peers).cloned().collect();
*gossip_peers = new_peers;
let new_session_index = topology.session;
let new_topology: SessionGridTopology = topology.into();
let old_topology = topology_storage.get_current_topology();
let newly_added = new_topology.peers_diff(old_topology);
topology_storage.update_topology(new_session_index, new_topology);
for peer in newly_added {
if let Some(data) = peers.get_mut(&peer) {
let view = std::mem::take(&mut data.view);
update_peer_view_and_maybe_send_unlocked(
peer,
gossip_peers,
topology_storage.get_current_topology(),
data,
ctx,
&*active_heads,
@@ -1668,7 +1691,7 @@ async fn handle_network_update<Context, R>(
NetworkBridgeEvent::PeerMessage(peer, Versioned::V1(message)) => {
handle_incoming_message_and_circulate(
peer,
gossip_peers,
topology_storage,
peers,
active_heads,
&*recent_outdated_heads,
@@ -1676,6 +1699,7 @@ async fn handle_network_update<Context, R>(
message,
req_sender,
metrics,
runtime,
rng,
)
.await;
@@ -1687,7 +1711,7 @@ async fn handle_network_update<Context, R>(
Some(data) =>
update_peer_view_and_maybe_send_unlocked(
peer,
gossip_peers,
topology_storage.get_current_topology(),
data,
ctx,
&*active_heads,
@@ -1719,7 +1743,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut gossip_peers: HashSet<PeerId> = HashSet::new();
let mut topology_storage: SessionBoundGridTopologyStorage = Default::default();
let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut recent_outdated_heads = RecentOutdatedHeads::default();
@@ -1751,7 +1775,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
&mut ctx,
&mut runtime,
&mut peers,
&mut gossip_peers,
&mut topology_storage,
&mut authorities,
&mut active_heads,
&mut recent_outdated_heads,
@@ -1769,11 +1793,12 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
let result = self
.handle_requester_message(
&mut ctx,
&gossip_peers,
&topology_storage,
&mut peers,
&mut active_heads,
&recent_outdated_heads,
&req_sender,
&mut runtime,
result.ok_or(FatalError::RequesterReceiverFinished)?,
)
.await;
@@ -1836,11 +1861,12 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
async fn handle_requester_message<Context>(
&mut self,
ctx: &mut Context,
gossip_peers: &HashSet<PeerId>,
topology_storage: &SessionBoundGridTopologyStorage,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>,
runtime: &mut RuntimeInfo,
message: RequesterMessage,
) -> JfyiErrorResult<()> {
match message {
@@ -1884,7 +1910,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
for message in messages {
handle_incoming_message_and_circulate(
peer,
gossip_peers,
topology_storage,
peers,
active_heads,
recent_outdated_heads,
@@ -1892,6 +1918,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
message,
req_sender,
&self.metrics,
runtime,
&mut self.rng,
)
.await;
@@ -1946,7 +1973,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
peers: &mut HashMap<PeerId, PeerData>,
gossip_peers: &mut HashSet<PeerId>,
topology_storage: &mut SessionBoundGridTopologyStorage,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &mut RecentOutdatedHeads,
@@ -2046,7 +2073,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
}
};
circulate_statement_and_dependents(
gossip_peers,
topology_storage,
peers,
active_heads,
ctx,
@@ -2061,7 +2088,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
StatementDistributionMessage::NetworkBridgeUpdate(event) => {
handle_network_update(
peers,
gossip_peers,
topology_storage,
authorities,
active_heads,
&*recent_outdated_heads,
@@ -2069,6 +2096,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
req_sender,
event,
metrics,
runtime,
&mut self.rng,
)
.await;
@@ -505,10 +505,11 @@ fn peer_view_update_sends_messages() {
let peer = PeerId::random();
executor::block_on(async move {
let gossip_peers = HashSet::from_iter(vec![peer.clone()].into_iter());
let mut topology: SessionGridTopology = Default::default();
topology.peers_x = HashSet::from_iter(vec![peer.clone()].into_iter());
update_peer_view_and_maybe_send_unlocked(
peer.clone(),
&gossip_peers,
&topology,
&mut peer_data,
&mut ctx,
&active_heads,
@@ -634,10 +635,12 @@ fn circulated_statement_goes_to_all_peers_with_view() {
};
let statement = StoredStatement { comparator: &comparator, statement: &statement };
let gossip_peers =
let mut topology: SessionGridTopology = Default::default();
topology.peers_x =
HashSet::from_iter(vec![peer_a.clone(), peer_b.clone(), peer_c.clone()].into_iter());
let needs_dependents = circulate_statement(
&gossip_peers,
RequiredRouting::GridXY,
&topology,
&mut peer_data,
&mut ctx,
hash_b,
@@ -2004,19 +2007,31 @@ fn handle_multiple_seconded_statements() {
our_neighbors_y: HashMap::new(),
};
// This is relying on the fact that statement distribution
// just extracts the peer IDs from this struct and does nothing else
// with it.
// Create a topology to ensure that we send messages not to `peer_a`/`peer_b`
for (i, peer) in lucky_peers.iter().enumerate() {
let authority_id = AuthorityPair::generate().0.public();
t.our_neighbors_x.insert(
t.our_neighbors_y.insert(
authority_id,
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![peer.clone()],
validator_index: (i as u32).into(),
validator_index: (i as u32 + 2_u32).into(),
},
);
}
t.our_neighbors_x.insert(
AuthorityPair::generate().0.public(),
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![peer_a.clone()],
validator_index: 0_u32.into(),
},
);
t.our_neighbors_x.insert(
AuthorityPair::generate().0.public(),
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![peer_b.clone()],
validator_index: 1_u32.into(),
},
);
t
};