Use grid topology for bitfileds distribution messages (#5389)

* Move NewGossipTopology -> SessionGridTopology outside as this implementation is shared

* Add method to return peers difference between topologies

* Implement basic grid topology usage for the bitfield distribution

* Fix tests

* Oops, fix tests

* Add some tests for random routing

* Add a unit test for topology distribution

* Store the current and the previous topology to match sessions boundaries

* Update tests

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Add some debug

* Fix tests as HashSet order is undefined

Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Vsevolod Stakhov
2022-05-06 13:24:11 +01:00
committed by GitHub
parent 53a1db59bc
commit 673a32d968
9 changed files with 419 additions and 83 deletions
@@ -25,15 +25,19 @@
use futures::{channel::oneshot, FutureExt};
use polkadot_node_network_protocol::{
self as net_protocol, v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep,
Versioned, View,
self as net_protocol,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopology},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_subsystem_util::{self as util};
use polkadot_primitives::v2::{
Hash, SessionIndex, SignedAvailabilityBitfield, SigningContext, ValidatorId,
};
use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_subsystem::{
jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
SpawnedSubsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use rand::{CryptoRng, Rng, SeedableRng};
use std::collections::{HashMap, HashSet};
use self::metrics::Metrics;
@@ -76,6 +80,44 @@ impl BitfieldGossipMessage {
}
}
/// A simple storage for a topology and the corresponding session index
#[derive(Default, Debug)]
struct GridTopologySessionBound(SessionGridTopology, SessionIndex);
/// A storage for the current and maybe previous topology
#[derive(Default, Debug)]
struct BitfieldGridTopologyStorage {
current_topology: GridTopologySessionBound,
prev_topology: Option<GridTopologySessionBound>,
}
impl BitfieldGridTopologyStorage {
/// Return a grid topology based on the session index:
/// If we need a previous session and it is registered in the storage, then return that session.
/// Otherwise, return a current session to have some grid topology in any case
fn get_topology(&self, idx: SessionIndex) -> &SessionGridTopology {
if let Some(prev_topology) = &self.prev_topology {
if idx == prev_topology.1 {
return &prev_topology.0
}
}
// Return the current topology by default
&self.current_topology.0
}
/// Update the current topology preserving the previous one
fn update_topology(&mut self, idx: SessionIndex, topology: SessionGridTopology) {
let old_current =
std::mem::replace(&mut self.current_topology, GridTopologySessionBound(topology, idx));
self.prev_topology.replace(old_current);
}
/// Returns a current grid topology
fn get_current_topology(&self) -> &SessionGridTopology {
&self.current_topology.0
}
}
/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Debug)]
@@ -84,9 +126,8 @@ struct ProtocolState {
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
/// Track all our neighbors in the current gossip topology.
/// We're not necessarily connected to all of them.
gossip_peers: HashSet<PeerId>,
/// The current and previous gossip topologies
topologies: BitfieldGridTopologyStorage,
/// Our current view.
view: OurView,
@@ -170,13 +211,27 @@ impl BitfieldDistribution {
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context)
async fn run<Context>(self, ctx: Context)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
Context: overseer::SubsystemContext<Message = BitfieldDistributionMessage>,
{
// work: process incoming messages from the overseer and process accordingly.
let mut state = ProtocolState::default();
let mut rng = rand::rngs::StdRng::from_entropy();
self.run_inner(ctx, &mut state, &mut rng).await
}
async fn run_inner<Context>(
self,
mut ctx: Context,
state: &mut ProtocolState,
rng: &mut (impl CryptoRng + Rng),
) where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
Context: overseer::SubsystemContext<Message = BitfieldDistributionMessage>,
{
// work: process incoming messages from the overseer and process accordingly.
loop {
let message = match ctx.recv().await {
Ok(message) => message,
@@ -200,10 +255,11 @@ impl BitfieldDistribution {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield");
handle_bitfield_distribution(
&mut ctx,
&mut state,
state,
&self.metrics,
relay_parent,
signed_availability,
rng,
)
.await;
},
@@ -212,7 +268,7 @@ impl BitfieldDistribution {
} => {
gum::trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await;
handle_network_msg(&mut ctx, state, &self.metrics, event, rng).await;
},
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
@@ -268,7 +324,6 @@ where
ctx.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await
}
/// Distribute a given valid and signature checked bitfield message.
///
/// For this variant the source is this node.
@@ -278,6 +333,7 @@ async fn handle_bitfield_distribution<Context>(
metrics: &Metrics,
relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield,
rng: &mut (impl CryptoRng + Rng),
) where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -296,25 +352,36 @@ async fn handle_bitfield_distribution<Context>(
return
};
let session_idx = job_data.signing_context.session_index;
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
gum::debug!(target: LOG_TARGET, ?relay_parent, "validator set is empty");
return
}
let validator_index = signed_availability.validator_index().0 as usize;
let validator = if let Some(validator) = validator_set.get(validator_index) {
let validator_index = signed_availability.validator_index();
let validator = if let Some(validator) = validator_set.get(*&validator_index.0 as usize) {
validator.clone()
} else {
gum::debug!(target: LOG_TARGET, validator_index, "Could not find a validator for index");
gum::debug!(target: LOG_TARGET, validator_index = ?validator_index.0, "Could not find a validator for index");
return
};
let msg = BitfieldGossipMessage { relay_parent, signed_availability };
let gossip_peers = &state.gossip_peers;
let peer_views = &mut state.peer_views;
relay_message(ctx, job_data, gossip_peers, peer_views, validator, msg).await;
let topology = state.topologies.get_topology(session_idx);
let required_routing = topology.required_routing_for(validator_index, true);
relay_message(
ctx,
job_data,
topology,
&mut state.peer_views,
validator,
msg,
required_routing,
rng,
)
.await;
metrics.on_own_bitfield_sent();
}
@@ -325,10 +392,12 @@ async fn handle_bitfield_distribution<Context>(
async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
gossip_peers: &HashSet<PeerId>,
topology: &SessionGridTopology,
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
required_routing: RequiredRouting,
rng: &mut (impl CryptoRng + Rng),
) where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -344,10 +413,12 @@ async fn relay_message<Context>(
.await;
drop(_span);
let total_peers = peer_views.len();
let mut random_routing: RandomRouting = Default::default();
let _span = span.child("interested-peers");
// pass on the bitfield distribution to all interested peers
let mut interested_peers = peer_views
let interested_peers = peer_views
.iter()
.filter_map(|(peer, view)| {
// check interest in the peer in this message's relay parent
@@ -355,7 +426,21 @@ async fn relay_message<Context>(
let message_needed =
job_data.message_from_validator_needed_by_peer(&peer, &validator);
if message_needed {
Some(peer.clone())
let in_topology = topology.route_to_peer(required_routing, &peer);
let need_routing = in_topology || {
let route_random = random_routing.sample(total_peers, rng);
if route_random {
random_routing.inc_sent();
}
route_random
};
if need_routing {
Some(peer.clone())
} else {
None
}
} else {
None
}
@@ -364,11 +449,7 @@ async fn relay_message<Context>(
}
})
.collect::<Vec<PeerId>>();
util::choose_random_subset(
|e| gossip_peers.contains(e),
&mut interested_peers,
MIN_GOSSIP_PEERS,
);
interested_peers.iter().for_each(|peer| {
// track the message as sent for this peer
job_data
@@ -403,6 +484,7 @@ async fn process_incoming_peer_message<Context>(
metrics: &Metrics,
origin: PeerId,
message: protocol_v1::BitfieldDistributionMessage,
rng: &mut (impl CryptoRng + Rng),
) where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -492,11 +574,23 @@ async fn process_incoming_peer_message<Context>(
let message = BitfieldGossipMessage { relay_parent, signed_availability };
let topology = state.topologies.get_topology(job_data.signing_context.session_index);
let required_routing = topology.required_routing_for(validator_index, false);
metrics.on_bitfield_received();
one_per_validator.insert(validator.clone(), message.clone());
relay_message(ctx, job_data, &state.gossip_peers, &mut state.peer_views, validator, message)
.await;
relay_message(
ctx,
job_data,
topology,
&mut state.peer_views,
validator,
message,
required_routing,
rng,
)
.await;
modify_reputation(ctx, relay_parent, origin, BENEFIT_VALID_MESSAGE_FIRST).await
}
@@ -508,6 +602,7 @@ async fn handle_network_msg<Context>(
state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<net_protocol::BitfieldDistributionMessage>,
rng: &mut (impl CryptoRng + Rng),
) where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -524,35 +619,37 @@ async fn handle_network_msg<Context>(
// get rid of superfluous data
state.peer_views.remove(&peer);
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
// Combine all peers in the x & y direction as we don't make any distinction.
let peers: HashSet<PeerId> = topology
.our_neighbors_x
.values()
.chain(topology.our_neighbors_y.values())
.flat_map(|peer_info| peer_info.peer_ids.iter().cloned())
.collect();
let newly_added: Vec<PeerId> = peers.difference(&state.gossip_peers).cloned().collect();
state.gossip_peers = peers;
NetworkBridgeEvent::NewGossipTopology(gossip_topology) => {
let session_index = gossip_topology.session;
let new_topology = SessionGridTopology::from(gossip_topology);
let newly_added = new_topology.peers_diff(&new_topology);
state.topologies.update_topology(session_index, new_topology);
gum::debug!(
target: LOG_TARGET,
?session_index,
"New gossip topology received {} unseen peers",
newly_added.len()
);
for new_peer in newly_added {
// in case we already knew that peer in the past
// it might have had an existing view, we use to initialize
// and minimize the delta on `PeerViewChange` to be sent
if let Some(old_view) = state.peer_views.remove(&new_peer) {
handle_peer_view_change(ctx, state, new_peer, old_view).await;
handle_peer_view_change(ctx, state, new_peer, old_view, rng).await;
}
}
},
NetworkBridgeEvent::PeerViewChange(peerid, new_view) => {
gum::trace!(target: LOG_TARGET, ?peerid, ?new_view, "Peer view change");
handle_peer_view_change(ctx, state, peerid, new_view).await;
handle_peer_view_change(ctx, state, peerid, new_view, rng).await;
},
NetworkBridgeEvent::OurViewChange(new_view) => {
gum::trace!(target: LOG_TARGET, ?new_view, "Our view change");
handle_our_view_change(state, new_view);
},
NetworkBridgeEvent::PeerMessage(remote, Versioned::V1(message)) =>
process_incoming_peer_message(ctx, state, metrics, remote, message).await,
process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await,
}
}
@@ -585,6 +682,7 @@ async fn handle_peer_view_change<Context>(
state: &mut ProtocolState,
origin: PeerId,
view: View,
rng: &mut (impl CryptoRng + Rng),
) where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -596,11 +694,13 @@ async fn handle_peer_view_change<Context>(
.cloned()
.collect::<Vec<_>>();
let is_gossip_peer = state.gossip_peers.contains(&origin);
let topology = state.topologies.get_current_topology();
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin);
let lucky = is_gossip_peer ||
util::gen_ratio(
util::MIN_GOSSIP_PEERS.saturating_sub(state.gossip_peers.len()),
util::gen_ratio_rng(
util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()),
util::MIN_GOSSIP_PEERS,
rng,
);
if !lucky {