client/authority-discovery: Terminate when network does (#5562)

* client/authority-discovery: Add test ensure termination on network termi

* client/authority-discovery: Terminate when network does

When the dht event stream returns Poll::Ready(None) it is likely due to
the network terminating. When the network terminates due to the node
itself shutting down or due to a fatal error, there is no purpose in
continuing to run the authority discovery module.

* client/authority-discovery/src/lib: Apply suggestions

Co-Authored-By: André Silva <123550+andresilva@users.noreply.github.com>

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Max Inden
2020-04-09 10:34:51 +02:00
committed by GitHub
parent 8576937f44
commit 5913969f8d
3 changed files with 59 additions and 14 deletions
@@ -40,17 +40,20 @@ pub enum Error {
MatchingHashedAuthorityIdWithAuthorityId,
/// Failed to set the authority discovery peerset priority group in the peerset module.
SettingPeersetPriorityGroup(String),
/// The sender side of the dht event stream has been closed likely due to the network
/// terminating.
DhtEventStreamTerminated,
/// Failed to encode a protobuf payload.
EncodingProto(prost::EncodeError),
/// Failed to decode a protobuf payload.
DecodingProto(prost::DecodeError),
/// Failed to encode or decode scale payload
/// Failed to encode or decode scale payload.
EncodingDecodingScale(codec::Error),
/// Failed to parse a libp2p multi address.
ParsingMultiaddress(libp2p::core::multiaddr::Error),
/// Failed to sign using a specific public key
/// Failed to sign using a specific public key.
MissingSignature(CryptoTypePublicPair),
/// Failed to sign using all public keys
/// Failed to sign using all public keys.
Signing,
/// Failed to register Prometheus metric.
Prometheus(prometheus_endpoint::PrometheusError),
+19 -11
View File
@@ -330,9 +330,9 @@ where
}
fn handle_dht_events(&mut self, cx: &mut Context) -> Result<()> {
while let Poll::Ready(Some(event)) = self.dht_event_rx.poll_next_unpin(cx) {
match event {
DhtEvent::ValueFound(v) => {
loop {
match self.dht_event_rx.poll_next_unpin(cx) {
Poll::Ready(Some(DhtEvent::ValueFound(v))) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
}
@@ -347,7 +347,7 @@ where
self.handle_dht_value_found_event(v)?;
}
DhtEvent::ValueNotFound(hash) => {
Poll::Ready(Some(DhtEvent::ValueNotFound(hash))) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}
@@ -357,7 +357,7 @@ where
"Value for hash '{:?}' not found on Dht.", hash
)
},
DhtEvent::ValuePut(hash) => {
Poll::Ready(Some(DhtEvent::ValuePut(hash))) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
@@ -367,7 +367,7 @@ where
"Successfully put hash '{:?}' on Dht.", hash,
)
},
DhtEvent::ValuePutFailed(hash) => {
Poll::Ready(Some(DhtEvent::ValuePutFailed(hash))) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}
@@ -377,10 +377,12 @@ where
"Failed to put hash '{:?}' on Dht.", hash
)
},
// The sender side of the dht event stream has been closed, likely due to the
// network terminating.
Poll::Ready(None) => return Err(Error::DhtEventStreamTerminated),
Poll::Pending => return Ok(()),
}
}
Ok(())
}
fn handle_dht_value_found_event(
@@ -483,7 +485,6 @@ where
}
/// Update the peer set 'authority' priority group.
//
fn update_peer_set_priority_group(&self) -> Result<()> {
let addresses = self.addr_cache.get_subset();
@@ -539,11 +540,18 @@ where
match inner() {
Ok(()) => {}
// Handle fatal errors.
//
// Given that the network likely terminated authority discovery should do the same.
Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()),
// Handle non-fatal errors.
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
};
// Make sure to always return NotReady as this is a long running task with the same lifetime
// as the node itself.
// Return Poll::Pending as this is a long running task with the same lifetime as the node
// itself.
Poll::Pending
}
}
@@ -19,6 +19,7 @@ use std::{iter::FromIterator, sync::{Arc, Mutex}};
use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::poll;
use libp2p::{kad, PeerId};
use sp_api::{ProvideRuntimeApi, ApiRef};
@@ -346,3 +347,36 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {
let _ = block_on(poll_fn(f));
}
#[test]
fn terminate_when_event_stream_terminates() {
let (dht_event_tx, dht_event_rx) = channel(1000);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let test_api = Arc::new(TestApi {
authorities: vec![],
});
let mut authority_discovery = AuthorityDiscovery::new(
test_api,
network.clone(),
vec![],
key_store,
dht_event_rx.boxed(),
None,
);
block_on(async {
assert_eq!(Poll::Pending, poll!(&mut authority_discovery));
// Simulate termination of the network through dropping the sender side of the dht event
// channel.
drop(dht_event_tx);
assert_eq!(
Poll::Ready(()), poll!(&mut authority_discovery),
"Expect the authority discovery module to terminate once the sending side of the dht \
event channel is terminated.",
);
});
}