mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 04:21:01 +00:00
Make sure validator discovery works with a delayed peer to validator mapping (#1886)
* Make sure validator discovery works with a delayed peer to validator mapping Currently the implementation checks on connect of a peer if this peer is a validator by asking the authority discovery. It can now happen that the authority discovery is not yet aware that a given peer is an authority. This can for example happen on start up of the node. This pr changes the behavior, to make it possible to later associate a peer to a validator id. Instead of just storing the connected validators, we now store all connected peers with a vector of associated validator ids. When we get a request to connect to a given given set of validators, we start by checking the connected peers. If we didn't find a validator id in the connected peers, we ask the authority discovery for the peerid of a given authority id. When the returned peerid is part of our connected peers set, we cache and return the authority id. * Update node/network/bridge/Cargo.toml Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Update node/network/bridge/src/validator_discovery.rs Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Update `Cargo.lock` Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
|
//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
|
||||||
|
|
||||||
#![deny(unused_crate_dependencies, unused_results)]
|
#![deny(unused_crate_dependencies)]
|
||||||
#![warn(missing_docs)]
|
#![warn(missing_docs)]
|
||||||
|
|
||||||
|
|
||||||
@@ -703,7 +703,7 @@ where
|
|||||||
PeerSet::Collation => &mut collation_peers,
|
PeerSet::Collation => &mut collation_peers,
|
||||||
};
|
};
|
||||||
|
|
||||||
validator_discovery.on_peer_disconnected(&peer, &mut authority_discovery_service).await;
|
validator_discovery.on_peer_disconnected(&peer);
|
||||||
|
|
||||||
if peer_map.remove(&peer).is_some() {
|
if peer_map.remove(&peer).is_some() {
|
||||||
let res = match peer_set {
|
let res = match peer_set {
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ use std::sync::Arc;
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::channel::{mpsc, oneshot};
|
use futures::channel::{mpsc, oneshot};
|
||||||
|
|
||||||
use sc_network::Multiaddr;
|
use sc_network::multiaddr::{Multiaddr, Protocol};
|
||||||
use sc_authority_discovery::Service as AuthorityDiscoveryService;
|
use sc_authority_discovery::Service as AuthorityDiscoveryService;
|
||||||
use polkadot_node_network_protocol::PeerId;
|
use polkadot_node_network_protocol::PeerId;
|
||||||
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
|
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
|
||||||
@@ -70,7 +70,6 @@ impl AuthorityDiscovery for AuthorityDiscoveryService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// This struct tracks the state for one `ConnectToValidators` request.
|
/// This struct tracks the state for one `ConnectToValidators` request.
|
||||||
struct NonRevokedConnectionRequestState {
|
struct NonRevokedConnectionRequestState {
|
||||||
requested: Vec<AuthorityDiscoveryId>,
|
requested: Vec<AuthorityDiscoveryId>,
|
||||||
@@ -115,39 +114,93 @@ impl NonRevokedConnectionRequestState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Will be called by [`Service::on_request`] when a request was revoked.
|
||||||
|
///
|
||||||
|
/// Takes the `map` of requested validators and the `id` of the validator that should be revoked.
|
||||||
|
///
|
||||||
|
/// Returns `Some(id)` iff the request counter is `0`.
|
||||||
|
fn on_revoke(map: &mut HashMap<AuthorityDiscoveryId, u64>, id: AuthorityDiscoveryId) -> Option<AuthorityDiscoveryId> {
|
||||||
|
if let hash_map::Entry::Occupied(mut entry) = map.entry(id) {
|
||||||
|
if entry.get_mut().saturating_sub(1) == 0 {
|
||||||
|
return Some(entry.remove_entry().0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option<PeerId> {
|
||||||
|
addr.iter().last().and_then(|protocol| if let Protocol::P2p(multihash) = protocol {
|
||||||
|
PeerId::from_multihash(multihash).ok()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) struct Service<N, AD> {
|
pub(super) struct Service<N, AD> {
|
||||||
// we assume one PeerId per AuthorityId is enough
|
// Peers that are connected to us and authority ids associated to them.
|
||||||
connected_validators: HashMap<AuthorityDiscoveryId, PeerId>,
|
connected_peers: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
|
||||||
// the `u64` counts the number of pending non-revoked requests for this validator
|
// The `u64` counts the number of pending non-revoked requests for this validator
|
||||||
// note: the validators in this map are not necessarily present
|
// note: the validators in this map are not necessarily present
|
||||||
// in the `connected_validators` map.
|
// in the `connected_validators` map.
|
||||||
// Invariant: the value > 0 for non-revoked requests.
|
// Invariant: the value > 0 for non-revoked requests.
|
||||||
requested_validators: HashMap<AuthorityDiscoveryId, u64>,
|
requested_validators: HashMap<AuthorityDiscoveryId, u64>,
|
||||||
non_revoked_discovery_requests: Vec<NonRevokedConnectionRequestState>,
|
non_revoked_discovery_requests: Vec<NonRevokedConnectionRequestState>,
|
||||||
// PhantomData used to make the struct generic instead of having generic methods
|
// PhantomData used to make the struct generic instead of having generic methods
|
||||||
network: PhantomData<N>,
|
_phantom: PhantomData<(N, AD)>,
|
||||||
authority_discovery: PhantomData<AD>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
connected_validators: HashMap::new(),
|
connected_peers: HashMap::new(),
|
||||||
requested_validators: HashMap::new(),
|
requested_validators: HashMap::new(),
|
||||||
non_revoked_discovery_requests: Vec::new(),
|
non_revoked_discovery_requests: Vec::new(),
|
||||||
network: PhantomData,
|
_phantom: PhantomData,
|
||||||
authority_discovery: PhantomData,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Find connected validators using the given `validator_ids`.
|
||||||
|
///
|
||||||
|
/// Returns a [`HashMap`] that contains the found [`AuthorityDiscoveryId`]'s and their associated [`PeerId`]'s.
|
||||||
|
async fn find_connected_validators(
|
||||||
|
&mut self,
|
||||||
|
validator_ids: &[AuthorityDiscoveryId],
|
||||||
|
authority_discovery_service: &mut AD,
|
||||||
|
) -> HashMap<AuthorityDiscoveryId, PeerId> {
|
||||||
|
let mut result = HashMap::new();
|
||||||
|
|
||||||
|
for id in validator_ids {
|
||||||
|
// First check if we already cached the validator
|
||||||
|
if let Some(pid) = self.connected_peers
|
||||||
|
.iter()
|
||||||
|
.find_map(|(pid, ids)| if ids.contains(&id) { Some(pid) } else { None }) {
|
||||||
|
result.insert(id.clone(), pid.clone());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If not ask the authority discovery
|
||||||
|
if let Some(addresses) = authority_discovery_service.get_addresses_by_authority_id(id.clone()).await {
|
||||||
|
for peer_id in addresses.iter().filter_map(peer_id_from_multiaddr) {
|
||||||
|
if let Some(ids) = self.connected_peers.get_mut(&peer_id) {
|
||||||
|
ids.insert(id.clone());
|
||||||
|
result.insert(id.clone(), peer_id.clone());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
/// On a new connection request, a priority group update will be issued.
|
/// On a new connection request, a priority group update will be issued.
|
||||||
/// It will ask the network to connect to the validators and not disconnect
|
/// It will ask the network to connect to the validators and not disconnect
|
||||||
/// from them at least until all the pending requests containing them are revoked.
|
/// from them at least until all the pending requests containing them are revoked.
|
||||||
///
|
///
|
||||||
/// This method will also clean up all previously revoked requests.
|
/// This method will also clean up all previously revoked requests.
|
||||||
// it takes `network_service` and `authority_discovery_service` by value
|
/// it takes `network_service` and `authority_discovery_service` by value
|
||||||
// and returns them as a workaround for the Future: Send requirement imposed by async fn impl.
|
/// and returns them as a workaround for the Future: Send requirement imposed by async fn impl.
|
||||||
pub async fn on_request(
|
pub async fn on_request(
|
||||||
&mut self,
|
&mut self,
|
||||||
validator_ids: Vec<AuthorityDiscoveryId>,
|
validator_ids: Vec<AuthorityDiscoveryId>,
|
||||||
@@ -158,37 +211,13 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
|||||||
) -> (N, AD) {
|
) -> (N, AD) {
|
||||||
const MAX_ADDR_PER_PEER: usize = 3;
|
const MAX_ADDR_PER_PEER: usize = 3;
|
||||||
|
|
||||||
let already_connected = validator_ids.iter()
|
// Increment the counter of how many times the validators were requested.
|
||||||
.cloned()
|
validator_ids.iter().for_each(|id| *self.requested_validators.entry(id.clone()).or_default() += 1);
|
||||||
.filter_map(|id| {
|
let already_connected = self.find_connected_validators(&validator_ids, &mut authority_discovery_service).await;
|
||||||
let counter = self.requested_validators.entry(id.clone()).or_default();
|
|
||||||
// if the counter overflows, there is something really wrong going on
|
|
||||||
*counter += 1;
|
|
||||||
|
|
||||||
self.connected_validators
|
|
||||||
.get(&id)
|
|
||||||
.map(|peer| (id, peer.clone()))
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
let on_revoke = |map: &mut HashMap<AuthorityDiscoveryId, u64>, id: AuthorityDiscoveryId| -> Option<AuthorityDiscoveryId> {
|
|
||||||
match map.entry(id) {
|
|
||||||
hash_map::Entry::Occupied(mut entry) => {
|
|
||||||
*entry.get_mut() -= 1;
|
|
||||||
if *entry.get() == 0 {
|
|
||||||
return Some(entry.remove_entry().0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hash_map::Entry::Vacant(_) => {
|
|
||||||
// should be unreachable
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
// try to send already connected peers
|
// try to send already connected peers
|
||||||
for (id, peer) in already_connected {
|
for (id, peer) in already_connected.iter() {
|
||||||
match connected.try_send((id, peer)) {
|
match connected.try_send((id.clone(), peer.clone())) {
|
||||||
Err(e) if e.is_disconnected() => {
|
Err(e) if e.is_disconnected() => {
|
||||||
// the request is already revoked
|
// the request is already revoked
|
||||||
for peer_id in validator_ids {
|
for peer_id in validator_ids {
|
||||||
@@ -208,17 +237,15 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
|||||||
|
|
||||||
// collect multiaddress of validators
|
// collect multiaddress of validators
|
||||||
let mut multiaddr_to_add = HashSet::new();
|
let mut multiaddr_to_add = HashSet::new();
|
||||||
for authority in validator_ids.iter().cloned() {
|
for authority in validator_ids.iter() {
|
||||||
let result = authority_discovery_service.get_addresses_by_authority_id(authority).await;
|
let result = authority_discovery_service.get_addresses_by_authority_id(authority.clone()).await;
|
||||||
if let Some(addresses) = result {
|
if let Some(addresses) = result {
|
||||||
// We might have several `PeerId`s per `AuthorityId`
|
// We might have several `PeerId`s per `AuthorityId`
|
||||||
// depending on the number of sentry nodes,
|
// depending on the number of sentry nodes,
|
||||||
// so we limit the max number of sentries per node to connect to.
|
// so we limit the max number of sentries per node to connect to.
|
||||||
// They are going to be removed soon though:
|
// They are going to be removed soon though:
|
||||||
// https://github.com/paritytech/substrate/issues/6845
|
// https://github.com/paritytech/substrate/issues/6845
|
||||||
for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) {
|
multiaddr_to_add.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER));
|
||||||
let _ = multiaddr_to_add.insert(addr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -246,9 +273,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
|||||||
for id in revoked_validators.into_iter() {
|
for id in revoked_validators.into_iter() {
|
||||||
let result = authority_discovery_service.get_addresses_by_authority_id(id).await;
|
let result = authority_discovery_service.get_addresses_by_authority_id(id).await;
|
||||||
if let Some(addresses) = result {
|
if let Some(addresses) = result {
|
||||||
for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) {
|
multiaddr_to_remove.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER));
|
||||||
let _ = multiaddr_to_remove.insert(addr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -265,7 +290,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
|||||||
|
|
||||||
let pending = validator_ids.iter()
|
let pending = validator_ids.iter()
|
||||||
.cloned()
|
.cloned()
|
||||||
.filter(|id| !self.connected_validators.contains_key(id))
|
.filter(|id| !already_connected.contains_key(id))
|
||||||
.collect::<HashSet<_>>();
|
.collect::<HashSet<_>>();
|
||||||
|
|
||||||
self.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new(
|
self.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new(
|
||||||
@@ -278,6 +303,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
|||||||
(network_service, authority_discovery_service)
|
(network_service, authority_discovery_service)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Should be called when a peer connected.
|
||||||
pub async fn on_peer_connected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) {
|
pub async fn on_peer_connected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) {
|
||||||
// check if it's an authority we've been waiting for
|
// check if it's an authority we've been waiting for
|
||||||
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
|
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
|
||||||
@@ -285,15 +311,16 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
|||||||
for request in self.non_revoked_discovery_requests.iter_mut() {
|
for request in self.non_revoked_discovery_requests.iter_mut() {
|
||||||
let _ = request.on_authority_connected(&authority, peer_id);
|
let _ = request.on_authority_connected(&authority, peer_id);
|
||||||
}
|
}
|
||||||
let _ = self.connected_validators.insert(authority, peer_id.clone());
|
|
||||||
|
self.connected_peers.entry(peer_id.clone()).or_default().insert(authority);
|
||||||
|
} else {
|
||||||
|
self.connected_peers.insert(peer_id.clone(), Default::default());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn on_peer_disconnected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) {
|
/// Should be called when a peer disconnected.
|
||||||
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
|
pub fn on_peer_disconnected(&mut self, peer_id: &PeerId) {
|
||||||
if let Some(authority) = maybe_authority {
|
self.connected_peers.remove(peer_id);
|
||||||
let _ = self.connected_validators.remove(&authority);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -319,6 +346,7 @@ mod tests {
|
|||||||
priority_group: HashSet<Multiaddr>,
|
priority_group: HashSet<Multiaddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
struct TestAuthorityDiscovery {
|
struct TestAuthorityDiscovery {
|
||||||
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
|
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
|
||||||
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
|
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
|
||||||
@@ -600,4 +628,39 @@ mod tests {
|
|||||||
assert_eq!(ns.priority_group.len(), 1);
|
assert_eq!(ns.priority_group.len(), 1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A test for when a validator connects, but the authority discovery not yet knows that the connecting node
|
||||||
|
/// is a validator. This can happen for example at startup of a node.
|
||||||
|
#[test]
|
||||||
|
fn handle_validator_connect_without_authority_discovery_knowing_it() {
|
||||||
|
let mut service = new_service();
|
||||||
|
|
||||||
|
let ns = TestNetwork::default();
|
||||||
|
let mut ads = TestAuthorityDiscovery::default();
|
||||||
|
|
||||||
|
let validator_peer_id = PeerId::random();
|
||||||
|
let validator_id: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into();
|
||||||
|
|
||||||
|
futures::executor::block_on(async move {
|
||||||
|
let (sender, mut receiver) = mpsc::channel(1);
|
||||||
|
let (_revoke_tx, revoke_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
service.on_peer_connected(&validator_peer_id, &mut ads).await;
|
||||||
|
|
||||||
|
let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into()));
|
||||||
|
ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone());
|
||||||
|
ads.by_authority_id.insert(validator_id.clone(), address);
|
||||||
|
|
||||||
|
let _ = service.on_request(
|
||||||
|
vec![validator_id.clone()],
|
||||||
|
sender,
|
||||||
|
revoke_rx,
|
||||||
|
ns,
|
||||||
|
ads,
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_eq!((validator_id.clone(), validator_peer_id.clone()), receiver.next().await.unwrap());
|
||||||
|
assert!(service.connected_peers.get(&validator_peer_id).unwrap().contains(&validator_id));
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user