Fix the race. (#1081)

This commit is contained in:
Sergey Pepyakin
2018-11-09 14:31:53 +01:00
committed by Gav Wood
parent 0ab26a9c4f
commit 5cbbd2284d
+22 -12
View File
@@ -166,8 +166,12 @@ impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
fn stop(&self) {
self.clear();
if let Some(handle) = self.handle.lock().take() {
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();
{
// Perform storing the stop flag and signalling under a single lock.
let _queue_lock = self.data.queue.lock();
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();
}
let _ = handle.join();
}
@@ -220,12 +224,15 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>, V: Verifier<B>>(
) {
trace!(target: "sync", "Starting import thread");
loop {
if qdata.is_stopping.load(Ordering::SeqCst) {
break;
}
let new_blocks = {
let mut queue_lock = qdata.queue.lock();
// We are holding the same lock that `stop` takes so here we either see that stop flag
// is active or wait for the signal. The latter one unlocks the mutex and this gives a chance
// to `stop` to generate the signal.
if qdata.is_stopping.load(Ordering::SeqCst) {
break;
}
if queue_lock.is_empty() {
qdata.signal.wait(&mut queue_lock);
}
@@ -779,11 +786,14 @@ pub mod tests {
#[test]
fn async_import_queue_drops() {
let verifier = Arc::new(PassThroughVerifier(true));
let queue = BasicQueue::new(verifier);
let service = Arc::new(DummyExecutor);
let chain = Arc::new(test_client::new());
queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap();
drop(queue);
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = Arc::new(PassThroughVerifier(true));
let queue = BasicQueue::new(verifier);
let service = Arc::new(DummyExecutor);
let chain = Arc::new(test_client::new());
queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap();
drop(queue);
}
}
}