diff --git a/substrate/client/authority-discovery/src/error.rs b/substrate/client/authority-discovery/src/error.rs index 216f8c26bf..751e3e76e9 100644 --- a/substrate/client/authority-discovery/src/error.rs +++ b/substrate/client/authority-discovery/src/error.rs @@ -40,17 +40,20 @@ pub enum Error { MatchingHashedAuthorityIdWithAuthorityId, /// Failed to set the authority discovery peerset priority group in the peerset module. SettingPeersetPriorityGroup(String), + /// The sender side of the dht event stream has been closed likely due to the network + /// terminating. + DhtEventStreamTerminated, /// Failed to encode a protobuf payload. EncodingProto(prost::EncodeError), /// Failed to decode a protobuf payload. DecodingProto(prost::DecodeError), - /// Failed to encode or decode scale payload + /// Failed to encode or decode scale payload. EncodingDecodingScale(codec::Error), /// Failed to parse a libp2p multi address. ParsingMultiaddress(libp2p::core::multiaddr::Error), - /// Failed to sign using a specific public key + /// Failed to sign using a specific public key. MissingSignature(CryptoTypePublicPair), - /// Failed to sign using all public keys + /// Failed to sign using all public keys. Signing, /// Failed to register Prometheus metric. Prometheus(prometheus_endpoint::PrometheusError), diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index 45f05e1039..1a8a5c9f40 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -330,9 +330,9 @@ where } fn handle_dht_events(&mut self, cx: &mut Context) -> Result<()> { - while let Poll::Ready(Some(event)) = self.dht_event_rx.poll_next_unpin(cx) { - match event { - DhtEvent::ValueFound(v) => { + loop { + match self.dht_event_rx.poll_next_unpin(cx) { + Poll::Ready(Some(DhtEvent::ValueFound(v))) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_found"]).inc(); } @@ -347,7 +347,7 @@ where self.handle_dht_value_found_event(v)?; } - DhtEvent::ValueNotFound(hash) => { + Poll::Ready(Some(DhtEvent::ValueNotFound(hash))) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_not_found"]).inc(); } @@ -357,7 +357,7 @@ where "Value for hash '{:?}' not found on Dht.", hash ) }, - DhtEvent::ValuePut(hash) => { + Poll::Ready(Some(DhtEvent::ValuePut(hash))) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_put"]).inc(); } @@ -367,7 +367,7 @@ where "Successfully put hash '{:?}' on Dht.", hash, ) }, - DhtEvent::ValuePutFailed(hash) => { + Poll::Ready(Some(DhtEvent::ValuePutFailed(hash))) => { if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc(); } @@ -377,10 +377,12 @@ where "Failed to put hash '{:?}' on Dht.", hash ) }, + // The sender side of the dht event stream has been closed, likely due to the + // network terminating. + Poll::Ready(None) => return Err(Error::DhtEventStreamTerminated), + Poll::Pending => return Ok(()), } } - - Ok(()) } fn handle_dht_value_found_event( @@ -483,7 +485,6 @@ where } /// Update the peer set 'authority' priority group. - // fn update_peer_set_priority_group(&self) -> Result<()> { let addresses = self.addr_cache.get_subset(); @@ -539,11 +540,18 @@ where match inner() { Ok(()) => {} + + // Handle fatal errors. + // + // Given that the network likely terminated authority discovery should do the same. + Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()), + + // Handle non-fatal errors. Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), }; - // Make sure to always return NotReady as this is a long running task with the same lifetime - // as the node itself. + // Return Poll::Pending as this is a long running task with the same lifetime as the node + // itself. Poll::Pending } } diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index 358376e5db..923b7ee0f2 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -19,6 +19,7 @@ use std::{iter::FromIterator, sync::{Arc, Mutex}}; use futures::channel::mpsc::channel; use futures::executor::block_on; use futures::future::poll_fn; +use futures::poll; use libp2p::{kad, PeerId}; use sp_api::{ProvideRuntimeApi, ApiRef}; @@ -346,3 +347,36 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() { let _ = block_on(poll_fn(f)); } + +#[test] +fn terminate_when_event_stream_terminates() { + let (dht_event_tx, dht_event_rx) = channel(1000); + let network: Arc = Arc::new(Default::default()); + let key_store = KeyStore::new(); + let test_api = Arc::new(TestApi { + authorities: vec![], + }); + + let mut authority_discovery = AuthorityDiscovery::new( + test_api, + network.clone(), + vec![], + key_store, + dht_event_rx.boxed(), + None, + ); + + block_on(async { + assert_eq!(Poll::Pending, poll!(&mut authority_discovery)); + + // Simulate termination of the network through dropping the sender side of the dht event + // channel. + drop(dht_event_tx); + + assert_eq!( + Poll::Ready(()), poll!(&mut authority_discovery), + "Expect the authority discovery module to terminate once the sending side of the dht \ + event channel is terminated.", + ); + }); +}