explicit stop of AsyncImportThread (#360)

This commit is contained in:
Svyatoslav Nikolsky
2018-07-18 13:06:50 +03:00
committed by Gav Wood
parent 7ce2a8552f
commit 7b8463aaf1
4 changed files with 31 additions and 8 deletions
@@ -37,6 +37,8 @@ use sync::ChainSync;
pub trait ImportQueue<B: BlockT>: Send + Sync {
/// Clear the queue when sync is restarting.
fn clear(&self);
/// Clears the import queue and stops importing.
fn stop(&self);
/// Get queue status.
fn status(&self) -> ImportQueueStatus<B>;
/// Is block with given hash is currently in the queue.
@@ -109,6 +111,16 @@ impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
*best_importing_number = Zero::zero();
}
fn stop(&self) {
self.clear();
if let Some(handle) = self.handle.lock().take() {
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();
let _ = handle.join();
}
}
fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: self.data.queue_blocks.read().len(),
@@ -138,12 +150,7 @@ impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
impl<B: BlockT> Drop for AsyncImportQueue<B> {
fn drop(&mut self) {
if let Some(handle) = self.handle.lock().take() {
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();
let _ = handle.join();
}
self.stop();
}
}
@@ -433,6 +440,8 @@ pub mod tests {
impl<B: 'static + BlockT> ImportQueue<B> for SyncImportQueue {
fn clear(&self) { }
fn stop(&self) { }
fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: 0,
@@ -536,6 +536,15 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
handshaking_peers.clear();
}
pub fn stop(&self) {
// stop processing import requests first (without holding a sync lock)
let import_queue = self.sync.read().import_queue();
import_queue.stop();
// and then clear all the sync data
self.abort();
}
pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce<B::Header>) {
let header = announce.header;
let hash = header.hash();
+1 -1
View File
@@ -213,7 +213,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
}
fn stop(&self) {
self.handler.protocol.abort();
self.handler.protocol.stop();
self.network.stop();
}
}
+6 -1
View File
@@ -100,7 +100,12 @@ impl<B: BlockT> ChainSync<B> {
self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number)
}
/// Returns sync status
/// Returns import queue reference.
pub(crate) fn import_queue(&self) -> Arc<ImportQueue<B>> {
self.import_queue.clone()
}
/// Returns sync status.
pub(crate) fn status(&self) -> Status<B> {
let best_seen = self.best_seen_block();
let state = match &best_seen {