Make sure we poll authority event stream until all events are processed (#5608)

* Make sure we poll authority event stream until all events are processed

* Add test
This commit is contained in:
Bastian Köcher
2020-04-11 22:12:38 +02:00
committed by GitHub
parent bd91e58a9a
commit c0219f40d1
2 changed files with 86 additions and 16 deletions
@@ -17,8 +17,10 @@
use std::{iter::FromIterator, sync::{Arc, Mutex}};
use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::executor::{block_on, LocalPool};
use futures::future::{poll_fn, FutureExt};
use futures::sink::SinkExt;
use futures::task::LocalSpawn;
use futures::poll;
use libp2p::{kad, PeerId};
@@ -319,7 +321,7 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses,
signature: signature,
signature,
}
.encode(&mut signed_addresses)
.unwrap();
@@ -380,3 +382,73 @@ fn terminate_when_event_stream_terminates() {
);
});
}
#[test]
fn dont_stop_polling_when_error_is_returned() {
#[derive(PartialEq, Debug)]
enum Event {
Processed,
End,
};
let (mut dht_event_tx, dht_event_rx) = channel(1000);
let (mut discovery_update_tx, mut discovery_update_rx) = channel(1000);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let test_api = Arc::new(TestApi {
authorities: vec![],
});
let mut pool = LocalPool::new();
let mut authority_discovery = AuthorityDiscovery::new(
test_api,
network.clone(),
vec![],
key_store,
dht_event_rx.boxed(),
None,
);
// Spawn the authority discovery to make sure it is polled independently.
//
// As this is a local pool, only one future at a time will have the CPU and
// can make progress until the future returns `Pending`.
pool.spawner().spawn_local_obj(
futures::future::poll_fn(move |ctx| {
match std::pin::Pin::new(&mut authority_discovery).poll(ctx) {
Poll::Ready(()) => {},
Poll::Pending => {
discovery_update_tx.send(Event::Processed).now_or_never();
return Poll::Pending;
},
}
let _ = discovery_update_tx.send(Event::End).now_or_never().unwrap();
Poll::Ready(())
}).boxed_local().into(),
).expect("Spawns authority discovery");
pool.run_until(
// The future that drives the event stream
async {
// Send an event that should generate an error
let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
// Send the same event again to make sure that the event stream needs to be polled twice
// to be woken up again.
let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
// Now we call `await` and give the control to the authority discovery future.
assert_eq!(Some(Event::Processed), discovery_update_rx.next().await);
// Drop the event rx to stop the authority discovery. If it was polled correctly, it should
// end properly.
drop(dht_event_tx);
assert!(
discovery_update_rx.collect::<Vec<Event>>()
.await
.into_iter()
.any(|evt| evt == Event::End), "The authority should have ended",
);
}
);
}