Fix import queue thread pool shutdown (#4929)

* Fix import queue thread pool shutdown

* Make sure runtime is disposed before telemetry

* Close channel istead of sending a message

* Fixed test
This commit is contained in:
Arkadiy Paronyan
2020-02-17 10:49:40 +01:00
committed by GitHub
parent c7b09b642a
commit c8fa6518bf
5 changed files with 78 additions and 14 deletions
@@ -14,9 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc};
use futures::{prelude::*, channel::mpsc, task::Context, task::Poll};
use futures_timer::Delay;
use parking_lot::{Mutex, Condvar};
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use crate::block_import::BlockOrigin;
@@ -40,9 +41,28 @@ pub struct BasicQueue<B: BlockT, Transaction> {
manual_poll: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// A thread pool where the background worker is being run.
pool: Option<futures::executor::ThreadPool>,
pool_guard: Arc<(Mutex<usize>, Condvar)>,
_phantom: PhantomData<Transaction>,
}
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
self.pool = None;
// Flush the queue and close the receiver to terminate the future.
self.sender.close_channel();
self.result_port.close();
// Make sure all pool threads terminate.
// https://github.com/rust-lang/futures-rs/issues/1470
// https://github.com/rust-lang/futures-rs/issues/1349
let (ref mutex, ref condvar) = *self.pool_guard;
let mut lock = mutex.lock();
while *lock != 0 {
condvar.wait(&mut lock);
}
}
}
impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
/// Instantiate a new basic queue, with given verifier.
///
@@ -63,9 +83,22 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
finality_proof_import,
);
let guard = Arc::new((Mutex::new(0usize), Condvar::new()));
let guard_start = guard.clone();
let guard_end = guard.clone();
let mut pool = futures::executor::ThreadPool::builder()
.name_prefix("import-queue-worker-")
.pool_size(1)
.after_start(move |_| *guard_start.0.lock() += 1)
.before_stop(move |_| {
let (ref mutex, ref condvar) = *guard_end;
let mut lock = mutex.lock();
*lock -= 1;
if *lock == 0 {
condvar.notify_one();
}
})
.create()
.ok();
@@ -82,6 +115,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
result_port,
manual_poll,
pool,
pool_guard: guard,
_phantom: PhantomData,
}
}
@@ -157,6 +157,11 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
}
}
}
/// Close the channel.
pub fn close(&mut self) {
self.rx.close()
}
}
#[cfg(test)]