Properly remove peers from sets and merge the two Network traits (#2821)

* Properly remove peers from sets

* Actually rename all, I guess

* Merge the two Network traits

* Rename function

* Update node/network/bridge/src/network.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Fix erroneous change

* Update node/network/bridge/src/network.rs

Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Pierre Krieger
2021-04-05 21:46:39 +02:00
committed by GitHub
parent 2ff5c9b995
commit fa0142ac8f
3 changed files with 55 additions and 46 deletions
@@ -16,10 +16,10 @@
//! A validator discovery service for the Network Bridge.
use crate::Network;
use core::marker::PhantomData;
use std::borrow::Cow;
use std::collections::{HashSet, HashMap, hash_map};
use std::sync::Arc;
use async_trait::async_trait;
use futures::channel::mpsc;
@@ -27,20 +27,11 @@ use futures::channel::mpsc;
use sc_network::{config::parse_addr, multiaddr::Multiaddr};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet};
const LOG_TARGET: &str = "parachain::validator-discovery";
/// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
pub trait Network: Send + 'static {
/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
async fn add_peers_to_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
/// Remove the peers from the priority group.
async fn remove_peers_from_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
}
/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + 'static {
@@ -50,17 +41,6 @@ pub trait AuthorityDiscovery: Send + 'static {
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
}
#[async_trait]
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
async fn add_peers_to_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
}
async fn remove_peers_from_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol, multiaddresses)
}
}
#[async_trait]
impl AuthorityDiscovery for AuthorityDiscoveryService {
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
@@ -300,14 +280,14 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service.add_peers_to_reserved_set(
if let Err(e) = network_service.add_to_peers_set(
peer_set.into_protocol_name(),
multiaddr_to_add.clone(),
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service.remove_peers_from_reserved_set(
let _ = network_service.remove_from_peers_set(
peer_set.into_protocol_name(),
multiaddr_to_remove.clone()
).await;
@@ -357,12 +337,14 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
#[cfg(test)]
mod tests {
use super::*;
use crate::network::{Network, NetworkAction};
use futures::stream::StreamExt as _;
use std::{borrow::Cow, pin::Pin};
use futures::{sink::Sink, stream::{BoxStream, StreamExt as _}};
use sc_network::multiaddr::Protocol;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::request_response::request::Requests;
fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
Service::new()
@@ -372,7 +354,7 @@ mod tests {
(TestNetwork::default(), TestAuthorityDiscovery::new())
}
#[derive(Default)]
#[derive(Default, Clone)]
struct TestNetwork {
peers_set: HashSet<Multiaddr>,
}
@@ -402,15 +384,28 @@ mod tests {
#[async_trait]
impl Network for TestNetwork {
async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
panic!()
}
async fn add_to_peers_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.peers_set.extend(multiaddresses.into_iter());
Ok(())
}
async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.peers_set.retain(|elem| !multiaddresses.contains(elem));
Ok(())
}
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = polkadot_subsystem::SubsystemError> + Send + 'a>>
{
panic!()
}
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
}
}
#[async_trait]