mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-30 09:37:55 +00:00
validator-discovery: don't remove multiaddr of requested PeerIds (#4036)
* validator-discovery: remove from peer set before inserting * bump spec versions * rework into a companion * fmt * fix * fix * one more time * one more try * one more try * Revert "one more try" This reverts commit ab6568d3b828a33dc06f5650037597fc88dd06b1. * one more try * one more try * Revert "one more try" This reverts commit 8d7369f7b78633bd1b1c5ba3e0f2a0544bdd77a5. * fix a warning * fix another warn * correct log * fix compilation * ffs * less cloning * Apply suggestions from code review Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * add comments and a small refactoring * use set_reserved_peers * cargo update -p sp-io * rename added to num_peers * update Substrate Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> Co-authored-by: parity-processbot <>
This commit is contained in:
@@ -23,10 +23,13 @@ use std::collections::HashSet;
|
||||
|
||||
use futures::channel::oneshot;
|
||||
|
||||
use sc_network::multiaddr::Multiaddr;
|
||||
use sc_network::multiaddr::{self, Multiaddr};
|
||||
|
||||
pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
|
||||
use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet};
|
||||
use polkadot_node_network_protocol::{
|
||||
peer_set::{PeerSet, PerPeerSet},
|
||||
PeerId,
|
||||
};
|
||||
use polkadot_primitives::v1::AuthorityDiscoveryId;
|
||||
|
||||
const LOG_TARGET: &str = "parachain::validator-discovery";
|
||||
@@ -39,7 +42,7 @@ pub(super) struct Service<N, AD> {
|
||||
|
||||
#[derive(Default)]
|
||||
struct StatePerPeerSet {
|
||||
previously_requested: HashSet<Multiaddr>,
|
||||
previously_requested: HashSet<PeerId>,
|
||||
}
|
||||
|
||||
impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
@@ -47,7 +50,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
Self { state: Default::default(), _phantom: PhantomData }
|
||||
}
|
||||
|
||||
/// Connect to already resolved addresses:
|
||||
/// Connect to already resolved addresses.
|
||||
pub async fn on_resolved_request(
|
||||
&mut self,
|
||||
newly_requested: HashSet<Multiaddr>,
|
||||
@@ -55,31 +58,32 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
mut network_service: N,
|
||||
) -> N {
|
||||
let state = &mut self.state[peer_set];
|
||||
// clean up revoked requests
|
||||
let multiaddr_to_remove: HashSet<_> =
|
||||
state.previously_requested.difference(&newly_requested).cloned().collect();
|
||||
let multiaddr_to_add: HashSet<_> =
|
||||
newly_requested.difference(&state.previously_requested).cloned().collect();
|
||||
state.previously_requested = newly_requested;
|
||||
let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
|
||||
let num_peers = new_peer_ids.len();
|
||||
|
||||
let peers_to_remove: Vec<PeerId> =
|
||||
state.previously_requested.difference(&new_peer_ids).cloned().collect();
|
||||
let removed = peers_to_remove.len();
|
||||
state.previously_requested = new_peer_ids;
|
||||
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
?peer_set,
|
||||
added = multiaddr_to_add.len(),
|
||||
removed = multiaddr_to_remove.len(),
|
||||
?num_peers,
|
||||
?removed,
|
||||
"New ConnectToValidators resolved request",
|
||||
);
|
||||
// ask the network to connect to these nodes and not disconnect
|
||||
// from them until removed from the set
|
||||
if let Err(e) = network_service
|
||||
.add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add)
|
||||
.set_reserved_peers(peer_set.into_protocol_name(), newly_requested)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
|
||||
}
|
||||
// the addresses are known to be valid
|
||||
let _ = network_service
|
||||
.remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove)
|
||||
.remove_from_peers_set(peer_set.into_protocol_name(), peers_to_remove)
|
||||
.await;
|
||||
|
||||
network_service
|
||||
@@ -136,6 +140,15 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_peer_ids(multiaddr: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
|
||||
multiaddr
|
||||
.filter_map(|mut addr| match addr.pop() {
|
||||
Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -158,7 +171,7 @@ mod tests {
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
struct TestNetwork {
|
||||
peers_set: HashSet<Multiaddr>,
|
||||
peers_set: HashSet<PeerId>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug)]
|
||||
@@ -171,9 +184,14 @@ mod tests {
|
||||
fn new() -> Self {
|
||||
let peer_ids = known_peer_ids();
|
||||
let authorities = known_authorities();
|
||||
let multiaddr = known_multiaddr();
|
||||
let multiaddr = known_multiaddr().into_iter().zip(peer_ids.iter().cloned()).map(
|
||||
|(mut addr, peer_id)| {
|
||||
addr.push(multiaddr::Protocol::P2p(peer_id.into()));
|
||||
addr
|
||||
},
|
||||
);
|
||||
Self {
|
||||
by_authority_id: authorities.iter().cloned().zip(multiaddr.into_iter()).collect(),
|
||||
by_authority_id: authorities.iter().cloned().zip(multiaddr).collect(),
|
||||
by_peer_id: peer_ids.into_iter().zip(authorities.into_iter()).collect(),
|
||||
}
|
||||
}
|
||||
@@ -185,22 +203,21 @@ mod tests {
|
||||
panic!()
|
||||
}
|
||||
|
||||
async fn add_to_peers_set(
|
||||
async fn set_reserved_peers(
|
||||
&mut self,
|
||||
_protocol: Cow<'static, str>,
|
||||
multiaddresses: HashSet<Multiaddr>,
|
||||
) -> Result<(), String> {
|
||||
self.peers_set.extend(multiaddresses.into_iter());
|
||||
self.peers_set = extract_peer_ids(multiaddresses.into_iter());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_from_peers_set(
|
||||
&mut self,
|
||||
_protocol: Cow<'static, str>,
|
||||
multiaddresses: HashSet<Multiaddr>,
|
||||
) -> Result<(), String> {
|
||||
self.peers_set.retain(|elem| !multiaddresses.contains(elem));
|
||||
Ok(())
|
||||
peers: Vec<PeerId>,
|
||||
) {
|
||||
self.peers_set.retain(|elem| !peers.contains(elem));
|
||||
}
|
||||
|
||||
async fn start_request<AD: AuthorityDiscovery>(
|
||||
@@ -281,9 +298,14 @@ mod tests {
|
||||
|
||||
let state = &service.state[PeerSet::Validation];
|
||||
assert_eq!(state.previously_requested.len(), 1);
|
||||
assert!(state
|
||||
.previously_requested
|
||||
.contains(ads.by_authority_id.get(&authority_ids[1]).unwrap()));
|
||||
let peer_1 = extract_peer_ids(
|
||||
vec![ads.by_authority_id.get(&authority_ids[1]).unwrap().clone()].into_iter(),
|
||||
)
|
||||
.iter()
|
||||
.cloned()
|
||||
.next()
|
||||
.unwrap();
|
||||
assert!(state.previously_requested.contains(&peer_1));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -310,9 +332,14 @@ mod tests {
|
||||
|
||||
let state = &service.state[PeerSet::Validation];
|
||||
assert_eq!(state.previously_requested.len(), 1);
|
||||
assert!(state
|
||||
.previously_requested
|
||||
.contains(ads.by_authority_id.get(&authority_ids[0]).unwrap()));
|
||||
let peer_0 = extract_peer_ids(
|
||||
vec![ads.by_authority_id.get(&authority_ids[0]).unwrap().clone()].into_iter(),
|
||||
)
|
||||
.iter()
|
||||
.cloned()
|
||||
.next()
|
||||
.unwrap();
|
||||
assert!(state.previously_requested.contains(&peer_0));
|
||||
|
||||
let failed = failed_rx.await.unwrap();
|
||||
assert_eq!(failed, 1);
|
||||
|
||||
Reference in New Issue
Block a user