diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index 1a8a5c9f40..956e970f26 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -538,21 +538,19 @@ where Ok(()) }; - match inner() { - Ok(()) => {} + loop { + match inner() { + Ok(()) => return Poll::Pending, - // Handle fatal errors. - // - // Given that the network likely terminated authority discovery should do the same. - Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()), + // Handle fatal errors. + // + // Given that the network likely terminated authority discovery should do the same. + Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()), - // Handle non-fatal errors. - Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), - }; - - // Return Poll::Pending as this is a long running task with the same lifetime as the node - // itself. - Poll::Pending + // Handle non-fatal errors. + Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), + }; + } } } diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index 923b7ee0f2..78ff5f33c2 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -17,8 +17,10 @@ use std::{iter::FromIterator, sync::{Arc, Mutex}}; use futures::channel::mpsc::channel; -use futures::executor::block_on; -use futures::future::poll_fn; +use futures::executor::{block_on, LocalPool}; +use futures::future::{poll_fn, FutureExt}; +use futures::sink::SinkExt; +use futures::task::LocalSpawn; use futures::poll; use libp2p::{kad, PeerId}; @@ -319,7 +321,7 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() { let mut signed_addresses = vec![]; schema::SignedAuthorityAddresses { addresses: serialized_addresses, - signature: signature, + signature, } .encode(&mut signed_addresses) .unwrap(); @@ -380,3 +382,73 @@ fn terminate_when_event_stream_terminates() { ); }); } + +#[test] +fn dont_stop_polling_when_error_is_returned() { + #[derive(PartialEq, Debug)] + enum Event { + Processed, + End, + }; + + let (mut dht_event_tx, dht_event_rx) = channel(1000); + let (mut discovery_update_tx, mut discovery_update_rx) = channel(1000); + let network: Arc = Arc::new(Default::default()); + let key_store = KeyStore::new(); + let test_api = Arc::new(TestApi { + authorities: vec![], + }); + let mut pool = LocalPool::new(); + + let mut authority_discovery = AuthorityDiscovery::new( + test_api, + network.clone(), + vec![], + key_store, + dht_event_rx.boxed(), + None, + ); + + // Spawn the authority discovery to make sure it is polled independently. + // + // As this is a local pool, only one future at a time will have the CPU and + // can make progress until the future returns `Pending`. + pool.spawner().spawn_local_obj( + futures::future::poll_fn(move |ctx| { + match std::pin::Pin::new(&mut authority_discovery).poll(ctx) { + Poll::Ready(()) => {}, + Poll::Pending => { + discovery_update_tx.send(Event::Processed).now_or_never(); + return Poll::Pending; + }, + } + let _ = discovery_update_tx.send(Event::End).now_or_never().unwrap(); + Poll::Ready(()) + }).boxed_local().into(), + ).expect("Spawns authority discovery"); + + pool.run_until( + // The future that drives the event stream + async { + // Send an event that should generate an error + let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never(); + // Send the same event again to make sure that the event stream needs to be polled twice + // to be woken up again. + let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never(); + + // Now we call `await` and give the control to the authority discovery future. + assert_eq!(Some(Event::Processed), discovery_update_rx.next().await); + + // Drop the event rx to stop the authority discovery. If it was polled correctly, it should + // end properly. + drop(dht_event_tx); + + assert!( + discovery_update_rx.collect::>() + .await + .into_iter() + .any(|evt| evt == Event::End), "The authority should have ended", + ); + } + ); +}