From f45cfb0d1643682598432f9b367e8257b5f24a79 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 21 Jun 2019 15:46:26 +0200 Subject: [PATCH] Use a background task instead of a thread for the import queue (#2909) * Use a background task instead of a thread for the import queue * Update Cargo.locks * Comment --- substrate/Cargo.lock | 2 +- substrate/core/consensus/common/Cargo.toml | 2 +- .../core/consensus/common/src/import_queue.rs | 143 ++++++++---------- substrate/core/consensus/common/src/lib.rs | 1 - substrate/core/test-runtime/wasm/Cargo.lock | 2 +- .../node-template/runtime/wasm/Cargo.lock | 2 +- substrate/node/runtime/wasm/Cargo.lock | 2 +- 7 files changed, 72 insertions(+), 82 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index b89743546f..6ca698df3b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4116,7 +4116,6 @@ dependencies = [ name = "substrate-consensus-common" version = "2.0.0" dependencies = [ - "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4129,6 +4128,7 @@ dependencies = [ "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index 2852f4aa0f..c1a847da74 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] derive_more = "0.14.0" -crossbeam-channel = "0.3.4" libp2p = { version = "0.9.0", default-features = false } log = "0.4" primitives = { package = "substrate-primitives", path= "../../primitives" } @@ -16,6 +15,7 @@ futures = "0.1" rstd = { package = "sr-std", path = "../../sr-std" } runtime_version = { package = "sr-version", path = "../../sr-version" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } +tokio-executor = "0.1.6" tokio-timer = "0.2" parity-codec = { version = "3.3", features = ["derive"] } parking_lot = "0.8.0" diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 1753d63ff0..6cbb8ee413 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -25,8 +25,7 @@ //! instantiated. The `BasicQueue` and `BasicVerifier` traits allow serial //! queues to be instantiated simply. -use std::{sync::Arc, thread, collections::HashMap}; -use crossbeam_channel::{self as channel, Sender}; +use std::{sync::Arc, collections::HashMap}; use futures::{prelude::*, sync::mpsc}; use runtime_primitives::{Justification, traits::{ Block as BlockT, Header as HeaderT, NumberFor, @@ -121,31 +120,25 @@ pub trait ImportQueue: Send { } /// Interface to a basic block import queue that is importing blocks sequentially in a separate -/// thread, with pluggable verification. +/// task, with pluggable verification. pub struct BasicQueue { - /// Channel to send messages to the background thread. - sender: Option>>, - /// Results coming from the worker thread. + /// Channel to send messages to the background task. + sender: mpsc::UnboundedSender>, + /// Results coming from the worker task. result_port: BufferedLinkReceiver, /// Sent through the link as soon as possible. finality_proof_request_builder: Option>, -} - -impl Drop for BasicQueue { - fn drop(&mut self) { - if let Some(sender) = self.sender.take() { - let (shutdown_sender, shutdown_receiver) = channel::unbounded(); - if sender.send(ToWorkerMsg::Shutdown(shutdown_sender)).is_ok() { - let _ = shutdown_receiver.recv(); - } - } - } + /// Since we have to be in a tokio context in order to spawn background tasks, we first store + /// the task to spawn here, then extract it as soon as we are in a tokio context. + /// If `Some`, contains the task to spawn in the background. If `None`, the future has already + /// been spawned. + future_to_spawn: Option + Send>>, } impl BasicQueue { /// Instantiate a new basic queue, with given verifier. /// - /// This creates a background thread, and calls `on_start` on the justification importer and + /// This creates a background task, and calls `on_start` on the justification importer and /// finality proof importer. pub fn new>( verifier: Arc, @@ -155,7 +148,7 @@ impl BasicQueue { finality_proof_request_builder: Option>, ) -> Self { let (result_sender, result_port) = buffered_link(); - let worker_sender = BlockImportWorker::new( + let (future, worker_sender) = BlockImportWorker::new( result_sender, verifier, block_import, @@ -164,9 +157,10 @@ impl BasicQueue { ); Self { - sender: Some(worker_sender), + sender: worker_sender, result_port, finality_proof_request_builder, + future_to_spawn: Some(Box::new(future)), } } @@ -176,9 +170,7 @@ impl BasicQueue { /// has synchronized with ImportQueue. #[cfg(any(test, feature = "test-helpers"))] pub fn synchronize(&self) { - if let Some(ref sender) = self.sender { - let _ = sender.send(ToWorkerMsg::Synchronize); - } + let _ = self.sender.unbounded_send(ToWorkerMsg::Synchronize); } } @@ -188,10 +180,8 @@ impl ImportQueue for BasicQueue { return; } - if let Some(ref sender) = self.sender { - trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let _ = sender.send(ToWorkerMsg::ImportBlocks(origin, blocks)); - } + trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); + let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); } fn import_justification( @@ -201,19 +191,19 @@ impl ImportQueue for BasicQueue { number: NumberFor, justification: Justification ) { - if let Some(ref sender) = self.sender { - let _ = sender.send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification)); - } + let _ = self.sender.unbounded_send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification)); } fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - if let Some(ref sender) = self.sender { - trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let _ = sender.send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); - } + trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); + let _ = self.sender.unbounded_send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); } fn poll_actions(&mut self, link: &mut dyn Link) { + if let Some(future) = self.future_to_spawn.take() { + tokio_executor::spawn(future); + } + if let Some(fprb) = self.finality_proof_request_builder.take() { link.set_finality_proof_request_builder(fprb); } @@ -228,7 +218,6 @@ enum ToWorkerMsg { ImportBlocks(BlockOrigin, Vec>), ImportJustification(Origin, B::Hash, NumberFor, Justification), ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), - Shutdown(Sender<()>), #[cfg(any(test, feature = "test-helpers"))] Synchronize, } @@ -248,50 +237,52 @@ impl> BlockImportWorker { block_import: SharedBlockImport, justification_import: Option>, finality_proof_import: Option>, - ) -> Sender> { - let (sender, port) = channel::bounded(4); - let _ = thread::Builder::new() - .name("ImportQueueWorker".into()) - .spawn(move || { - let mut worker = BlockImportWorker { - result_sender, - verifier, - justification_import, - block_import, - finality_proof_import, + ) -> (impl Future + Send, mpsc::UnboundedSender>) { + let (sender, mut port) = mpsc::unbounded(); + + let mut worker = BlockImportWorker { + result_sender, + verifier, + justification_import, + block_import, + finality_proof_import, + }; + + if let Some(justification_import) = worker.justification_import.as_ref() { + justification_import.on_start(&mut worker.result_sender); + } + if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() { + finality_proof_import.on_start(&mut worker.result_sender); + } + + let future = futures::future::poll_fn(move || { + loop { + let msg = match port.poll() { + Ok(Async::Ready(Some(msg))) => msg, + Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => return Ok(Async::NotReady), }; - if let Some(justification_import) = worker.justification_import.as_ref() { - justification_import.on_start(&mut worker.result_sender); - } - if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() { - finality_proof_import.on_start(&mut worker.result_sender); - } - for msg in port.iter() { - // Working until all senders have been dropped... - match msg { - ToWorkerMsg::ImportBlocks(origin, blocks) => { - worker.import_a_batch_of_blocks(origin, blocks); - }, - ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { - worker.import_finality_proof(who, hash, number, proof); - }, - ToWorkerMsg::ImportJustification(who, hash, number, justification) => { - worker.import_justification(who, hash, number, justification); - } - ToWorkerMsg::Shutdown(result_sender) => { - let _ = result_sender.send(()); - break; - }, - #[cfg(any(test, feature = "test-helpers"))] - ToWorkerMsg::Synchronize => { - trace!(target: "sync", "Sending sync message"); - worker.result_sender.synchronized(); - }, + + match msg { + ToWorkerMsg::ImportBlocks(origin, blocks) => { + worker.import_a_batch_of_blocks(origin, blocks); + }, + ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { + worker.import_finality_proof(who, hash, number, proof); + }, + ToWorkerMsg::ImportJustification(who, hash, number, justification) => { + worker.import_justification(who, hash, number, justification); } + #[cfg(any(test, feature = "test-helpers"))] + ToWorkerMsg::Synchronize => { + trace!(target: "sync", "Sending sync message"); + worker.result_sender.synchronized(); + }, } - }) - .expect("ImportQueueWorker thread spawning failed"); - sender + } + }); + + (future, sender) } fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 5982003c15..aa210b9f86 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -26,7 +26,6 @@ // our error-chain could potentially blow up otherwise #![recursion_limit="128"] -extern crate crossbeam_channel; #[macro_use] extern crate log; use std::sync::Arc; diff --git a/substrate/core/test-runtime/wasm/Cargo.lock b/substrate/core/test-runtime/wasm/Cargo.lock index 833c31c3e4..7cba86c8b6 100644 --- a/substrate/core/test-runtime/wasm/Cargo.lock +++ b/substrate/core/test-runtime/wasm/Cargo.lock @@ -2524,7 +2524,6 @@ dependencies = [ name = "substrate-consensus-common" version = "2.0.0" dependencies = [ - "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2536,6 +2535,7 @@ dependencies = [ "sr-version 2.0.0", "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/substrate/node-template/runtime/wasm/Cargo.lock b/substrate/node-template/runtime/wasm/Cargo.lock index ae323203f6..df4773de1e 100644 --- a/substrate/node-template/runtime/wasm/Cargo.lock +++ b/substrate/node-template/runtime/wasm/Cargo.lock @@ -2649,7 +2649,6 @@ dependencies = [ name = "substrate-consensus-common" version = "2.0.0" dependencies = [ - "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2661,6 +2660,7 @@ dependencies = [ "sr-version 2.0.0", "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/substrate/node/runtime/wasm/Cargo.lock b/substrate/node/runtime/wasm/Cargo.lock index 89c1bf9303..0d94d65476 100644 --- a/substrate/node/runtime/wasm/Cargo.lock +++ b/substrate/node/runtime/wasm/Cargo.lock @@ -2783,7 +2783,6 @@ dependencies = [ name = "substrate-consensus-common" version = "2.0.0" dependencies = [ - "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2795,6 +2794,7 @@ dependencies = [ "sr-version 2.0.0", "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ]