mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 13:51:11 +00:00
fixed AsyncImportQueue drop deadlock (#342)
This commit is contained in:
committed by
Arkadiy Paronyan
parent
e75d7d8fda
commit
91cb0fcfef
@@ -140,6 +140,8 @@ impl<B: BlockT> Drop for AsyncImportQueue<B> {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(handle) = self.handle.lock().take() {
|
if let Some(handle) = self.handle.lock().take() {
|
||||||
self.data.is_stopping.store(true, Ordering::SeqCst);
|
self.data.is_stopping.store(true, Ordering::SeqCst);
|
||||||
|
self.data.signal.notify_one();
|
||||||
|
|
||||||
let _ = handle.join();
|
let _ = handle.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -148,8 +150,11 @@ impl<B: BlockT> Drop for AsyncImportQueue<B> {
|
|||||||
/// Blocks import thread.
|
/// Blocks import thread.
|
||||||
fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>, qdata: Arc<AsyncImportQueueData<B>>) {
|
fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>, qdata: Arc<AsyncImportQueueData<B>>) {
|
||||||
trace!(target: "sync", "Starting import thread");
|
trace!(target: "sync", "Starting import thread");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if qdata.is_stopping.load(Ordering::SeqCst) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
let new_blocks = {
|
let new_blocks = {
|
||||||
let mut queue_lock = qdata.queue.lock();
|
let mut queue_lock = qdata.queue.lock();
|
||||||
if queue_lock.is_empty() {
|
if queue_lock.is_empty() {
|
||||||
@@ -161,9 +166,6 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<
|
|||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if qdata.is_stopping.load(Ordering::SeqCst) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
|
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
|
||||||
(Some(sync), Some(service), Some(chain)) => {
|
(Some(sync), Some(service), Some(chain)) => {
|
||||||
@@ -417,6 +419,7 @@ pub mod tests {
|
|||||||
use message;
|
use message;
|
||||||
use test_client::{self, TestClient};
|
use test_client::{self, TestClient};
|
||||||
use test_client::runtime::{Block, Hash};
|
use test_client::runtime::{Block, Hash};
|
||||||
|
use on_demand::tests::DummyExecutor;
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// Blocks import queue that is importing blocks in the same thread.
|
/// Blocks import queue that is importing blocks in the same thread.
|
||||||
@@ -563,4 +566,13 @@ pub mod tests {
|
|||||||
qdata.is_stopping.store(true, Ordering::SeqCst);
|
qdata.is_stopping.store(true, Ordering::SeqCst);
|
||||||
assert!(!import_many_blocks(&mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block])));
|
assert!(!import_many_blocks(&mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block])));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn async_import_queue_drops() {
|
||||||
|
let queue = AsyncImportQueue::new();
|
||||||
|
let service = Arc::new(DummyExecutor);
|
||||||
|
let chain = Arc::new(test_client::new());
|
||||||
|
queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap();
|
||||||
|
drop(queue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -316,7 +316,7 @@ impl<Block: BlockT> Request<Block> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
pub mod tests {
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
@@ -331,7 +331,7 @@ mod tests {
|
|||||||
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
|
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
|
||||||
use test_client::runtime::{Block, Hash};
|
use test_client::runtime::{Block, Hash};
|
||||||
|
|
||||||
struct DummyExecutor;
|
pub struct DummyExecutor;
|
||||||
struct DummyFetchChecker { ok: bool }
|
struct DummyFetchChecker { ok: bool }
|
||||||
|
|
||||||
impl ExecuteInContext<Block> for DummyExecutor {
|
impl ExecuteInContext<Block> for DummyExecutor {
|
||||||
|
|||||||
Reference in New Issue
Block a user