From b5e221f78f1fac1fe3dcd42929f3b6874c2c50f7 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Dec 2019 22:39:50 +0300 Subject: [PATCH] Fixes a flaky test (#675) * Fixes a flaky test * Renames a var * Do not unit the errors in tests --- polkadot/availability-store/src/worker.rs | 38 +++++++++++++++++++---- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/polkadot/availability-store/src/worker.rs b/polkadot/availability-store/src/worker.rs index 10a879582e..9f8011b9b7 100644 --- a/polkadot/availability-store/src/worker.rs +++ b/polkadot/availability-store/src/worker.rs @@ -794,13 +794,19 @@ mod tests { use super::*; use std::time::Duration; use futures::{stream, channel::mpsc, Stream}; - use std::sync::{Arc, Mutex}; + use std::sync::{Arc, Mutex, Condvar}; use std::pin::Pin; use tokio::runtime::Runtime; // Just contains topic->channel mapping to give to outer code on `gossip_messages_for` calls. struct TestGossipMessages { - messages: Arc>>>, + messages: Arc, Condvar)>, + mpsc::UnboundedReceiver<(Hash, Hash, ErasureChunk)>, + ), + >>>, } impl ProvideGossipMessages for TestGossipMessages { @@ -808,7 +814,13 @@ mod tests { -> Pin + Send>> { match self.messages.lock().unwrap().remove(&topic) { - Some(receiver) => receiver.boxed(), + Some((pair, receiver)) => { + let (lock, cvar) = &*pair; + let mut consumed = lock.lock().unwrap(); + *consumed = true; + cvar.notify_one(); + receiver.boxed() + }, None => stream::iter(vec![]).boxed(), } } @@ -851,9 +863,10 @@ mod tests { let topic = erasure_coding_topic(relay_parent, erasure_root, local_id); + let pair = Arc::new((Mutex::new(false), Condvar::new())); let messages = TestGossipMessages { messages: Arc::new(Mutex::new(vec![ - (topic, gossip_receiver) + (topic, (pair.clone(), gossip_receiver)) ].into_iter().collect())) }; @@ -961,11 +974,14 @@ mod tests { let topic_1 = erasure_coding_topic(relay_parent, erasure_root_1, local_id); let topic_2 = erasure_coding_topic(relay_parent, erasure_root_2, local_id); + let cvar_pair1 = Arc::new((Mutex::new(false), Condvar::new())); + let cvar_pair2 = Arc::new((Mutex::new(false), Condvar::new())); + let messages = TestGossipMessages { messages: Arc::new(Mutex::new( vec![ - (topic_1, gossip_receiver_1), - (topic_2, gossip_receiver_2), + (topic_1, (cvar_pair1.clone(), gossip_receiver_1)), + (topic_2, (cvar_pair2, gossip_receiver_2)), ].into_iter().collect())) }; @@ -1000,6 +1016,16 @@ mod tests { handle.sender.unbounded_send(listen_msg_1).unwrap(); runtime.block_on(r1).unwrap().unwrap(); + // Here, we are racing against the worker thread that might have not yet + // reached the point when it requests the gossip messages for `topic_2` + // which will get them removed from `TestGossipMessages`. Therefore, the + // `Condvar` is used to wait for that event. + let (lock, cvar1) = &*cvar_pair1; + let mut gossip_stream_consumed = lock.lock().unwrap(); + while !*gossip_stream_consumed { + gossip_stream_consumed = cvar1.wait(gossip_stream_consumed).unwrap(); + } + // The gossip sender taken => listener registered. assert!(!messages.messages.lock().unwrap().contains_key(&topic_1)); }