cleanup validator discovery (#1992)

* use snake_case for log targets

* remove unused continue

* validator_discovery: when disconnecting, use all addresses

* validator_discovery: simplify request revokation

* fix a typo
This commit is contained in:
Andronik Ordian
2020-11-20 19:34:57 +01:00
committed by GitHub
parent 0a5bc82529
commit 97f5bd9047
9 changed files with 33 additions and 128 deletions
@@ -21,7 +21,7 @@ use std::collections::{HashSet, HashMap, hash_map};
use std::sync::Arc;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use sc_network::multiaddr::{Multiaddr, Protocol};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
@@ -29,7 +29,7 @@ use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
const PRIORITY_GROUP: &'static str = "parachain_validators";
const LOG_TARGET: &str = "ValidatorDiscovery";
const LOG_TARGET: &str = "validator_discovery";
/// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
@@ -76,7 +76,6 @@ struct NonRevokedConnectionRequestState {
requested: Vec<AuthorityDiscoveryId>,
pending: HashSet<AuthorityDiscoveryId>,
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
}
impl NonRevokedConnectionRequestState {
@@ -85,13 +84,11 @@ impl NonRevokedConnectionRequestState {
requested: Vec<AuthorityDiscoveryId>,
pending: HashSet<AuthorityDiscoveryId>,
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
) -> Self {
Self {
requested,
pending,
sender,
revoke,
}
}
@@ -105,9 +102,7 @@ impl NonRevokedConnectionRequestState {
/// Returns `true` if the request is revoked.
pub fn is_revoked(&mut self) -> bool {
self.revoke
.try_recv()
.map_or(true, |r| r.is_some())
self.sender.is_closed()
}
pub fn requested(&self) -> &[AuthorityDiscoveryId] {
@@ -187,7 +182,6 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
if let Some(ids) = self.connected_peers.get_mut(&peer_id) {
ids.insert(id.clone());
result.insert(id.clone(), peer_id.clone());
continue;
}
}
}
@@ -203,12 +197,11 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
/// This method will also clean up all previously revoked requests.
/// it takes `network_service` and `authority_discovery_service` by value
/// and returns them as a workaround for the Future: Send requirement imposed by async fn impl.
#[tracing::instrument(level = "trace", skip(self, connected, revoke, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, connected, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
pub async fn on_request(
&mut self,
validator_ids: Vec<AuthorityDiscoveryId>,
mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
mut network_service: N,
mut authority_discovery_service: AD,
) -> (N, AD) {
@@ -276,7 +269,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
for id in revoked_validators.into_iter() {
let result = authority_discovery_service.get_addresses_by_authority_id(id).await;
if let Some(addresses) = result {
multiaddr_to_remove.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER));
multiaddr_to_remove.extend(addresses.into_iter());
}
}
@@ -300,7 +293,6 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
validator_ids,
pending,
connected,
revoke,
));
(network_service, authority_discovery_service)
@@ -418,39 +410,18 @@ mod tests {
}
#[test]
fn request_is_revoked_on_send() {
let (revoke_tx, revoke_rx) = oneshot::channel();
let (sender, _receiver) = mpsc::channel(0);
fn request_is_revoked_when_the_receiver_is_dropped() {
let (sender, receiver) = mpsc::channel(0);
let mut request = NonRevokedConnectionRequestState::new(
Vec::new(),
HashSet::new(),
sender,
revoke_rx,
);
assert!(!request.is_revoked());
revoke_tx.send(()).unwrap();
assert!(request.is_revoked());
}
#[test]
fn request_is_revoked_when_the_sender_is_dropped() {
let (revoke_tx, revoke_rx) = oneshot::channel();
let (sender, _receiver) = mpsc::channel(0);
let mut request = NonRevokedConnectionRequestState::new(
Vec::new(),
HashSet::new(),
sender,
revoke_rx,
);
assert!(!request.is_revoked());
drop(revoke_tx);
drop(receiver);
assert!(request.is_revoked());
}
@@ -467,14 +438,12 @@ mod tests {
futures::executor::block_on(async move {
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
let (sender, mut receiver) = mpsc::channel(2);
let (_revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&peer_ids[0], &mut ads).await;
let _ = service.on_request(
req1,
sender,
revoke_rx,
ns,
ads,
).await;
@@ -499,12 +468,10 @@ mod tests {
futures::executor::block_on(async move {
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
let (sender, mut receiver) = mpsc::channel(2);
let (_revoke_tx, revoke_rx) = oneshot::channel();
let (_, mut ads) = service.on_request(
req1,
sender,
revoke_rx,
ns,
ads,
).await;
@@ -534,7 +501,6 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let (revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&peer_ids[0], &mut ads).await;
service.on_peer_connected(&peer_ids[1], &mut ads).await;
@@ -542,22 +508,19 @@ mod tests {
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
let _ = receiver.next().await.unwrap();
// revoke the request
revoke_tx.send(()).unwrap();
drop(receiver);
let (sender, mut receiver) = mpsc::channel(1);
let (_revoke_tx, revoke_rx) = oneshot::channel();
let _ = service.on_request(
vec![authority_ids[1].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
@@ -581,7 +544,6 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let (revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&peer_ids[0], &mut ads).await;
service.on_peer_connected(&peer_ids[1], &mut ads).await;
@@ -589,22 +551,19 @@ mod tests {
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[2].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
let _ = receiver.next().await.unwrap();
// revoke the first request
revoke_tx.send(()).unwrap();
drop(receiver);
let (sender, mut receiver) = mpsc::channel(1);
let (revoke_tx, revoke_rx) = oneshot::channel();
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[1].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
@@ -614,15 +573,13 @@ mod tests {
assert_eq!(ns.priority_group.len(), 2);
// revoke the second request
revoke_tx.send(()).unwrap();
drop(receiver);
let (sender, mut receiver) = mpsc::channel(1);
let (_revoke_tx, revoke_rx) = oneshot::channel();
let (ns, _) = service.on_request(
vec![authority_ids[0].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
@@ -647,7 +604,6 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let (_revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&validator_peer_id, &mut ads).await;
@@ -658,7 +614,6 @@ mod tests {
let _ = service.on_request(
vec![validator_id.clone()],
sender,
revoke_rx,
ns,
ads,
).await;