Use a background task instead of a thread for the import queue (#2909)

* Use a background task instead of a thread for the import queue

* Update Cargo.locks

* Comment
This commit is contained in:
Pierre Krieger
2019-06-21 15:46:26 +02:00
committed by Gavin Wood
parent 1af51f4234
commit f45cfb0d16
7 changed files with 72 additions and 82 deletions
+1 -1
View File
@@ -4116,7 +4116,6 @@ dependencies = [
name = "substrate-consensus-common"
version = "2.0.0"
dependencies = [
"crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4129,6 +4128,7 @@ dependencies = [
"substrate-inherents 2.0.0",
"substrate-primitives 2.0.0",
"substrate-test-runtime-client 2.0.0",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
+1 -1
View File
@@ -7,7 +7,6 @@ edition = "2018"
[dependencies]
derive_more = "0.14.0"
crossbeam-channel = "0.3.4"
libp2p = { version = "0.9.0", default-features = false }
log = "0.4"
primitives = { package = "substrate-primitives", path= "../../primitives" }
@@ -16,6 +15,7 @@ futures = "0.1"
rstd = { package = "sr-std", path = "../../sr-std" }
runtime_version = { package = "sr-version", path = "../../sr-version" }
runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" }
tokio-executor = "0.1.6"
tokio-timer = "0.2"
parity-codec = { version = "3.3", features = ["derive"] }
parking_lot = "0.8.0"
@@ -25,8 +25,7 @@
//! instantiated. The `BasicQueue` and `BasicVerifier` traits allow serial
//! queues to be instantiated simply.
use std::{sync::Arc, thread, collections::HashMap};
use crossbeam_channel::{self as channel, Sender};
use std::{sync::Arc, collections::HashMap};
use futures::{prelude::*, sync::mpsc};
use runtime_primitives::{Justification, traits::{
Block as BlockT, Header as HeaderT, NumberFor,
@@ -121,31 +120,25 @@ pub trait ImportQueue<B: BlockT>: Send {
}
/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// thread, with pluggable verification.
/// task, with pluggable verification.
pub struct BasicQueue<B: BlockT> {
/// Channel to send messages to the background thread.
sender: Option<Sender<ToWorkerMsg<B>>>,
/// Results coming from the worker thread.
/// Channel to send messages to the background task.
sender: mpsc::UnboundedSender<ToWorkerMsg<B>>,
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
/// Sent through the link as soon as possible.
finality_proof_request_builder: Option<SharedFinalityProofRequestBuilder<B>>,
}
impl<B: BlockT> Drop for BasicQueue<B> {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
let (shutdown_sender, shutdown_receiver) = channel::unbounded();
if sender.send(ToWorkerMsg::Shutdown(shutdown_sender)).is_ok() {
let _ = shutdown_receiver.recv();
}
}
}
/// Since we have to be in a tokio context in order to spawn background tasks, we first store
/// the task to spawn here, then extract it as soon as we are in a tokio context.
/// If `Some`, contains the task to spawn in the background. If `None`, the future has already
/// been spawned.
future_to_spawn: Option<Box<dyn Future<Item = (), Error = ()> + Send>>,
}
impl<B: BlockT> BasicQueue<B> {
/// Instantiate a new basic queue, with given verifier.
///
/// This creates a background thread, and calls `on_start` on the justification importer and
/// This creates a background task, and calls `on_start` on the justification importer and
/// finality proof importer.
pub fn new<V: 'static + Verifier<B>>(
verifier: Arc<V>,
@@ -155,7 +148,7 @@ impl<B: BlockT> BasicQueue<B> {
finality_proof_request_builder: Option<SharedFinalityProofRequestBuilder<B>>,
) -> Self {
let (result_sender, result_port) = buffered_link();
let worker_sender = BlockImportWorker::new(
let (future, worker_sender) = BlockImportWorker::new(
result_sender,
verifier,
block_import,
@@ -164,9 +157,10 @@ impl<B: BlockT> BasicQueue<B> {
);
Self {
sender: Some(worker_sender),
sender: worker_sender,
result_port,
finality_proof_request_builder,
future_to_spawn: Some(Box::new(future)),
}
}
@@ -176,9 +170,7 @@ impl<B: BlockT> BasicQueue<B> {
/// has synchronized with ImportQueue.
#[cfg(any(test, feature = "test-helpers"))]
pub fn synchronize(&self) {
if let Some(ref sender) = self.sender {
let _ = sender.send(ToWorkerMsg::Synchronize);
}
let _ = self.sender.unbounded_send(ToWorkerMsg::Synchronize);
}
}
@@ -188,10 +180,8 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
return;
}
if let Some(ref sender) = self.sender {
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
let _ = sender.send(ToWorkerMsg::ImportBlocks(origin, blocks));
}
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
}
fn import_justification(
@@ -201,19 +191,19 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
number: NumberFor<B>,
justification: Justification
) {
if let Some(ref sender) = self.sender {
let _ = sender.send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification));
}
let _ = self.sender.unbounded_send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification));
}
fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
if let Some(ref sender) = self.sender {
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
let _ = sender.send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof));
}
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
let _ = self.sender.unbounded_send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof));
}
fn poll_actions(&mut self, link: &mut dyn Link<B>) {
if let Some(future) = self.future_to_spawn.take() {
tokio_executor::spawn(future);
}
if let Some(fprb) = self.finality_proof_request_builder.take() {
link.set_finality_proof_request_builder(fprb);
}
@@ -228,7 +218,6 @@ enum ToWorkerMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
Shutdown(Sender<()>),
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
@@ -248,50 +237,52 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
finality_proof_import: Option<SharedFinalityProofImport<B>>,
) -> Sender<ToWorkerMsg<B>> {
let (sender, port) = channel::bounded(4);
let _ = thread::Builder::new()
.name("ImportQueueWorker".into())
.spawn(move || {
let mut worker = BlockImportWorker {
result_sender,
verifier,
justification_import,
block_import,
finality_proof_import,
) -> (impl Future<Item = (), Error = ()> + Send, mpsc::UnboundedSender<ToWorkerMsg<B>>) {
let (sender, mut port) = mpsc::unbounded();
let mut worker = BlockImportWorker {
result_sender,
verifier,
justification_import,
block_import,
finality_proof_import,
};
if let Some(justification_import) = worker.justification_import.as_ref() {
justification_import.on_start(&mut worker.result_sender);
}
if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() {
finality_proof_import.on_start(&mut worker.result_sender);
}
let future = futures::future::poll_fn(move || {
loop {
let msg = match port.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => return Ok(Async::NotReady),
};
if let Some(justification_import) = worker.justification_import.as_ref() {
justification_import.on_start(&mut worker.result_sender);
}
if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() {
finality_proof_import.on_start(&mut worker.result_sender);
}
for msg in port.iter() {
// Working until all senders have been dropped...
match msg {
ToWorkerMsg::ImportBlocks(origin, blocks) => {
worker.import_a_batch_of_blocks(origin, blocks);
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
worker.import_finality_proof(who, hash, number, proof);
},
ToWorkerMsg::ImportJustification(who, hash, number, justification) => {
worker.import_justification(who, hash, number, justification);
}
ToWorkerMsg::Shutdown(result_sender) => {
let _ = result_sender.send(());
break;
},
#[cfg(any(test, feature = "test-helpers"))]
ToWorkerMsg::Synchronize => {
trace!(target: "sync", "Sending sync message");
worker.result_sender.synchronized();
},
match msg {
ToWorkerMsg::ImportBlocks(origin, blocks) => {
worker.import_a_batch_of_blocks(origin, blocks);
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
worker.import_finality_proof(who, hash, number, proof);
},
ToWorkerMsg::ImportJustification(who, hash, number, justification) => {
worker.import_justification(who, hash, number, justification);
}
#[cfg(any(test, feature = "test-helpers"))]
ToWorkerMsg::Synchronize => {
trace!(target: "sync", "Sending sync message");
worker.result_sender.synchronized();
},
}
})
.expect("ImportQueueWorker thread spawning failed");
sender
}
});
(future, sender)
}
fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
@@ -26,7 +26,6 @@
// our error-chain could potentially blow up otherwise
#![recursion_limit="128"]
extern crate crossbeam_channel;
#[macro_use] extern crate log;
use std::sync::Arc;
+1 -1
View File
@@ -2524,7 +2524,6 @@ dependencies = [
name = "substrate-consensus-common"
version = "2.0.0"
dependencies = [
"crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2536,6 +2535,7 @@ dependencies = [
"sr-version 2.0.0",
"substrate-inherents 2.0.0",
"substrate-primitives 2.0.0",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
+1 -1
View File
@@ -2649,7 +2649,6 @@ dependencies = [
name = "substrate-consensus-common"
version = "2.0.0"
dependencies = [
"crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2661,6 +2660,7 @@ dependencies = [
"sr-version 2.0.0",
"substrate-inherents 2.0.0",
"substrate-primitives 2.0.0",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
+1 -1
View File
@@ -2783,7 +2783,6 @@ dependencies = [
name = "substrate-consensus-common"
version = "2.0.0"
dependencies = [
"crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2795,6 +2794,7 @@ dependencies = [
"sr-version 2.0.0",
"substrate-inherents 2.0.0",
"substrate-primitives 2.0.0",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
]