Companion PR for #7247 (incremental priority group updates) (#1800)

* validator discovery: use incremental updates for priority_group

* validator discovery: fix compilation

* validator discovery: remove Sync bound on Net

* "Update Substrate"

Co-authored-by: parity-processbot <>
This commit is contained in:
Andronik Ordian
2020-10-09 16:34:17 +02:00
committed by GitHub
parent bd75a4ce18
commit e62b300f47
3 changed files with 173 additions and 159 deletions
+135 -135
View File
File diff suppressed because it is too large Load Diff
+6 -1
View File
@@ -840,8 +840,13 @@ mod tests {
} }
} }
#[async_trait]
impl validator_discovery::Network for TestNetwork { impl validator_discovery::Network for TestNetwork {
fn set_priority_group(&self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { async fn add_to_priority_group(&mut self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
async fn remove_from_priority_group(&mut self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
Ok(()) Ok(())
} }
} }
@@ -31,11 +31,12 @@ use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
const PRIORITY_GROUP: &'static str = "parachain_validators"; const PRIORITY_GROUP: &'static str = "parachain_validators";
/// An abstraction over networking for the purposes of validator discovery service. /// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
pub trait Network: Send + 'static { pub trait Network: Send + 'static {
/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group. /// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>; async fn add_to_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
// TODO (ordian): we might want to add `add_to_priority_group` and `remove_from_priority_group` /// Remove the peers from the priority group.
// https://github.com/paritytech/polkadot/issues/1763 async fn remove_from_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
} }
/// An abstraction over the authority discovery service. /// An abstraction over the authority discovery service.
@@ -47,9 +48,14 @@ pub trait AuthorityDiscovery: Send + 'static {
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>; async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
} }
#[async_trait]
impl Network for Arc<sc_network::NetworkService<Block, Hash>> { impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { async fn add_to_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::set_priority_group(&**self, group_id, multiaddresses) sc_network::NetworkService::add_to_priority_group(&**self, group_id, multiaddresses).await
}
async fn remove_from_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::remove_from_priority_group(&**self, group_id, multiaddresses).await
} }
} }
@@ -118,8 +124,6 @@ pub(super) struct Service<N, AD> {
// in the `connected_validators` map. // in the `connected_validators` map.
// Invariant: the value > 0 for non-revoked requests. // Invariant: the value > 0 for non-revoked requests.
requested_validators: HashMap<AuthorityDiscoveryId, u64>, requested_validators: HashMap<AuthorityDiscoveryId, u64>,
// keep for the network priority_group updates
validator_multiaddresses: HashSet<Multiaddr>,
non_revoked_discovery_requests: Vec<NonRevokedConnectionRequestState>, non_revoked_discovery_requests: Vec<NonRevokedConnectionRequestState>,
// PhantomData used to make the struct generic instead of having generic methods // PhantomData used to make the struct generic instead of having generic methods
network: PhantomData<N>, network: PhantomData<N>,
@@ -131,7 +135,6 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
Self { Self {
connected_validators: HashMap::new(), connected_validators: HashMap::new(),
requested_validators: HashMap::new(), requested_validators: HashMap::new(),
validator_multiaddresses: HashSet::new(),
non_revoked_discovery_requests: Vec::new(), non_revoked_discovery_requests: Vec::new(),
network: PhantomData, network: PhantomData,
authority_discovery: PhantomData, authority_discovery: PhantomData,
@@ -150,7 +153,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
validator_ids: Vec<AuthorityDiscoveryId>, validator_ids: Vec<AuthorityDiscoveryId>,
mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>, revoke: oneshot::Receiver<()>,
network_service: N, mut network_service: N,
mut authority_discovery_service: AD, mut authority_discovery_service: AD,
) -> (N, AD) { ) -> (N, AD) {
const MAX_ADDR_PER_PEER: usize = 3; const MAX_ADDR_PER_PEER: usize = 3;
@@ -204,6 +207,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
} }
// collect multiaddress of validators // collect multiaddress of validators
let mut multiaddr_to_add = HashSet::new();
for authority in validator_ids.iter().cloned() { for authority in validator_ids.iter().cloned() {
let result = authority_discovery_service.get_addresses_by_authority_id(authority).await; let result = authority_discovery_service.get_addresses_by_authority_id(authority).await;
if let Some(addresses) = result { if let Some(addresses) = result {
@@ -213,7 +217,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
// They are going to be removed soon though: // They are going to be removed soon though:
// https://github.com/paritytech/substrate/issues/6845 // https://github.com/paritytech/substrate/issues/6845
for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) { for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) {
self.validator_multiaddresses.insert(addr); multiaddr_to_add.insert(addr);
} }
} }
} }
@@ -238,25 +242,26 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
} }
// multiaddresses to remove // multiaddresses to remove
let mut multiaddr_to_remove = HashSet::new();
for id in revoked_validators.into_iter() { for id in revoked_validators.into_iter() {
let result = authority_discovery_service.get_addresses_by_authority_id(id).await; let result = authority_discovery_service.get_addresses_by_authority_id(id).await;
if let Some(addresses) = result { if let Some(addresses) = result {
for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) { for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) {
self.validator_multiaddresses.remove(&addr); multiaddr_to_remove.insert(addr);
} }
} }
} }
// ask the network to connect to these nodes and not disconnect // ask the network to connect to these nodes and not disconnect
// from them until removed from the priority group // from them until removed from the priority group
// TODO (ordian): this clones the whole set of multaddresses if let Err(e) = network_service.add_to_priority_group(
// TODO (ordian): use add_to_priority_group for incremental updates?
if let Err(e) = network_service.set_priority_group(
PRIORITY_GROUP.to_owned(), PRIORITY_GROUP.to_owned(),
self.validator_multiaddresses.clone(), multiaddr_to_add,
) { ).await {
log::warn!(target: super::TARGET, "AuthorityDiscoveryService returned an invalid multiaddress: {}", e); log::warn!(target: super::TARGET, "AuthorityDiscoveryService returned an invalid multiaddress: {}", e);
} }
// the addresses are known to be valid
let _ = network_service.remove_from_priority_group(PRIORITY_GROUP.to_owned(), multiaddr_to_remove).await;
let pending = validator_ids.iter() let pending = validator_ids.iter()
.cloned() .cloned()
@@ -311,8 +316,7 @@ mod tests {
#[derive(Default)] #[derive(Default)]
struct TestNetwork { struct TestNetwork {
// Mutex is used because of &self signature of set_priority_group priority_group: HashSet<Multiaddr>,
priority_group: std::sync::Mutex<HashSet<Multiaddr>>,
} }
struct TestAuthorityDiscovery { struct TestAuthorityDiscovery {
@@ -337,10 +341,15 @@ mod tests {
} }
} }
#[async_trait]
impl Network for TestNetwork { impl Network for TestNetwork {
fn set_priority_group(&self, _group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { async fn add_to_priority_group(&mut self, _group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
let mut group = self.priority_group.lock().unwrap(); self.priority_group.extend(multiaddresses.into_iter());
*group = multiaddresses; Ok(())
}
async fn remove_from_priority_group(&mut self, _group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.priority_group.retain(|elem| !multiaddresses.contains(elem));
Ok(()) Ok(())
} }
} }
@@ -570,7 +579,7 @@ mod tests {
let _ = receiver.next().await.unwrap(); let _ = receiver.next().await.unwrap();
assert_eq!(service.non_revoked_discovery_requests.len(), 1); assert_eq!(service.non_revoked_discovery_requests.len(), 1);
assert_eq!(ns.priority_group.lock().unwrap().len(), 2); assert_eq!(ns.priority_group.len(), 2);
// revoke the second request // revoke the second request
revoke_tx.send(()).unwrap(); revoke_tx.send(()).unwrap();
@@ -588,7 +597,7 @@ mod tests {
let _ = receiver.next().await.unwrap(); let _ = receiver.next().await.unwrap();
assert_eq!(service.non_revoked_discovery_requests.len(), 1); assert_eq!(service.non_revoked_discovery_requests.len(), 1);
assert_eq!(ns.priority_group.lock().unwrap().len(), 1); assert_eq!(ns.priority_group.len(), 1);
}); });
} }
} }