Substrate companion: Authority discovery multiple peer ids (#4295)

* Substrate companion: Authority discovery multiple peer ids

Authority discovery before had a fixed mapping from `PeerId` to
`AuthorityId`. This wasn't correct, as a `PeerId` can actually map to
multiple `AuthorityId`s. The linked Substrate pr fixes this.

https://github.com/paritytech/substrate/pull/10259

* Update node/network/availability-distribution/src/requester/mod.rs

* Update node/network/collator-protocol/src/validator_side/mod.rs

* Update node/network/statement-distribution/src/tests.rs

* Update guide

* Adapt to Substrate pr

* Update Substrate
This commit is contained in:
Bastian Köcher
2021-11-17 11:35:02 +01:00
committed by GitHub
parent 6664019e9d
commit 620a4e45de
17 changed files with 281 additions and 256 deletions
@@ -25,7 +25,7 @@
//! the `NetworkBridgeMessage::NewGossipTopology` message.
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fmt,
time::{Duration, Instant},
};
@@ -94,14 +94,14 @@ pub struct GossipSupport<AD> {
/// Successfully resolved connections
///
/// waiting for actual connection.
resolved_authorities: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
resolved_authorities: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,
/// Actually connected authorities.
connected_authorities: HashMap<AuthorityDiscoveryId, PeerId>,
/// By `PeerId`.
///
/// Needed for efficient handling of disconnect events.
connected_authorities_by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
connected_authorities_by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
/// Authority discovery service.
authority_discovery: AD,
}
@@ -299,14 +299,19 @@ where
fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSuppportNetworkMessage>) {
match ev {
NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => {
if let Some(authority) = o_authority {
self.connected_authorities.insert(authority.clone(), peer_id);
self.connected_authorities_by_peer_id.insert(peer_id, authority);
if let Some(authority_ids) = o_authority {
authority_ids.iter().for_each(|a| {
self.connected_authorities.insert(a.clone(), peer_id);
});
self.connected_authorities_by_peer_id.insert(peer_id, authority_ids);
}
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
if let Some(authority) = self.connected_authorities_by_peer_id.remove(&peer_id) {
self.connected_authorities.remove(&authority);
if let Some(authority_ids) = self.connected_authorities_by_peer_id.remove(&peer_id)
{
authority_ids.into_iter().for_each(|a| {
self.connected_authorities.remove(&a);
});
}
},
NetworkBridgeEvent::OurViewChange(_) => {},
@@ -474,7 +479,7 @@ struct PrettyAuthorities<I>(I);
impl<'a, I> fmt::Display for PrettyAuthorities<I>
where
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a Vec<Multiaddr>)> + Clone,
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a HashSet<Multiaddr>)> + Clone,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut authorities = self.0.clone().peekable();
@@ -16,7 +16,7 @@
//! Unit tests for Gossip Support Subsystem.
use std::{sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use async_trait::async_trait;
@@ -64,8 +64,8 @@ type VirtualOverseer = test_helpers::TestSubsystemContextHandle<GossipSupportMes
#[derive(Debug, Clone)]
struct MockAuthorityDiscovery {
addrs: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
authorities: HashMap<PeerId, AuthorityDiscoveryId>,
addrs: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,
authorities: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
}
impl MockAuthorityDiscovery {
@@ -77,10 +77,13 @@ impl MockAuthorityDiscovery {
.into_iter()
.map(|(p, a)| {
let multiaddr = Multiaddr::empty().with(Protocol::P2p(p.into()));
(a, vec![multiaddr])
(a, HashSet::from([multiaddr]))
})
.collect();
Self { addrs, authorities }
Self {
addrs,
authorities: authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(),
}
}
}
@@ -89,18 +92,18 @@ impl AuthorityDiscovery for MockAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
authority: polkadot_primitives::v1::AuthorityDiscoveryId,
) -> Option<Vec<sc_network::Multiaddr>> {
) -> Option<HashSet<sc_network::Multiaddr>> {
self.addrs.get(&authority).cloned()
}
async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
peer_id: polkadot_node_network_protocol::PeerId,
) -> Option<polkadot_primitives::v1::AuthorityDiscoveryId> {
) -> Option<HashSet<polkadot_primitives::v1::AuthorityDiscoveryId>> {
self.authorities.get(&peer_id).cloned()
}
}
async fn get_other_authorities_addrs() -> Vec<Vec<Multiaddr>> {
async fn get_other_authorities_addrs() -> Vec<HashSet<Multiaddr>> {
let mut addrs = Vec::with_capacity(OTHER_AUTHORITIES.len());
let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone();
for authority in OTHER_AUTHORITIES.iter().cloned() {
@@ -111,7 +114,7 @@ async fn get_other_authorities_addrs() -> Vec<Vec<Multiaddr>> {
addrs
}
async fn get_other_authorities_addrs_map() -> HashMap<AuthorityDiscoveryId, Vec<Multiaddr>> {
async fn get_other_authorities_addrs_map() -> HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>> {
let mut addrs = HashMap::with_capacity(OTHER_AUTHORITIES.len());
let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone();
for authority in OTHER_AUTHORITIES.iter().cloned() {
@@ -332,11 +335,11 @@ fn test_log_output() {
let mut m = HashMap::new();
let peer_id = PeerId::random();
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
let addrs = vec![addr.clone(), addr];
let addrs = HashSet::from([addr.clone(), addr]);
m.insert(alice, addrs);
let peer_id = PeerId::random();
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
let addrs = vec![addr.clone(), addr];
let addrs = HashSet::from([addr.clone(), addr]);
m.insert(bob, addrs);
m
};
@@ -389,16 +392,14 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators {
mut validator_addrs,
validator_addrs,
peer_set,
}) => {
let mut expected = get_other_authorities_addrs_map().await;
expected.remove(&alice);
expected.remove(&bob);
let mut expected: Vec<Vec<Multiaddr>> = expected.into_iter().map(|(_,v)| v).collect();
validator_addrs.sort();
expected.sort();
assert_eq!(validator_addrs, expected);
let expected: HashSet<Multiaddr> = expected.into_iter().map(|(_,v)| v.into_iter()).flatten().collect();
assert_eq!(validator_addrs.into_iter().map(|v| v.into_iter()).flatten().collect::<HashSet<_>>(), expected);
assert_eq!(peer_set, PeerSet::Validation);
}
);
@@ -443,15 +444,13 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators {
mut validator_addrs,
validator_addrs,
peer_set,
}) => {
let mut expected = get_other_authorities_addrs_map().await;
expected.remove(&bob);
let mut expected: Vec<Vec<Multiaddr>> = expected.into_iter().map(|(_,v)| v).collect();
expected.sort();
validator_addrs.sort();
assert_eq!(validator_addrs, expected);
let expected: HashSet<Multiaddr> = expected.into_iter().map(|(_,v)| v.into_iter()).flatten().collect();
assert_eq!(validator_addrs.into_iter().map(|v| v.into_iter()).flatten().collect::<HashSet<_>>(), expected);
assert_eq!(peer_set, PeerSet::Validation);
}
);