From 5cbbd2284dffdf52c714ce4a203aa8b5d9ea1beb Mon Sep 17 00:00:00 2001 From: Sergey Pepyakin Date: Fri, 9 Nov 2018 14:31:53 +0100 Subject: [PATCH] Fix the race. (#1081) --- substrate/core/network/src/import_queue.rs | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/substrate/core/network/src/import_queue.rs b/substrate/core/network/src/import_queue.rs index 955dc12321..b31dee6310 100644 --- a/substrate/core/network/src/import_queue.rs +++ b/substrate/core/network/src/import_queue.rs @@ -166,8 +166,12 @@ impl> ImportQueue for BasicQueue { fn stop(&self) { self.clear(); if let Some(handle) = self.handle.lock().take() { - self.data.is_stopping.store(true, Ordering::SeqCst); - self.data.signal.notify_one(); + { + // Perform storing the stop flag and signalling under a single lock. + let _queue_lock = self.data.queue.lock(); + self.data.is_stopping.store(true, Ordering::SeqCst); + self.data.signal.notify_one(); + } let _ = handle.join(); } @@ -220,12 +224,15 @@ fn import_thread, V: Verifier>( ) { trace!(target: "sync", "Starting import thread"); loop { - if qdata.is_stopping.load(Ordering::SeqCst) { - break; - } - let new_blocks = { let mut queue_lock = qdata.queue.lock(); + + // We are holding the same lock that `stop` takes so here we either see that stop flag + // is active or wait for the signal. The latter one unlocks the mutex and this gives a chance + // to `stop` to generate the signal. + if qdata.is_stopping.load(Ordering::SeqCst) { + break; + } if queue_lock.is_empty() { qdata.signal.wait(&mut queue_lock); } @@ -779,11 +786,14 @@ pub mod tests { #[test] fn async_import_queue_drops() { - let verifier = Arc::new(PassThroughVerifier(true)); - let queue = BasicQueue::new(verifier); - let service = Arc::new(DummyExecutor); - let chain = Arc::new(test_client::new()); - queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak>).unwrap(); - drop(queue); + // Perform this test multiple times since it exhibits non-deterministic behavior. + for _ in 0..100 { + let verifier = Arc::new(PassThroughVerifier(true)); + let queue = BasicQueue::new(verifier); + let service = Arc::new(DummyExecutor); + let chain = Arc::new(test_client::new()); + queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak>).unwrap(); + drop(queue); + } } }