Extract peers grid topology related code to a separate unit (#5365)

* Initial attempt to extract grid topology related code

* Use shared code in the approval distribution subsystem

* Fix spellcheck issues

* Move Aggression stuff back to the approval-distribution subsystem

* Cargo fmt
This commit is contained in:
Vsevolod Stakhov
2022-04-22 11:19:36 +01:00
committed by GitHub
parent bb3cc7b041
commit a5742b9ec1
6 changed files with 256 additions and 159 deletions
+1
View File
@@ -6993,6 +6993,7 @@ dependencies = [
"polkadot-node-jaeger",
"polkadot-node-primitives",
"polkadot-primitives",
"rand 0.8.5",
"sc-authority-discovery",
"sc-network",
"strum 0.24.0",
@@ -22,8 +22,9 @@
use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_network_protocol::{
self as net_protocol, v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned,
View,
self as net_protocol,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::approval::{
AssignmentCert, BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote,
@@ -62,14 +63,6 @@ const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message"
const BENEFIT_VALID_MESSAGE_FIRST: Rep =
Rep::BenefitMinorFirst("Valid message with new information");
/// The number of peers to randomly propagate messages to.
const RANDOM_CIRCULATION: usize = 4;
/// The sample rate for randomly propagating messages. This
/// reduces the left tail of the binomial distribution but also
/// introduces a bias towards peers who we sample before others
/// (i.e. those who get a block before others).
const RANDOM_SAMPLE_RATE: usize = polkadot_node_subsystem_util::MIN_GOSSIP_PEERS;
/// The Approval Distribution subsystem.
pub struct ApprovalDistribution {
metrics: Metrics,
@@ -98,99 +91,26 @@ impl RecentlyOutdated {
}
}
// Grid neighbors for one session, split by dimension (X / Y) and tracked both
// by peer id and by validator index.
struct SessionTopology {
    peers_x: HashSet<PeerId>,
    validator_indices_x: HashSet<ValidatorIndex>,
    peers_y: HashSet<PeerId>,
    validator_indices_y: HashSet<ValidatorIndex>,
}

impl SessionTopology {
    // Work out which part of the grid a message has to be forwarded to,
    // depending on which validator originated it.
    //
    // When `local` is set the answer is always both dimensions.
    fn required_routing_for(&self, originator: ValidatorIndex, local: bool) -> RequiredRouting {
        if local {
            return RequiredRouting::GridXY
        }

        let from_x = self.validator_indices_x.contains(&originator);
        let from_y = self.validator_indices_y.contains(&originator);

        if from_x && from_y {
            // if the grid works as expected, this shouldn't happen.
            RequiredRouting::GridXY
        } else if from_x {
            // messages from X go to Y
            RequiredRouting::GridY
        } else if from_y {
            // messages from Y go to X
            RequiredRouting::GridX
        } else {
            RequiredRouting::None
        }
    }

    // Tell whether `peer` belongs to the set of peers selected by
    // `required_routing` under this topology.
    fn route_to_peer(&self, required_routing: RequiredRouting, peer: &PeerId) -> bool {
        let in_x = || self.peers_x.contains(peer);
        let in_y = || self.peers_y.contains(peer);

        match required_routing {
            RequiredRouting::None | RequiredRouting::PendingTopology => false,
            RequiredRouting::GridX => in_x(),
            RequiredRouting::GridY => in_y(),
            RequiredRouting::GridXY => in_x() || in_y(),
            RequiredRouting::All => true,
        }
    }
}
impl From<network_bridge_event::NewGossipTopology> for SessionTopology {
    // Flatten the peer ids of every X / Y neighbor into per-dimension sets and
    // collect the neighbors' validator indices alongside them.
    fn from(topology: network_bridge_event::NewGossipTopology) -> Self {
        let x_neighbors = &topology.our_neighbors_x;
        let y_neighbors = &topology.our_neighbors_y;

        SessionTopology {
            peers_x: x_neighbors.values().flat_map(|n| &n.peer_ids).cloned().collect(),
            peers_y: y_neighbors.values().flat_map(|n| &n.peer_ids).cloned().collect(),
            validator_indices_x: x_neighbors
                .values()
                .map(|n| n.validator_index.clone())
                .collect(),
            validator_indices_y: y_neighbors
                .values()
                .map(|n| n.validator_index.clone())
                .collect(),
        }
    }
}
// Per-session topologies. Each entry pairs the (possibly not yet known)
// topology with a reference count.
#[derive(Default)]
struct SessionTopologies {
    inner: HashMap<SessionIndex, (Option<SessionTopology>, usize)>,
}

impl SessionTopologies {
    // The topology stored for `session`, if both the entry and the topology exist.
    fn get_topology(&self, session: SessionIndex) -> Option<&SessionTopology> {
        let (topology, _refs) = self.inner.get(&session)?;
        topology.as_ref()
    }

    // Bump the reference count of `session`, creating an empty entry if needed.
    fn inc_session_refs(&mut self, session: SessionIndex) {
        let (_, refs) = self.inner.entry(session).or_insert((None, 0));
        *refs += 1;
    }

    // Drop one reference to `session`; the entry is removed once the count
    // reaches zero. Unknown sessions are ignored.
    fn dec_session_refs(&mut self, session: SessionIndex) {
        if let hash_map::Entry::Occupied(mut slot) = self.inner.entry(session) {
            let refs = &mut slot.get_mut().1;
            *refs = refs.saturating_sub(1);
            if *refs == 0 {
                let _ = slot.remove();
            }
        }
    }

    // No-op if already present.
    fn insert_topology(&mut self, session: SessionIndex, topology: SessionTopology) {
        let (stored, _refs) = self.inner.entry(session).or_insert((None, 0));
        let _ = stored.get_or_insert(topology);
    }
}
// In case the original grid topology mechanisms don't work on their own, we need to trade bandwidth
// for protocol liveliness by introducing aggression.
//
// Aggression has 3 levels:
//
// * Aggression Level 0: The basic behaviors described above.
// * Aggression Level 1: The originator of a message sends to all peers. Other peers follow the rules above.
// * Aggression Level 2: All peers send all messages to all their row and column neighbors.
// This means that each validator will, on average, receive each message approximately `2*sqrt(n)` times.
// The aggression level of messages pertaining to a block increases when that block is unfinalized and
// is a child of the finalized block.
// This means that only one block at a time has its messages propagated with aggression > 0.
//
// A note on aggression thresholds: changes in propagation apply only to blocks which are the
// _direct descendants_ of the finalized block which are older than the given threshold,
// not to all blocks older than the threshold. Most likely, a few assignments struggle to
// be propagated in a single block and this holds up all of its descendants blocks.
// Accordingly, we only step on the gas for the block which is most obviously holding up finality.
/// Aggression configuration representation
#[derive(Clone)]
struct AggressionConfig {
/// Aggression level 1: all validators send all their own messages to all peers.
@@ -203,6 +123,7 @@ struct AggressionConfig {
}
impl AggressionConfig {
/// Returns `true` if block is not too old depending on the aggression level
fn is_age_relevant(&self, block_age: BlockNumber) -> bool {
if let Some(t) = self.l1_threshold {
block_age >= t
@@ -224,6 +145,29 @@ impl Default for AggressionConfig {
}
}
// Local wrapper around `SessionGridTopology` so that a conversion from the
// network bridge's `NewGossipTopology` event can be implemented here.
struct ApprovalGridTopology(SessionGridTopology);

impl From<network_bridge_event::NewGossipTopology> for ApprovalGridTopology {
    // Flatten the peer ids of every X / Y neighbor into per-dimension sets and
    // collect the neighbors' validator indices alongside them.
    fn from(topology: network_bridge_event::NewGossipTopology) -> Self {
        let x_neighbors = &topology.our_neighbors_x;
        let y_neighbors = &topology.our_neighbors_y;

        let grid = SessionGridTopology {
            peers_x: x_neighbors.values().flat_map(|n| &n.peer_ids).cloned().collect(),
            peers_y: y_neighbors.values().flat_map(|n| &n.peer_ids).cloned().collect(),
            validator_indices_x: x_neighbors
                .values()
                .map(|n| n.validator_index.clone())
                .collect(),
            validator_indices_y: y_neighbors
                .values()
                .map(|n| n.validator_index.clone())
                .collect(),
        };

        ApprovalGridTopology(grid)
    }
}
#[derive(PartialEq)]
enum Resend {
Yes,
@@ -252,7 +196,7 @@ struct State {
peer_views: HashMap<PeerId, View>,
/// Keeps a topology for various different sessions.
topologies: SessionTopologies,
topologies: SessionGridTopologies,
/// Tracks recently finalized blocks.
recent_outdated_blocks: RecentlyOutdated,
@@ -361,58 +305,6 @@ impl ApprovalState {
}
}
/// Which set of peers a message is required to be propagated to.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RequiredRouting {
    /// We don't know yet, because we're waiting for topology info
    /// (race condition between learning about the first blocks in a new session
    /// and getting the topology for that session)
    PendingTopology,
    /// Propagate to all peers of any kind.
    All,
    /// Propagate to all peers sharing either the X or Y dimension of the grid.
    GridXY,
    /// Propagate to all peers sharing the X dimension of the grid.
    GridX,
    /// Propagate to all peers sharing the Y dimension of the grid.
    GridY,
    /// No required propagation.
    None,
}

impl RequiredRouting {
    // Whether the required routing set is definitely empty.
    //
    // Returns `true` for `PendingTopology` and `None`, `false` for every other
    // variant. The match is deliberately exhaustive (no `_` arm) so that adding
    // a variant forces a decision here instead of silently being treated as
    // non-empty.
    fn is_empty(self) -> bool {
        match self {
            RequiredRouting::PendingTopology | RequiredRouting::None => true,
            RequiredRouting::All |
            RequiredRouting::GridXY |
            RequiredRouting::GridX |
            RequiredRouting::GridY => false,
        }
    }
}
// Progress of random propagation: how many peers we aim for and how many we
// have already sent to.
#[derive(Debug, Default, Clone, Copy)]
struct RandomRouting {
    // The number of peers to target.
    target: usize,
    // The number of peers this has been sent to.
    sent: usize,
}

impl RandomRouting {
    // Decide whether the next peer should be picked for random propagation.
    //
    // Always `false` when there are no peers or `sent` has reached `target`;
    // always `true` when there are fewer peers than `RANDOM_SAMPLE_RATE`;
    // otherwise a random draw with `RANDOM_SAMPLE_RATE` to `n_peers_total` odds.
    fn sample(&self, n_peers_total: usize, rng: &mut (impl CryptoRng + Rng)) -> bool {
        let quota_reached = self.sent >= self.target;
        if quota_reached || n_peers_total == 0 {
            return false
        }

        if n_peers_total < RANDOM_SAMPLE_RATE {
            return true
        }

        rng.gen_ratio(RANDOM_SAMPLE_RATE as _, n_peers_total as _)
    }

    // Record that one more peer has been sent to.
    fn inc_sent(&mut self) {
        self.sent = self.sent + 1;
    }
}
// routing state bundled with messages for the candidate. Corresponding assignments
// and approvals are stored together and should be routed in the same way, with
// assignments preceding approvals in all cases.
@@ -476,8 +368,12 @@ impl State {
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
let session = topology.session;
self.handle_new_session_topology(ctx, session, SessionTopology::from(topology))
.await;
self.handle_new_session_topology(
ctx,
session,
ApprovalGridTopology::from(topology),
)
.await;
},
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await;
@@ -632,9 +528,9 @@ impl State {
ctx: &mut (impl SubsystemContext<Message = ApprovalDistributionMessage>
+ overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
session: SessionIndex,
topology: SessionTopology,
topology: ApprovalGridTopology,
) {
self.topologies.insert_topology(session, topology);
self.topologies.insert_topology(session, topology.0);
let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
adjust_required_routing_and_propagate(
@@ -1000,7 +896,7 @@ impl State {
candidate_entry.messages.entry(validator_index).or_insert_with(|| MessageState {
required_routing,
local,
random_routing: RandomRouting { target: RANDOM_CIRCULATION, sent: 0 },
random_routing: Default::default(),
approval_state: ApprovalState::Assigned(assignment.cert.clone()),
})
},
@@ -1344,7 +1240,7 @@ impl State {
+ overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
metrics: &Metrics,
entries: &mut HashMap<Hash, BlockEntry>,
topologies: &SessionTopologies,
topologies: &SessionGridTopologies,
total_peers: usize,
peer_id: PeerId,
view: View,
@@ -1592,7 +1488,7 @@ async fn adjust_required_routing_and_propagate(
ctx: &mut (impl SubsystemContext<Message = ApprovalDistributionMessage>
+ overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
blocks: &mut HashMap<Hash, BlockEntry>,
topologies: &SessionTopologies,
topologies: &SessionGridTopologies,
block_filter: impl Fn(&mut BlockEntry) -> bool,
routing_modifier: impl Fn(&mut RequiredRouting, bool, &ValidatorIndex),
) {
+2 -1
View File
@@ -17,4 +17,5 @@ strum = { version = "0.24", features = ["derive"] }
futures = "0.3.21"
thiserror = "1.0.30"
fatality = "0.0.6"
derive_more = "0.99"
rand = "0.8"
derive_more = "0.99"
@@ -0,0 +1,195 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Grid topology support implementation
//! The basic operation of the 2D grid topology is that:
//! * A validator producing a message sends it to its row-neighbors and its column-neighbors
//! * A validator receiving a message originating from one of its row-neighbors sends it to its column-neighbors
//! * A validator receiving a message originating from one of its column-neighbors sends it to its row-neighbors
//!
//! This grid approach defines 2 unique paths for every validator to reach every other validator in at most 2 hops.
//!
//! However, we also supplement this with some degree of random propagation:
//! every validator, upon seeing a message for the first time, propagates it to a small number of random peers (see `DEFAULT_RANDOM_CIRCULATION`).
//! This inserts some redundancy in case the grid topology isn't working or is being attacked -
//! an adversary doesn't know which peers a validator will send to.
//! This is combined with the property that the adversary doesn't know which validators will elect to check a block.
//!
use crate::PeerId;
use polkadot_primitives::v2::{SessionIndex, ValidatorIndex};
use rand::{CryptoRng, Rng};
use std::{
collections::{hash_map, HashMap, HashSet},
fmt::Debug,
};
/// The sample rate for randomly propagating messages. This
/// reduces the left tail of the binomial distribution but also
/// introduces a bias towards peers who we sample before others
/// (i.e. those who get a block before others).
///
/// Used as the `sample_rate` of a default-constructed [`RandomRouting`].
pub const DEFAULT_RANDOM_SAMPLE_RATE: usize = crate::MIN_GOSSIP_PEERS;
/// The number of peers to randomly propagate messages to.
///
/// Used as the `target` of a default-constructed [`RandomRouting`].
pub const DEFAULT_RANDOM_CIRCULATION: usize = 4;
/// Grid neighbors for one session, split by dimension (X / Y) and tracked both
/// by peer id and by validator index.
pub struct SessionGridTopology {
    /// Represent peers in the X axis
    pub peers_x: HashSet<PeerId>,
    /// Represent validators in the X axis
    pub validator_indices_x: HashSet<ValidatorIndex>,
    /// Represent peers in the Y axis
    pub peers_y: HashSet<PeerId>,
    /// Represent validators in the Y axis
    pub validator_indices_y: HashSet<ValidatorIndex>,
}

impl SessionGridTopology {
    /// Work out which part of the grid a message has to be forwarded to,
    /// depending on which validator originated it.
    ///
    /// When `local` is set the answer is always both dimensions.
    pub fn required_routing_for(&self, originator: ValidatorIndex, local: bool) -> RequiredRouting {
        if local {
            return RequiredRouting::GridXY
        }

        let from_x = self.validator_indices_x.contains(&originator);
        let from_y = self.validator_indices_y.contains(&originator);

        if from_x && from_y {
            // if the grid works as expected, this shouldn't happen.
            RequiredRouting::GridXY
        } else if from_x {
            // messages from X go to Y
            RequiredRouting::GridY
        } else if from_y {
            // messages from Y go to X
            RequiredRouting::GridX
        } else {
            RequiredRouting::None
        }
    }

    /// Tell whether `peer` belongs to the set of peers selected by
    /// `required_routing` under this topology.
    pub fn route_to_peer(&self, required_routing: RequiredRouting, peer: &PeerId) -> bool {
        let in_x = || self.peers_x.contains(peer);
        let in_y = || self.peers_y.contains(peer);

        match required_routing {
            RequiredRouting::None | RequiredRouting::PendingTopology => false,
            RequiredRouting::GridX => in_x(),
            RequiredRouting::GridY => in_y(),
            RequiredRouting::GridXY => in_x() || in_y(),
            RequiredRouting::All => true,
        }
    }
}
/// Per-session topologies. Each entry pairs the (possibly not yet known)
/// topology with a reference count.
#[derive(Default)]
pub struct SessionGridTopologies {
    inner: HashMap<SessionIndex, (Option<SessionGridTopology>, usize)>,
}

impl SessionGridTopologies {
    /// The topology stored for `session`, if both the entry and the topology exist.
    pub fn get_topology(&self, session: SessionIndex) -> Option<&SessionGridTopology> {
        let (topology, _refs) = self.inner.get(&session)?;
        topology.as_ref()
    }

    /// Bump the reference count of `session`, creating an empty entry if needed.
    pub fn inc_session_refs(&mut self, session: SessionIndex) {
        let (_, refs) = self.inner.entry(session).or_insert((None, 0));
        *refs += 1;
    }

    /// Drop one reference to `session`; the entry is removed once the count
    /// reaches zero. Unknown sessions are ignored.
    pub fn dec_session_refs(&mut self, session: SessionIndex) {
        if let hash_map::Entry::Occupied(mut slot) = self.inner.entry(session) {
            let refs = &mut slot.get_mut().1;
            *refs = refs.saturating_sub(1);
            if *refs == 0 {
                let _ = slot.remove();
            }
        }
    }

    /// Store `topology` for `session`; does nothing if one is already stored.
    pub fn insert_topology(&mut self, session: SessionIndex, topology: SessionGridTopology) {
        let (stored, _refs) = self.inner.entry(session).or_insert((None, 0));
        let _ = stored.get_or_insert(topology);
    }
}
/// Progress of random propagation: how many peers we aim for, how many we have
/// already sent to, and the rate used when sampling.
#[derive(Debug, Clone, Copy)]
pub struct RandomRouting {
    /// The number of peers to target.
    target: usize,
    /// The number of peers this has been sent to.
    sent: usize,
    /// Sampling rate
    sample_rate: usize,
}

impl Default for RandomRouting {
    /// Nothing sent yet, with the target and sample rate taken from
    /// `DEFAULT_RANDOM_CIRCULATION` and `DEFAULT_RANDOM_SAMPLE_RATE`.
    fn default() -> Self {
        Self {
            sent: 0,
            target: DEFAULT_RANDOM_CIRCULATION,
            sample_rate: DEFAULT_RANDOM_SAMPLE_RATE,
        }
    }
}

impl RandomRouting {
    /// Decide whether the next peer should be picked for random propagation.
    ///
    /// Always `false` when there are no peers or `sent` has reached `target`;
    /// always `true` when there are fewer peers than `sample_rate`;
    /// otherwise a random draw with `sample_rate` to `n_peers_total` odds.
    pub fn sample(&self, n_peers_total: usize, rng: &mut (impl CryptoRng + Rng)) -> bool {
        let quota_reached = self.sent >= self.target;
        if quota_reached || n_peers_total == 0 {
            return false
        }

        if n_peers_total < self.sample_rate {
            return true
        }

        rng.gen_ratio(self.sample_rate as _, n_peers_total as _)
    }

    /// Record that one more peer has been sent to.
    pub fn inc_sent(&mut self) {
        self.sent = self.sent + 1;
    }
}
/// Which set of peers a message is required to be propagated to.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RequiredRouting {
    /// We don't know yet, because we're waiting for topology info
    /// (race condition between learning about the first blocks in a new session
    /// and getting the topology for that session)
    PendingTopology,
    /// Propagate to all peers of any kind.
    All,
    /// Propagate to all peers sharing either the X or Y dimension of the grid.
    GridXY,
    /// Propagate to all peers sharing the X dimension of the grid.
    GridX,
    /// Propagate to all peers sharing the Y dimension of the grid.
    GridY,
    /// No required propagation.
    None,
}

impl RequiredRouting {
    /// Whether the required routing set is definitely empty.
    ///
    /// Returns `true` for `PendingTopology` and `None`, `false` for every other
    /// variant. The match is deliberately exhaustive (no `_` arm) so that adding
    /// a variant forces a decision here instead of silently being treated as
    /// non-empty.
    pub fn is_empty(self) -> bool {
        match self {
            RequiredRouting::PendingTopology | RequiredRouting::None => true,
            RequiredRouting::All |
            RequiredRouting::GridXY |
            RequiredRouting::GridX |
            RequiredRouting::GridY => false,
        }
    }
}
@@ -40,6 +40,8 @@ pub mod request_response;
/// Accessing authority discovery service
pub mod authority_discovery;
/// Grid topology support module
pub mod grid_topology;
/// A version of the protocol.
pub type ProtocolVersion = u32;
+2
View File
@@ -1,4 +1,5 @@
150
2D
A&V
accessor/MS
AccountId
@@ -270,6 +271,7 @@ tera/M
teleports
timeframe
timestamp/MS
topologies
tradeoff
transitionary
trie/MS