diff --git a/substrate/client/authority-discovery/src/error.rs b/substrate/client/authority-discovery/src/error.rs index 751e3e76e9..b1358485c3 100644 --- a/substrate/client/authority-discovery/src/error.rs +++ b/substrate/client/authority-discovery/src/error.rs @@ -40,9 +40,6 @@ pub enum Error { MatchingHashedAuthorityIdWithAuthorityId, /// Failed to set the authority discovery peerset priority group in the peerset module. SettingPeersetPriorityGroup(String), - /// The sender side of the dht event stream has been closed likely due to the network - /// terminating. - DhtEventStreamTerminated, /// Failed to encode a protobuf payload. EncodingProto(prost::EncodeError), /// Failed to decode a protobuf payload. diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index 8086cd959d..5142dd7259 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -51,7 +51,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use futures::task::{Context, Poll}; -use futures::{Future, FutureExt, Stream, StreamExt}; +use futures::{Future, FutureExt, ready, Stream, StreamExt}; use futures_timer::Delay; use codec::{Decode, Encode}; @@ -80,6 +80,8 @@ mod schema { type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>; +const LOG_TARGET: &'static str = "sub-authority-discovery"; + /// Upper bound estimation on how long one should wait before accessing the Kademlia DHT. const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30); @@ -174,7 +176,7 @@ where match Metrics::register(&registry) { Ok(metrics) => Some(metrics), Err(e) => { - error!(target: "sub-authority-discovery", "Failed to register metrics: {:?}", e); + error!(target: LOG_TARGET, "Failed to register metrics: {:?}", e); None }, } @@ -282,10 +284,15 @@ where Ok(()) } - fn handle_dht_events(&mut self, cx: &mut Context) -> Result<()> { + /// Handle incoming Dht events. 
+ /// + /// Returns either: + /// - Poll::Pending when there are no more events to handle or + /// - Poll::Ready(()) when the dht event stream terminated. + fn handle_dht_events(&mut self, cx: &mut Context) -> Poll<()>{ loop { - match self.dht_event_rx.poll_next_unpin(cx) { - Poll::Ready(Some(DhtEvent::ValueFound(v))) => { + match ready!(self.dht_event_rx.poll_next_unpin(cx)) { + Some(DhtEvent::ValueFound(v)) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_found"]).inc(); } @@ -293,47 +300,52 @@ where if log_enabled!(log::Level::Debug) { let hashes = v.iter().map(|(hash, _value)| hash.clone()); debug!( - target: "sub-authority-discovery", + target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", hashes, ); } - self.handle_dht_value_found_event(v)?; + if let Err(e) = self.handle_dht_value_found_event(v) { + error!( + target: LOG_TARGET, + "Failed to handle Dht value found event: {:?}", e, + ); + } } - Poll::Ready(Some(DhtEvent::ValueNotFound(hash))) => { + Some(DhtEvent::ValueNotFound(hash)) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_not_found"]).inc(); } debug!( - target: "sub-authority-discovery", + target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash ) }, - Poll::Ready(Some(DhtEvent::ValuePut(hash))) => { + Some(DhtEvent::ValuePut(hash)) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_put"]).inc(); } debug!( - target: "sub-authority-discovery", + target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash, ) }, - Poll::Ready(Some(DhtEvent::ValuePutFailed(hash))) => { + Some(DhtEvent::ValuePutFailed(hash)) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc(); } warn!( - target: "sub-authority-discovery", + target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash ) }, - // The sender side of the dht event stream has been 
closed, likely due to the - // network terminating. - Poll::Ready(None) => return Err(Error::DhtEventStreamTerminated), - Poll::Pending => return Ok(()), + None => { + debug!(target: LOG_TARGET, "Dht event stream terminated."); + return Poll::Ready(()); + }, } } } @@ -442,7 +454,7 @@ where let addresses = self.addr_cache.get_subset(); debug!( - target: "sub-authority-discovery", + target: LOG_TARGET, "Applying priority group {:?} to peerset.", addresses, ); self.network @@ -464,46 +476,42 @@ where type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let mut inner = || -> Result<()> { - // Process incoming events before triggering new ones. - self.handle_dht_events(cx)?; - - if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) { - // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, - // in case one of the function calls within this block do a `return`, we don't call - // `interval.poll` again and thereby the underlying Tokio task is never registered - // with Tokio's Reactor to be woken up on the next interval tick. - while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {} - - self.publish_ext_addresses()?; - } - - if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) { - // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, - // in case one of the function calls within this block do a `return`, we don't call - // `interval.poll` again and thereby the underlying Tokio task is never registered - // with Tokio's Reactor to be woken up on the next interval tick. - while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {} - - self.request_addresses_of_others()?; - } - - Ok(()) - }; - - loop { - match inner() { - Ok(()) => return Poll::Pending, - - // Handle fatal errors. - // - // Given that the network likely terminated authority discovery should do the same. 
- Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()), - - // Handle non-fatal errors. - Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), - }; + // Process incoming events. + if let Poll::Ready(()) = self.handle_dht_events(cx) { + // `handle_dht_events` returns `Poll::Ready(())` when the Dht event stream terminated. + // Termination of the Dht event stream implies that the underlying network terminated, + // thus authority discovery should terminate as well. + return Poll::Ready(()); } + + + // Publish own addresses. + if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) { + // Register waker of underlying task for next interval. + while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {} + + if let Err(e) = self.publish_ext_addresses() { + error!( + target: LOG_TARGET, + "Failed to publish external addresses: {:?}", e, + ); + } + } + + // Request addresses of authorities. + if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) { + // Register waker of underlying task for next interval. + while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {} + + if let Err(e) = self.request_addresses_of_others() { + error!( + target: LOG_TARGET, + "Failed to request addresses of authorities: {:?}", e, + ); + } + } + + Poll::Pending } } diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index 78ff5f33c2..79c50818c4 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -286,6 +286,7 @@ fn request_addresses_of_others_triggers_dht_get_query() { #[test] fn handle_dht_events_with_value_found_should_call_set_priority_group() { let _ = ::env_logger::try_init(); + // Create authority discovery. 
let (mut dht_event_tx, dht_event_rx) = channel(1000); @@ -331,7 +332,9 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() { // Make authority discovery handle the event. let f = |cx: &mut Context<'_>| -> Poll<()> { - authority_discovery.handle_dht_events(cx).unwrap(); + if let Poll::Ready(e) = authority_discovery.handle_dht_events(cx) { + panic!("Unexpected error: {:?}", e); + } // Expect authority discovery to set the priority set. assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1); @@ -439,15 +442,16 @@ fn dont_stop_polling_when_error_is_returned() { // Now we call `await` and give the control to the authority discovery future. assert_eq!(Some(Event::Processed), discovery_update_rx.next().await); - // Drop the event rx to stop the authority discovery. If it was polled correctly, it should - // end properly. + // Drop the event rx to stop the authority discovery. If it was polled correctly, it + // should end properly. drop(dht_event_tx); assert!( discovery_update_rx.collect::<Vec<Event>>() .await .into_iter() - .any(|evt| evt == Event::End), "The authority should have ended", + .any(|evt| evt == Event::End), + "The authority discovery should have ended", ); } );