client/authority-discovery: Rework error handling (#5631)

* client/authority-discovery: Rework error handling

Instead of `handle_dht_events` returning a `Result<(), Error>`, return
a `Poll<Error>` where `Poll::Pending` signals that there are no more
events to handle and `Poll::Ready(Error)` signals that a fatal error
occured. Non fatal errors are handled within `handle_dht_events`
directly, thus looping in `poll` is not necessary anymore.

* client/authority-discovery: Return () instead of error on termiantion

* Update client/authority-discovery/src/lib.rs

Co-Authored-By: André Silva <123550+andresilva@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Max Inden
2020-04-16 17:09:13 +02:00
committed by GitHub
parent 239d0998ea
commit 6bd93825ca
3 changed files with 73 additions and 64 deletions
@@ -40,9 +40,6 @@ pub enum Error {
MatchingHashedAuthorityIdWithAuthorityId,
/// Failed to set the authority discovery peerset priority group in the peerset module.
SettingPeersetPriorityGroup(String),
/// The sender side of the dht event stream has been closed likely due to the network
/// terminating.
DhtEventStreamTerminated,
/// Failed to encode a protobuf payload.
EncodingProto(prost::EncodeError),
/// Failed to decode a protobuf payload.
+65 -57
View File
@@ -51,7 +51,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::task::{Context, Poll};
use futures::{Future, FutureExt, Stream, StreamExt};
use futures::{Future, FutureExt, ready, Stream, StreamExt};
use futures_timer::Delay;
use codec::{Decode, Encode};
@@ -80,6 +80,8 @@ mod schema {
type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;
const LOG_TARGET: &'static str = "sub-authority-discovery";
/// Upper bound estimation on how long one should wait before accessing the Kademlia DHT.
const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
@@ -174,7 +176,7 @@ where
match Metrics::register(&registry) {
Ok(metrics) => Some(metrics),
Err(e) => {
error!(target: "sub-authority-discovery", "Failed to register metrics: {:?}", e);
error!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
None
},
}
@@ -282,10 +284,15 @@ where
Ok(())
}
fn handle_dht_events(&mut self, cx: &mut Context) -> Result<()> {
/// Handle incoming Dht events.
///
/// Returns either:
/// - Poll::Pending when there are no more events to handle or
/// - Poll::Ready(()) when the dht event stream terminated.
fn handle_dht_events(&mut self, cx: &mut Context) -> Poll<()>{
loop {
match self.dht_event_rx.poll_next_unpin(cx) {
Poll::Ready(Some(DhtEvent::ValueFound(v))) => {
match ready!(self.dht_event_rx.poll_next_unpin(cx)) {
Some(DhtEvent::ValueFound(v)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
}
@@ -293,47 +300,52 @@ where
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(
target: "sub-authority-discovery",
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
);
}
self.handle_dht_value_found_event(v)?;
if let Err(e) = self.handle_dht_value_found_event(v) {
error!(
target: LOG_TARGET,
"Failed to handle Dht value found event: {:?}", e,
);
}
}
Poll::Ready(Some(DhtEvent::ValueNotFound(hash))) => {
Some(DhtEvent::ValueNotFound(hash)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}
debug!(
target: "sub-authority-discovery",
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
},
Poll::Ready(Some(DhtEvent::ValuePut(hash))) => {
Some(DhtEvent::ValuePut(hash)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
debug!(
target: "sub-authority-discovery",
target: LOG_TARGET,
"Successfully put hash '{:?}' on Dht.", hash,
)
},
Poll::Ready(Some(DhtEvent::ValuePutFailed(hash))) => {
Some(DhtEvent::ValuePutFailed(hash)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}
warn!(
target: "sub-authority-discovery",
target: LOG_TARGET,
"Failed to put hash '{:?}' on Dht.", hash
)
},
// The sender side of the dht event stream has been closed, likely due to the
// network terminating.
Poll::Ready(None) => return Err(Error::DhtEventStreamTerminated),
Poll::Pending => return Ok(()),
None => {
debug!(target: LOG_TARGET, "Dht event stream terminated.");
return Poll::Ready(());
},
}
}
}
@@ -442,7 +454,7 @@ where
let addresses = self.addr_cache.get_subset();
debug!(
target: "sub-authority-discovery",
target: LOG_TARGET,
"Applying priority group {:?} to peerset.", addresses,
);
self.network
@@ -464,46 +476,42 @@ where
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = || -> Result<()> {
// Process incoming events before triggering new ones.
self.handle_dht_events(cx)?;
if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
// Make sure to call interval.poll until it returns Async::NotReady once. Otherwise,
// in case one of the function calls within this block do a `return`, we don't call
// `interval.poll` again and thereby the underlying Tokio task is never registered
// with Tokio's Reactor to be woken up on the next interval tick.
while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {}
self.publish_ext_addresses()?;
}
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Make sure to call interval.poll until it returns Async::NotReady once. Otherwise,
// in case one of the function calls within this block do a `return`, we don't call
// `interval.poll` again and thereby the underlying Tokio task is never registered
// with Tokio's Reactor to be woken up on the next interval tick.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
self.request_addresses_of_others()?;
}
Ok(())
};
loop {
match inner() {
Ok(()) => return Poll::Pending,
// Handle fatal errors.
//
// Given that the network likely terminated authority discovery should do the same.
Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()),
// Handle non-fatal errors.
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
};
// Process incoming events.
if let Poll::Ready(()) = self.handle_dht_events(cx) {
// `handle_dht_events` returns `Poll::Ready(())` when the Dht event stream terminated.
// Termination of the Dht event stream implies that the underlying network terminated,
// thus authority discovery should terminate as well.
return Poll::Ready(());
}
// Publish own addresses.
if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {}
if let Err(e) = self.publish_ext_addresses() {
error!(
target: LOG_TARGET,
"Failed to publish external addresses: {:?}", e,
);
}
}
// Request addresses of authorities.
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
if let Err(e) = self.request_addresses_of_others() {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
);
}
}
Poll::Pending
}
}
@@ -286,6 +286,7 @@ fn request_addresses_of_others_triggers_dht_get_query() {
#[test]
fn handle_dht_events_with_value_found_should_call_set_priority_group() {
let _ = ::env_logger::try_init();
// Create authority discovery.
let (mut dht_event_tx, dht_event_rx) = channel(1000);
@@ -331,7 +332,9 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {
// Make authority discovery handle the event.
let f = |cx: &mut Context<'_>| -> Poll<()> {
authority_discovery.handle_dht_events(cx).unwrap();
if let Poll::Ready(e) = authority_discovery.handle_dht_events(cx) {
panic!("Unexpected error: {:?}", e);
}
// Expect authority discovery to set the priority set.
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
@@ -439,15 +442,16 @@ fn dont_stop_polling_when_error_is_returned() {
// Now we call `await` and give the control to the authority discovery future.
assert_eq!(Some(Event::Processed), discovery_update_rx.next().await);
// Drop the event rx to stop the authority discovery. If it was polled correctly, it should
// end properly.
// Drop the event rx to stop the authority discovery. If it was polled correctly, it
// should end properly.
drop(dht_event_tx);
assert!(
discovery_update_rx.collect::<Vec<Event>>()
.await
.into_iter()
.any(|evt| evt == Event::End), "The authority should have ended",
.any(|evt| evt == Event::End),
"The authority discovery should have ended",
);
}
);