diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index e8801b6376..4d12fdad75 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3778,12 +3778,12 @@ dependencies = [ name = "substrate-consensus-common" version = "0.1.0" dependencies = [ + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", "sr-version 0.1.0", "substrate-inherents 0.1.0", diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 8cb63c0464..e3a87e8ac9 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -615,7 +615,7 @@ impl Verifier for AuraVerifier where } /// The Aura import queue type. -pub type AuraImportQueue = BasicQueue>; +pub type AuraImportQueue = BasicQueue; /// Register the aura inherent data provider, if not registered already. fn register_aura_inherent_data_provider( @@ -639,12 +639,12 @@ pub fn import_queue( client: Arc, extra: E, inherent_data_providers: InherentDataProviders, -) -> Result, consensus_common::Error> where +) -> Result, consensus_common::Error> where B: Block, - C: Authorities + ProvideRuntimeApi + Send + Sync, + C: 'static + Authorities + ProvideRuntimeApi + Send + Sync, C::Api: BlockBuilderApi, DigestItemFor: CompatibleDigestItem + DigestItem, - E: ExtraVerification, + E: 'static + ExtraVerification, { register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; @@ -699,10 +699,7 @@ mod tests { const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); pub struct AuraTestNet { - peers: Vec, ()>>>, + peers: Vec>>, started: bool, } @@ -737,15 +734,15 @@ mod tests { }) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F) { + fn mut_peers>>)>(&mut self, closure: F) { closure(&mut self.peers); } diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index 0225f8645e..0979cc45de 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -6,8 +6,8 @@ description = "Common utilities for substrate consensus" edition = "2018" [dependencies] +crossbeam-channel = "0.3.4" log = "0.4" -parking_lot = "0.7.1" primitives = { package = "substrate-primitives", path= "../../primitives" } inherents = { package = "substrate-inherents", path = "../../inherents" } error-chain = "0.12" diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index e8054d04a2..cb09f57caf 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -25,19 +25,21 @@ //! instantiated simply. use crate::block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin}; -use std::collections::{HashSet, VecDeque}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use parking_lot::{Condvar, Mutex, RwLock}; -use log::{trace, debug}; +use crossbeam_channel::{self as channel, Receiver, Sender}; +use std::collections::HashSet; +use std::sync::Arc; +use std::thread; + +use runtime_primitives::traits::{ + AuthorityIdFor, Block as BlockT, Header as HeaderT, NumberFor, Zero, +}; use runtime_primitives::Justification; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero, AuthorityIdFor}; use crate::error::Error as ConsensusError; /// Shared block import struct used by the queue. -pub type SharedBlockImport = Arc + Send + Sync>; +pub type SharedBlockImport = Arc + Send + Sync>; /// Shared justification import struct used by the queue. pub type SharedJustificationImport = Arc + Send + Sync>; @@ -70,20 +72,17 @@ pub trait Verifier: Send + Sync + Sized { origin: BlockOrigin, header: B::Header, justification: Option, - body: Option> + body: Option>, ) -> Result<(ImportBlock, Option>>), String>; } /// Blocks import queue API. -pub trait ImportQueue: Send + Sync { +pub trait ImportQueue: Send + Sync + ImportQueueClone { /// Start background work for the queue as necessary. /// /// This is called automatically by the network service when synchronization /// begins. - fn start(&self, _link: L) -> Result<(), std::io::Error> where - Self: Sized, - L: 'static + Link, - { + fn start(&self, _link: Box>) -> Result<(), std::io::Error> { Ok(()) } /// Clear the queue when sync is restarting. @@ -97,199 +96,433 @@ pub trait ImportQueue: Send + Sync { /// Import bunch of blocks. fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); /// Import a block justification. - fn import_justification(&self, hash: B::Hash, number: NumberFor, justification: Justification) -> bool; + fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification); +} + +pub trait ImportQueueClone { + fn clone_box(&self) -> Box>; +} + +impl Clone for Box> { + fn clone(&self) -> Box> { + self.clone_box() + } } /// Import queue status. It isn't completely accurate. +#[derive(Debug)] pub struct ImportQueueStatus { /// Number of blocks that are currently in the queue. pub importing_count: usize, /// The number of the best block that was ever in the queue since start/last failure. - pub best_importing_number: <::Header as HeaderT>::Number, + pub best_importing_number: NumberFor, } -/// Basic block import queue that is importing blocks sequentially in a separate thread, +/// Interface to a basic block import queue that is importing blocks sequentially in a separate thread, /// with pluggable verification. -pub struct BasicQueue> { - handle: Mutex>>, - data: Arc>, - verifier: Arc, - block_import: SharedBlockImport, - justification_import: Option>, +#[derive(Clone)] +pub struct BasicQueue { + sender: Sender>, } -/// Locks order: queue, queue_blocks, best_importing_number -pub struct AsyncImportQueueData { - signal: Condvar, - queue: Mutex>)>>, - queue_blocks: RwLock>, - best_importing_number: RwLock<<::Header as HeaderT>::Number>, - is_stopping: AtomicBool, +impl ImportQueueClone for BasicQueue { + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } } -impl> BasicQueue { - /// Instantiate a new basic queue, with given verifier and justification import. - pub fn new(verifier: Arc, block_import: SharedBlockImport, justification_import: Option>) -> Self { +/// "BasicQueue" is a wrapper around a channel sender to the "BlockImporter". +/// "BasicQueue" itself does not keep any state or do any importing work, and can therefore be send to other threads. +/// +/// "BasiqQueue" implements "ImportQueue" by sending messages to the "BlockImporter", which runs in it's own thread. +/// +/// The "BlockImporter" is responsible for handling incoming requests from the "BasicQueue", +/// some of these requests are handled by the "BlockImporter" itself, such as "is_importing" or "status", +/// and justifications are also imported by the "BlockImporter". +/// +/// The "import block" work will be offloaded to a single "BlockImportWorker", running in another thread. +/// Offloading the work is done via a channel, +/// ensuring blocks in this implementation are imported sequentially and in order(as received by the "BlockImporter") +/// +/// As long as the "BasicQueue" is not dropped, the "BlockImporter" will keep running. +/// The "BlockImporter" owns a sender to the "BlockImportWorker", ensuring that the worker is kept alive until that sender is dropped. +impl BasicQueue { + /// Instantiate a new basic queue, with given verifier. + pub fn new>( + verifier: Arc, + block_import: SharedBlockImport, + justification_import: Option> + ) -> Self { + let (result_sender, result_port) = channel::unbounded(); + let worker_sender = BlockImportWorker::new(result_sender, verifier, block_import); + let importer_sender = BlockImporter::new(result_port, worker_sender, justification_import); + Self { - handle: Mutex::new(None), - data: Arc::new(AsyncImportQueueData::new()), - verifier, - block_import, - justification_import, + sender: importer_sender, } } } -impl AsyncImportQueueData { - /// Instantiate a new async import queue data. - pub fn new() -> Self { - Self { - signal: Default::default(), - queue: Mutex::new(VecDeque::new()), - queue_blocks: RwLock::new(HashSet::new()), - best_importing_number: RwLock::new(Zero::zero()), - is_stopping: Default::default(), - } - } - - // Signals to stop importing new blocks. - pub fn stop(&self) { - self.is_stopping.store(true, Ordering::SeqCst); - } -} - -impl> ImportQueue for BasicQueue { - fn start>( - &self, - link: L, - ) -> Result<(), std::io::Error> { - debug_assert!(self.handle.lock().is_none()); - - let qdata = self.data.clone(); - let verifier = self.verifier.clone(); - let block_import = self.block_import.clone(); - let justification_import = self.justification_import.clone(); - *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { - if let Some(justification_import) = justification_import.as_ref() { - justification_import.on_start(&link); - } - import_thread(block_import, link, qdata, verifier) - })?); +impl ImportQueue for BasicQueue { + fn start(&self, link: Box>) -> Result<(), std::io::Error> { + let _ = self + .sender + .send(BlockImportMsg::Start(link)) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); Ok(()) } fn clear(&self) { - let mut queue = self.data.queue.lock(); - let mut queue_blocks = self.data.queue_blocks.write(); - let mut best_importing_number = self.data.best_importing_number.write(); - queue_blocks.clear(); - queue.clear(); - *best_importing_number = Zero::zero(); + let _ = self + .sender + .send(BlockImportMsg::Clear) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); } fn stop(&self) { - self.clear(); - if let Some(handle) = self.handle.lock().take() { - { - // Perform storing the stop flag and signalling under a single lock. - let _queue_lock = self.data.queue.lock(); - self.data.stop(); - self.data.signal.notify_one(); - } - - let _ = handle.join(); - } + let _ = self + .sender + .send(BlockImportMsg::Stop) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); } fn status(&self) -> ImportQueueStatus { - ImportQueueStatus { - importing_count: self.data.queue_blocks.read().len(), - best_importing_number: *self.data.best_importing_number.read(), - } + let (sender, port) = channel::unbounded(); + let _ = self + .sender + .send(BlockImportMsg::Status(sender)) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed") } fn is_importing(&self, hash: &B::Hash) -> bool { - self.data.queue_blocks.read().contains(hash) + let (sender, port) = channel::unbounded(); + let _ = self + .sender + .send(BlockImportMsg::IsImporting(hash.clone(), sender)) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed") } fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { return; } - - trace!(target:"sync", "Scheduling {} blocks for import", blocks.len()); - - let mut queue = self.data.queue.lock(); - let mut queue_blocks = self.data.queue_blocks.write(); - let mut best_importing_number = self.data.best_importing_number.write(); - let new_best_importing_number = blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero()); - queue_blocks.extend(blocks.iter().map(|b| b.hash.clone())); - if new_best_importing_number > *best_importing_number { - *best_importing_number = new_best_importing_number; - } - queue.push_back((origin, blocks)); - self.data.signal.notify_one(); + let _ = self + .sender + .send(BlockImportMsg::ImportBlocks(origin, blocks)) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); } - fn import_justification(&self, hash: B::Hash, number: NumberFor, justification: Justification) -> bool { - self.justification_import.as_ref().map(|justification_import| { - justification_import.import_justification(hash, number, justification).is_ok() - }).unwrap_or(false) + fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { + let _ = self + .sender + .send(BlockImportMsg::ImportJustification(who, hash, number, justification)) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); } } -impl> Drop for BasicQueue { - fn drop(&mut self) { - self.stop(); - } +pub enum BlockImportMsg { + ImportBlocks(BlockOrigin, Vec>), + Clear, + Status(Sender>), + IsImporting(B::Hash, Sender), + ImportJustification(Origin, B::Hash, NumberFor, Justification), + Start(Box>), + Stop, } -/// Blocks import thread. -fn import_thread, V: Verifier>( - block_import: SharedBlockImport, - link: L, - qdata: Arc>, - verifier: Arc -) { - trace!(target: "sync", "Starting import thread"); - loop { - let new_blocks = { - let mut queue_lock = qdata.queue.lock(); +pub enum BlockImportWorkerMsg { + ImportBlocks(BlockOrigin, Vec>), + Imported( + Vec<( + Result>, BlockImportError>, + B::Hash, + )>, + ), +} - // We are holding the same lock that `stop` takes so here we either see that stop flag - // is active or wait for the signal. The latter one unlocks the mutex and this gives a chance - // to `stop` to generate the signal. - if qdata.is_stopping.load(Ordering::SeqCst) { - break; - } - if queue_lock.is_empty() { - qdata.signal.wait(&mut queue_lock); - } +enum ImportMsgType { + FromWorker(BlockImportWorkerMsg), + FromNetwork(BlockImportMsg), +} - match queue_lock.pop_front() { - Some(new_blocks) => new_blocks, - None => break, +struct BlockImporter { + port: Receiver>, + result_port: Receiver>, + worker_sender: Sender>, + queue_blocks: HashSet, + best_importing_number: NumberFor, + link: Option>>, + justification_import: Option>, +} + +impl BlockImporter { + fn new( + result_port: Receiver>, + worker_sender: Sender>, + justification_import: Option>, + ) -> Sender> { + let (sender, port) = channel::unbounded(); + let _ = thread::Builder::new() + .name("ImportQueue".into()) + .spawn(move || { + let mut importer = BlockImporter { + port, + result_port, + worker_sender, + queue_blocks: HashSet::new(), + best_importing_number: Zero::zero(), + link: None, + justification_import, + }; + while importer.run() { + // Importing until all senders have been dropped... + } + }) + .expect("ImportQueue thread spawning failed"); + sender + } + + fn run(&mut self) -> bool { + let msg = select! { + recv(self.port) -> msg => { + match msg { + // Our sender has been dropped, quitting. + Err(_) => return false, + Ok(msg) => ImportMsgType::FromNetwork(msg) + } + }, + recv(self.result_port) -> msg => { + match msg { + Err(_) => unreachable!("1. We hold a sender to the Worker, 2. it should not quit until that sender is dropped; qed"), + Ok(msg) => ImportMsgType::FromWorker(msg), + } } }; - - let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.hash.clone()).collect(); - if !import_many_blocks( - &*block_import, - &link, - Some(&*qdata), - new_blocks, - verifier.clone(), - ) { - break; - } - - let mut queue_blocks = qdata.queue_blocks.write(); - for blocks_hash in blocks_hashes { - queue_blocks.remove(&blocks_hash); + match msg { + ImportMsgType::FromNetwork(msg) => self.handle_network_msg(msg), + ImportMsgType::FromWorker(msg) => self.handle_worker_msg(msg), } } - trace!(target: "sync", "Stopping import thread"); + fn handle_network_msg(&mut self, msg: BlockImportMsg) -> bool { + match msg { + BlockImportMsg::ImportBlocks(origin, incoming_blocks) => { + self.handle_import_blocks(origin, incoming_blocks) + } + BlockImportMsg::Clear => self.handle_clear(), + BlockImportMsg::Status(reply_sender) => self.handle_status(reply_sender), + BlockImportMsg::IsImporting(hash, reply_sender) => { + self.handle_is_importing(hash, reply_sender) + } + BlockImportMsg::ImportJustification(who, hash, number, justification) => { + self.handle_import_justification(who, hash, number, justification) + } + BlockImportMsg::Start(link) => { + if let Some(justification_import) = self.justification_import.as_ref() { + justification_import.on_start(&*link); + } + self.link = Some(link); + } + BlockImportMsg::Stop => { + self.handle_clear(); + return false; + } + } + true + } + + fn handle_worker_msg(&mut self, msg: BlockImportWorkerMsg) -> bool { + let results = match msg { + BlockImportWorkerMsg::Imported(results) => (results), + _ => unreachable!("Import Worker does not send ImportBlocks message; qed"), + }; + let mut has_error = false; + for (result, hash) in results { + self.queue_blocks.remove(&hash); + + if has_error { + continue; + } + + if result.is_err() { + self.best_importing_number = Zero::zero(); + has_error = true; + } + + let link = match self.link.as_ref() { + Some(link) => link, + None => { + trace!(target: "sync", "Received import result for {} while import-queue has no link", hash); + return true; + }, + }; + + match result { + Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number), + Ok(BlockImportResult::ImportedUnknown(number)) => { + link.block_imported(&hash, number) + } + Ok(BlockImportResult::ImportedUnjustified(number)) => { + link.block_imported(&hash, number); + link.request_justification(&hash, number); + }, + Err(BlockImportError::IncompleteHeader(who)) => { + if let Some(peer) = who { + link.note_useless_and_restart_sync(peer, "Sent block with incomplete header to import"); + } + } + Err(BlockImportError::VerificationFailed(who, e)) => { + if let Some(peer) = who { + link.note_useless_and_restart_sync(peer, &format!("Verification failed: {}", e)); + } + } + Err(BlockImportError::BadBlock(who)) => { + if let Some(peer) = who { + link.note_useless_and_restart_sync(peer, "Sent us a bad block"); + } + } + Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { + link.restart() + } + }; + } + if let Some(link) = self.link.as_ref() { + link.maintain_sync(); + } + true + } + + fn handle_clear(&mut self) { + self.queue_blocks.clear(); + self.best_importing_number = Zero::zero(); + } + + fn handle_status(&self, reply_sender: Sender>) { + let status = ImportQueueStatus { + importing_count: self.queue_blocks.len(), + best_importing_number: self.best_importing_number, + }; + let _ = reply_sender.send(status); + } + + fn handle_is_importing(&self, hash: B::Hash, reply_sender: Sender) { + let _ = reply_sender.send(self.queue_blocks.contains(&hash)); + } + + fn handle_import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { + let success = self.justification_import.as_ref().map(|justification_import| { + justification_import.import_justification(hash, number, justification) + .map_err(|e| { + debug!("Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", e, hash, number, who); + e + }).is_ok() + }).unwrap_or(false); + if let Some(link) = self.link.as_ref() { + link.justification_imported(who, &hash, number, success); + } + } + + fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { + trace!(target:"sync", "Scheduling {} blocks for import", blocks.len()); + + let new_best_importing_number = blocks + .last() + .and_then(|b| b.header.as_ref().map(|h| h.number().clone())) + .unwrap_or_else(|| Zero::zero()); + self.queue_blocks + .extend(blocks.iter().map(|b| b.hash.clone())); + if new_best_importing_number > self.best_importing_number { + self.best_importing_number = new_best_importing_number; + } + self.worker_sender + .send(BlockImportWorkerMsg::ImportBlocks(origin, blocks)) + .expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed"); + } +} + +struct BlockImportWorker> { + result_sender: Sender>, + block_import: SharedBlockImport, + verifier: Arc, +} + +impl> BlockImportWorker { + pub fn new( + result_sender: Sender>, + verifier: Arc, + block_import: SharedBlockImport, + ) -> Sender> { + let (sender, port) = channel::unbounded(); + let _ = thread::Builder::new() + .name("ImportQueueWorker".into()) + .spawn(move || { + let worker = BlockImportWorker { + result_sender, + verifier, + block_import, + }; + for msg in port.iter() { + // Working until all senders have been dropped... + match msg { + BlockImportWorkerMsg::ImportBlocks(origin, blocks) => { + worker.import_a_batch_of_blocks(origin, blocks) + } + _ => unreachable!("Import Worker does not receive the Imported message; qed"), + } + } + }) + .expect("ImportQueueWorker thread spawning failed"); + sender + } + + fn import_a_batch_of_blocks(&self, origin: BlockOrigin, blocks: Vec>) { + let count = blocks.len(); + let mut imported = 0; + + let blocks_range = match ( + blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + + trace!(target:"sync", "Starting import of {} blocks {}", count, blocks_range); + + let mut results = vec![]; + + let mut has_error = false; + + // Blocks in the response/drain should be in ascending order. + for block in blocks { + let import_result = if has_error { + Err(BlockImportError::Error) + } else { + import_single_block( + &*self.block_import, + origin.clone(), + block.clone(), + self.verifier.clone(), + ) + }; + let was_ok = import_result.is_ok(); + results.push((import_result, block.hash)); + if was_ok { + imported += 1; + } else { + has_error = true; + } + } + + let _ = self + .result_sender + .send(BlockImportWorkerMsg::Imported(results)); + + trace!(target: "sync", "Imported {} of {}", imported, count); + } } /// Hooks that the verification queue can use to influence the synchronization @@ -297,27 +530,29 @@ fn import_thread, V: Verifier>( pub trait Link: Send { /// Block imported. fn block_imported(&self, _hash: &B::Hash, _number: NumberFor) { } + /// Justification import result. + fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor, _success: bool) { } /// Request a justification for the given block. fn request_justification(&self, _hash: &B::Hash, _number: NumberFor) { } /// Maintain sync. - fn maintain_sync(&self) { } + fn maintain_sync(&self) {} /// Disconnect from peer. - fn useless_peer(&self, _who: Origin, _reason: &str) { } + fn useless_peer(&self, _who: Origin, _reason: &str) {} /// Disconnect from peer and restart sync. - fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) { } + fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) {} /// Restart sync. - fn restart(&self) { } + fn restart(&self) {} } /// Block import successful result. #[derive(Debug, PartialEq)] -pub enum BlockImportResult { +pub enum BlockImportResult { /// Imported known block. - ImportedKnown(H, N), + ImportedKnown(N), /// Imported unknown block. - ImportedUnknown(H, N), + ImportedUnknown(N), /// Imported unjustified block that requires one. - ImportedUnjustified(H, N), + ImportedUnjustified(N), } /// Block import error. @@ -335,62 +570,13 @@ pub enum BlockImportError { Error, } -/// Import a bunch of blocks. -pub fn import_many_blocks<'a, B: BlockT, V: Verifier>( - import_handle: &BlockImport, - link: &Link, - qdata: Option<&AsyncImportQueueData>, - blocks: (BlockOrigin, Vec>), - verifier: Arc -) -> bool -{ - let (blocks_origin, blocks) = blocks; - let count = blocks.len(); - let mut imported = 0; - - let blocks_range = match ( - blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), - blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - trace!(target:"sync", "Starting import of {} blocks {}", count, blocks_range); - - // Blocks in the response/drain should be in ascending order. - for block in blocks { - let import_result = import_single_block( - import_handle, - blocks_origin.clone(), - block, - verifier.clone(), - ); - let is_import_failed = import_result.is_err(); - imported += process_import_result(link, import_result); - if is_import_failed { - qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero()); - return true; - } - - if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() { - return false; - } - } - - trace!(target: "sync", "Imported {} of {}", imported, count); - link.maintain_sync(); - true -} - /// Single block import function. pub fn import_single_block>( - import_handle: &BlockImport, + import_handle: &BlockImport, block_origin: BlockOrigin, block: IncomingBlock, - verifier: Arc -) -> Result::Header as HeaderT>::Number>, BlockImportError> -{ + verifier: Arc, +) -> Result>, BlockImportError> { let peer = block.origin; let (header, justification) = match (block.header, block.justification) { @@ -413,18 +599,18 @@ pub fn import_single_block>( match e { Ok(ImportResult::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); - Ok(BlockImportResult::ImportedKnown(hash, number)) + Ok(BlockImportResult::ImportedKnown(number)) }, Ok(ImportResult::AlreadyQueued) => { trace!(target: "sync", "Block already queued {}: {:?}", number, hash); - Ok(BlockImportResult::ImportedKnown(hash, number)) + Ok(BlockImportResult::ImportedKnown(number)) }, Ok(ImportResult::Queued) => { - Ok(BlockImportResult::ImportedUnknown(hash, number)) + Ok(BlockImportResult::ImportedUnknown(number)) }, Ok(ImportResult::NeedsJustification) => { trace!(target: "sync", "Block queued but requires justification {}: {:?}", number, hash); - Ok(BlockImportResult::ImportedUnjustified(hash, number)) + Ok(BlockImportResult::ImportedUnjustified(number)) }, Ok(ImportResult::UnknownParent) => { debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); @@ -442,7 +628,7 @@ pub fn import_single_block>( }; match import_error(import_handle.check_block(hash, parent))? { - BlockImportResult::ImportedUnknown(_, _) => (), + BlockImportResult::ImportedUnknown(_) => (), r @ _ => return Ok(r), // Any other successfull result means that the block is already imported. } @@ -459,47 +645,102 @@ pub fn import_single_block>( import_error(import_handle.import_block(import_block, new_authorities)) } -/// Process single block import result. -pub fn process_import_result( - link: &Link, - result: Result::Header as HeaderT>::Number>, BlockImportError> -) -> usize -{ - match result { - Ok(BlockImportResult::ImportedKnown(hash, number)) => { - link.block_imported(&hash, number); - 1 - }, - Ok(BlockImportResult::ImportedUnknown(hash, number)) => { - link.block_imported(&hash, number); - 1 - }, - Ok(BlockImportResult::ImportedUnjustified(hash, number)) => { - link.block_imported(&hash, number); - link.request_justification(&hash, number); - 1 - }, - Err(BlockImportError::IncompleteHeader(who)) => { - if let Some(peer) = who { - link.note_useless_and_restart_sync(peer, "Sent block with incomplete header to import"); +#[cfg(test)] +mod tests { + use super::*; + use test_client::runtime::{Block, Hash}; + + #[derive(Debug, PartialEq)] + enum LinkMsg { + BlockImported, + Disconnected, + Restarted, + } + + #[derive(Clone)] + struct TestLink { + sender: Sender, + } + + impl TestLink { + fn new(sender: Sender) -> TestLink { + TestLink { + sender, } - 0 - }, - Err(BlockImportError::VerificationFailed(who, e)) => { - if let Some(peer) = who { - link.note_useless_and_restart_sync(peer, &format!("Verification failed: {}", e)); - } - 0 - }, - Err(BlockImportError::BadBlock(who)) => { - if let Some(peer) = who { - link.note_useless_and_restart_sync(peer, "Sent us a bad block"); - } - 0 - }, - Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { - link.restart(); - 0 - }, + } + } + + impl Link for TestLink { + fn block_imported(&self, _hash: &Hash, _number: NumberFor) { + let _ = self.sender.send(LinkMsg::BlockImported); + } + fn maintain_sync(&self) { + } + fn useless_peer(&self, _: Origin, _: &str) { + let _ = self.sender.send(LinkMsg::Disconnected); + } + fn note_useless_and_restart_sync(&self, id: Origin, r: &str) { + self.useless_peer(id, r); + self.restart(); + } + fn restart(&self) { + let _ = self.sender.send(LinkMsg::Restarted); + } + } + + #[test] + fn process_import_result_works() { + let (result_sender, result_port) = channel::unbounded(); + let (worker_sender, _) = channel::unbounded(); + let (link_sender, link_port) = channel::unbounded(); + let importer_sender = BlockImporter::::new(result_port, worker_sender, None); + let link = TestLink::new(link_sender); + let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone()))); + + // Ensure the importer handles Start before any result messages. + let (ack_sender, ack_port) = channel::unbounded(); + let _ = importer_sender.send(BlockImportMsg::Status(ack_sender)); + let _ = ack_port.recv(); + + // Send a known + let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); + + // Send a second known + let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); + + // Send an unknown + let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default())), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); + + // Send an incomplete header + let results = vec![(Err(BlockImportError::IncompleteHeader(Some(Default::default()))), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); + assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); + + // Send an unknown parent + let results = vec![(Err(BlockImportError::UnknownParent), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); + + // Send a verification failed + let results = vec![(Err(BlockImportError::VerificationFailed(Some(0), String::new())), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); + assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); + + // Send an error + let results = vec![(Err(BlockImportError::Error), Default::default())]; + let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); + assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); + + // Drop the importer sender first, ensuring graceful shutdown. + drop(importer_sender); } } + diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 904e013256..53ffe1fec3 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -26,6 +26,9 @@ // our error-chain could potentially blow up otherwise #![recursion_limit="128"] +#[macro_use] extern crate crossbeam_channel; +#[macro_use] extern crate log; + use std::sync::Arc; use std::time::Duration; diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 270761aed3..8e10d9c9a9 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -52,7 +52,7 @@ type PeerData = > > >; -type GrandpaPeer = Peer; +type GrandpaPeer = Peer; struct GrandpaTestNet { peers: Vec>, diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 1035f35d25..c7e5dd9d20 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -258,6 +258,8 @@ pub enum ProtocolMsg,> { BlockImportedSync(B::Hash, NumberFor), /// Tell protocol to request justification for a block. RequestJustification(B::Hash, NumberFor), + /// Inform protocol whether a justification was successfully imported. + JustificationImportResult(B::Hash, NumberFor, bool), /// A block has been imported (sent by the client). BlockImported(B::Hash, B::Header), /// A block has been finalized (sent by the client). @@ -274,11 +276,11 @@ pub enum ProtocolMsg,> { impl, H: ExHashT> Protocol { /// Create a new instance. - pub fn new>( + pub fn new( network_chan: NetworkChan, config: ProtocolConfig, chain: Arc>, - import_queue: Arc, + import_queue: Box>, on_demand: Option>>, transaction_pool: Arc>, specialization: S, @@ -403,6 +405,7 @@ impl, H: ExHashT> Protocol { ProtocolContext::new(&mut self.context_data, &self.network_chan); self.sync.request_justification(&hash, number, &mut context); }, + ProtocolMsg::JustificationImportResult(hash, number, success) => self.sync.justification_import_result(hash, number, success), ProtocolMsg::PropagateExtrinsics => self.propagate_extrinsics(), ProtocolMsg::Tick => self.tick(), #[cfg(any(test, feature = "test-helpers"))] @@ -893,8 +896,7 @@ impl, H: ExHashT> Protocol { fn stop(&mut self) { // stop processing import requests first (without holding a sync lock) - let import_queue = self.sync.import_queue(); - import_queue.stop(); + self.sync.stop(); // and then clear all the sync data self.abort(); diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index ed26dd68d4..f857e712d3 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -80,6 +80,14 @@ impl> Link for NetworkLink { let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); } + fn justification_imported(&self, who: NodeIndex, hash: &B::Hash, number: NumberFor, success: bool) { + let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); + if !success { + let reason = Severity::Bad(format!("Invalid justification provided for #{}", hash).to_string()); + let _ = self.network_sender.send(NetworkMsg::ReportPeer(who, reason)); + } + } + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } @@ -119,10 +127,10 @@ pub struct Service> { impl> Service { /// Creates and register protocol with the network service - pub fn new, H: ExHashT>( + pub fn new( params: Params, protocol_id: ProtocolId, - import_queue: Arc, + import_queue: Box>, ) -> Result<(Arc>, NetworkChan), Error> { let (network_chan, network_port) = network_channel(protocol_id); let protocol_sender = Protocol::new( @@ -155,7 +163,7 @@ impl> Service { network_sender: network_chan.clone(), }; - import_queue.start(link)?; + import_queue.start(Box::new(link))?; Ok((service, network_chan)) } diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 9193051322..213b1b6586 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -15,7 +15,6 @@ // along with Substrate. If not, see . use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::Arc; use std::time::{Duration, Instant}; use log::{trace, debug}; use crate::protocol::Context; @@ -30,6 +29,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberF use runtime_primitives::generic::BlockId; use crate::message::{self, generic::Message as GenericMessage}; use crate::config::Roles; +use std::sync::atomic::{AtomicBool, Ordering}; // Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; @@ -197,14 +197,25 @@ impl PendingJustifications { } } + /// Process the import of a justification. + /// Queues a retry in case the import failed. + fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + let request = (hash, number); + if success { + self.justifications.remove(&request); + self.previous_requests.remove(&request); + return; + } + self.pending_requests.push_front(request); + } + /// Processes the response for the request previously sent to the given - /// peer. Queues a retry in case the import fails or the given justification + /// peer. Queues a retry in case the given justification /// was `None`. fn on_response( &mut self, who: NodeIndex, justification: Option, - protocol: &mut Context, import_queue: &ImportQueue, ) { // we assume that the request maps to the given response, this is @@ -212,23 +223,13 @@ impl PendingJustifications { // messages to chain sync. if let Some(request) = self.peer_requests.remove(&who) { if let Some(justification) = justification { - if import_queue.import_justification(request.0, request.1, justification) { - self.justifications.remove(&request); - self.previous_requests.remove(&request); - return; - } else { - protocol.report_peer( - who, - Severity::Bad(format!("Invalid justification provided for #{}", request.0)), - ); - } - } else { - self.previous_requests - .entry(request) - .or_insert(Vec::new()) - .push((who, Instant::now())); + import_queue.import_justification(who.clone(), request.0, request.1, justification); + return } - + self.previous_requests + .entry(request) + .or_insert(Vec::new()) + .push((who, Instant::now())); self.pending_requests.push_front(request); } } @@ -251,8 +252,9 @@ pub struct ChainSync { best_queued_number: NumberFor, best_queued_hash: B::Hash, required_block_attributes: message::BlockAttributes, - import_queue: Arc>, justifications: PendingJustifications, + import_queue: Box>, + is_stopping: AtomicBool, } /// Reported sync state. @@ -293,7 +295,7 @@ impl Status { impl ChainSync { /// Create a new instance. - pub(crate) fn new(role: Roles, info: &ClientInfo, import_queue: Arc>) -> Self { + pub(crate) fn new(role: Roles, info: &ClientInfo, import_queue: Box>) -> Self { let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; if role.intersects(Roles::FULL | Roles::AUTHORITY) { required_block_attributes |= message::BlockAttributes::BODY; @@ -308,6 +310,7 @@ impl ChainSync { justifications: PendingJustifications::new(), required_block_attributes, import_queue, + is_stopping: Default::default(), } } @@ -316,7 +319,7 @@ impl ChainSync { } /// Returns import queue reference. - pub(crate) fn import_queue(&self) -> Arc> { + pub(crate) fn import_queue(&self) -> Box> { self.import_queue.clone() } @@ -536,7 +539,6 @@ impl ChainSync { self.justifications.on_response( who, response.justification, - protocol, &*self.import_queue, ); }, @@ -558,6 +560,9 @@ impl ChainSync { /// Maintain the sync process (download new blocks, fetch justifications). pub fn maintain_sync(&mut self, protocol: &mut Context) { + if self.is_stopping.load(Ordering::SeqCst) { + return + } let peers: Vec = self.peers.keys().map(|p| *p).collect(); for peer in peers { self.download_new(protocol, peer); @@ -578,6 +583,15 @@ impl ChainSync { self.justifications.dispatch(&mut self.peers, protocol); } + pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + self.justifications.justification_import_result(hash, number, success); + } + + pub fn stop(&self) { + self.is_stopping.store(true, Ordering::SeqCst); + self.import_queue.stop(); + } + /// Notify about successful import of the given block. pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { trace!(target: "sync", "Block imported successfully {} ({})", number, hash); diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index eba2237da8..aca789ce2b 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -16,55 +16,15 @@ //! Testing block import logic. -use consensus::import_queue::{import_single_block, process_import_result}; -use consensus::import_queue::{AsyncImportQueueData, BasicQueue, BlockImportError, BlockImportResult}; +use consensus::import_queue::{import_single_block, BasicQueue, BlockImportError, BlockImportResult}; use test_client::{self, TestClient}; use test_client::runtime::{Block, Hash}; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::NumberFor; -use std::cell::Cell; use super::*; -struct TestLink { - imported: Cell, - maintains: Cell, - disconnects: Cell, - restarts: Cell, -} +struct TestLink {} -impl TestLink { - fn new() -> TestLink { - TestLink { - imported: Cell::new(0), - maintains: Cell::new(0), - disconnects: Cell::new(0), - restarts: Cell::new(0), - } - } - - fn total(&self) -> usize { - self.imported.get() + self.maintains.get() + self.disconnects.get() + self.restarts.get() - } -} - -impl Link for TestLink { - fn block_imported(&self, _hash: &Hash, _number: NumberFor) { - self.imported.set(self.imported.get() + 1); - } - fn maintain_sync(&self) { - self.maintains.set(self.maintains.get() + 1); - } - fn useless_peer(&self, _: NodeIndex, _: &str) { - self.disconnects.set(self.disconnects.get() + 1); - } - fn note_useless_and_restart_sync(&self, id: NodeIndex, r: &str) { - self.useless_peer(id, r); - self.restart(); - } - fn restart(&self) { - self.restarts.set(self.restarts.get() + 1); - } -} +impl Link for TestLink {} fn prepare_good_block() -> (client::Client, Hash, u64, IncomingBlock) { let client = test_client::new(); @@ -85,19 +45,19 @@ fn prepare_good_block() -> (client::Client(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); - assert_eq!(link.total(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); - assert_eq!(link.total(), 1); - assert_eq!(link.imported.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); - assert_eq!(link.total(), 1); - assert_eq!(link.imported.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); - assert_eq!(link.total(), 2); - assert_eq!(link.disconnects.get(), 1); - assert_eq!(link.restarts.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::UnknownParent)), 0); - assert_eq!(link.total(), 1); - assert_eq!(link.restarts.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::Error)), 0); - assert_eq!(link.total(), 1); - assert_eq!(link.restarts.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::VerificationFailed(Some(0), String::new()))), 0); - assert_eq!(link.total(), 2); - assert_eq!(link.restarts.get(), 1); - assert_eq!(link.disconnects.get(), 1); -} - -#[test] -fn import_many_blocks_stops_when_stopping() { - let (_, _, _, block) = prepare_good_block(); - let qdata = AsyncImportQueueData::new(); - let verifier = Arc::new(PassThroughVerifier(true)); - qdata.stop(); - let client = test_client::new(); - assert!(!import_many_blocks( - &client, - &mut TestLink::new(), - Some(&qdata), - (BlockOrigin::File, vec![block.clone(), block]), - verifier - )); -} - #[test] fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = Arc::new(PassThroughVerifier(true)); let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None); - queue.start(TestLink::new()).unwrap(); + queue.start(Box::new(TestLink{})).unwrap(); drop(queue); } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 838e614146..ade77ee51d 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -23,13 +23,14 @@ mod sync; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::thread; use std::time::Duration; use log::trace; use client; use client::block_builder::BlockBuilder; use crate::config::ProtocolConfig; -use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock}; +use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock}; use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier}; use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; @@ -45,7 +46,7 @@ use parking_lot::Mutex; use primitives::{H256, Ed25519AuthorityId}; use crate::protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus}; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, Zero, NumberFor}; +use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::Justification; use crate::service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, TransactionPool}; use crate::specialization::NetworkSpecialization; @@ -54,34 +55,6 @@ use test_client; pub use test_client::runtime::{Block, Extrinsic, Hash, Transfer}; pub use test_client::TestClient; -#[cfg(any(test, feature = "test-helpers"))] -use std::cell::RefCell; - -#[cfg(any(test, feature = "test-helpers"))] -struct ImportCB(RefCell>) -> bool>>>); - -#[cfg(any(test, feature = "test-helpers"))] -impl ImportCB { - fn new() -> Self { - ImportCB(RefCell::new(None)) - } - fn set(&self, cb: Box) - where F: 'static + Fn(BlockOrigin, Vec>) -> bool, - { - *self.0.borrow_mut() = Some(cb); - } - fn call(&self, origin: BlockOrigin, data: Vec>) -> bool { - let b = self.0.borrow(); - b.as_ref().expect("The Callback has been set before. qed.")(origin, data) - } -} - -#[cfg(any(test, feature = "test-helpers"))] -unsafe impl Send for ImportCB {} -#[cfg(any(test, feature = "test-helpers"))] -unsafe impl Sync for ImportCB {} - - #[cfg(any(test, feature = "test-helpers"))] /// A Verifier that accepts all blocks and passes them on with the configured /// finality to be imported. @@ -114,101 +87,10 @@ impl Verifier for PassThroughVerifier { } /// A link implementation that does nothing. -pub struct NoopLink; +pub struct NoopLink { } impl Link for NoopLink { } -/// Blocks import queue that is importing blocks in the same thread. -/// The boolean value indicates whether blocks should be imported without instant finality. -#[cfg(any(test, feature = "test-helpers"))] -pub struct SyncImportQueue> { - verifier: Arc, - link: ImportCB, - block_import: SharedBlockImport, - justification_import: Option>, -} - -#[cfg(any(test, feature = "test-helpers"))] -impl> SyncImportQueue { - /// Create a new SyncImportQueue wrapping the given Verifier and block import - /// handle. - pub fn new(verifier: Arc, block_import: SharedBlockImport, justification_import: Option>) -> Self { - let queue = SyncImportQueue { - verifier, - link: ImportCB::new(), - block_import, - justification_import, - }; - - let v = queue.verifier.clone(); - let import_handle = queue.block_import.clone(); - queue.link.set(Box::new(move |origin, new_blocks| { - let verifier = v.clone(); - import_many_blocks( - &*import_handle, - &NoopLink, - None, - (origin, new_blocks), - verifier, - ) - })); - - queue - } -} - -#[cfg(any(test, feature = "test-helpers"))] -impl> ImportQueue for SyncImportQueue -{ - fn start>( - &self, - link: L, - ) -> Result<(), std::io::Error> { - let v = self.verifier.clone(); - let import_handle = self.block_import.clone(); - self.link.set(Box::new(move |origin, new_blocks| { - let verifier = v.clone(); - import_many_blocks( - &*import_handle, - &link, - None, - (origin, new_blocks), - verifier - ) - })); - Ok(()) - } - fn clear(&self) { } - - fn stop(&self) { } - - fn status(&self) -> ImportQueueStatus { - ImportQueueStatus { - importing_count: 0, - best_importing_number: Zero::zero(), - } - } - - fn is_importing(&self, _hash: &B::Hash) -> bool { - false - } - - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { - self.link.call(origin, blocks); - } - - fn import_justification( - &self, - hash: B::Hash, - number: NumberFor, - justification: Justification, - ) -> bool { - self.justification_import.as_ref().map(|justification_import| { - justification_import.import_justification(hash, number, justification).is_ok() - }).unwrap_or(false) - } -} - /// The test specialization. pub struct DummySpecialization { } @@ -234,19 +116,20 @@ impl NetworkSpecialization for DummySpecialization { pub type PeersClient = client::Client; -pub struct Peer, D> { +pub struct Peer { client: Arc, pub protocol_sender: Sender>, + network_port: Mutex>, - import_queue: Arc>, + pub import_queue: Box>, network_sender: NetworkChan, pub data: D, } -impl, D> Peer { +impl Peer { fn new( client: Arc, - import_queue: Arc>, + import_queue: Box>, protocol_sender: Sender>, network_sender: NetworkChan, network_port: NetworkPort, @@ -276,7 +159,7 @@ impl, D> Peer { network_sender: self.network_sender.clone(), }; - self.import_queue.start(network_link).expect("Test ImportQueue always starts"); + self.import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts"); let _ = self .protocol_sender .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); @@ -327,7 +210,8 @@ impl, D> Peer { /// Whether this peer is done syncing (has no messages to send). fn is_done(&self) -> bool { - self.network_port.lock().receiver().is_empty() + self.import_queue.status().importing_count == 0 && + self.network_port.lock().receiver().is_empty() } /// Execute a "sync step". This is called for each peer after it sends a packet. @@ -437,8 +321,6 @@ impl, D> Peer { let header = block.header.clone(); at = BlockId::Hash(hash); - // NOTE: if we use a non-synchronous queue in the test-net in the future, - // this may not work. self.import_queue.import_blocks( origin, vec![IncomingBlock { @@ -447,8 +329,12 @@ impl, D> Peer { header: Some(header), body: Some(block.extrinsics), justification: None, - } - ]); + }], + ); + // Simulate a synchronous import. + while self.import_queue.status().importing_count > 0 { + thread::sleep(Duration::from_millis(20)); + } } } @@ -516,9 +402,9 @@ pub trait TestNetFactory: Sized { fn make_verifier(&self, client: Arc, config: &ProtocolConfig) -> Arc; /// Get reference to peer. - fn peer(&self, i: usize) -> &Peer; - fn peers(&self) -> &Vec>>; - fn mut_peers>>)>(&mut self, closure: F); + fn peer(&self, i: usize) -> &Peer; + fn peers(&self) -> &Vec>>; + fn mut_peers>>)>(&mut self, closure: F); fn started(&self) -> bool; fn set_started(&mut self, now: bool); @@ -553,8 +439,8 @@ pub trait TestNetFactory: Sized { let (block_import, justification_import, data) = self.make_block_import(client.clone()); let (network_sender, network_port) = network_channel(ProtocolId::default()); - let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import, justification_import)); - let specialization = DummySpecialization { }; + let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import)); + let specialization = DummySpecialization {}; let protocol_sender = Protocol::new( network_sender.clone(), config.clone(), @@ -727,8 +613,8 @@ pub trait TestNetFactory: Sized { } pub struct TestNet { - peers: Vec>>, - started: bool + peers: Vec>>, + started: bool, } impl TestNetFactory for TestNet { @@ -749,15 +635,15 @@ impl TestNetFactory for TestNet { Arc::new(PassThroughVerifier(false)) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer<()> { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F ) { + fn mut_peers>>)>(&mut self, closure: F) { closure(&mut self.peers); } @@ -802,15 +688,15 @@ impl TestNetFactory for JustificationTestNet { self.0.make_verifier(client, config) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer { self.0.peer(i) } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { self.0.peers() } - fn mut_peers>>)>(&mut self, closure: F ) { + fn mut_peers>>)>(&mut self, closure: F ) { self.0.mut_peers(closure) } diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index edf670e826..64d74b002f 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -20,7 +20,6 @@ use crate::config::Roles; use consensus::BlockOrigin; use network_libp2p::NodeIndex; use crate::sync::SyncState; -use std::{thread, time}; use std::collections::HashSet; use super::*; @@ -55,11 +54,6 @@ fn sync_long_chain_works() { let mut net = TestNet::new(2); net.peer(1).push_blocks(500, false); net.sync(); - // Wait for peer 0 to import blocks received over the network. - thread::sleep(time::Duration::from_millis(1000)); - net.sync(); - // Wait for peers to get up to speed. - thread::sleep(time::Duration::from_millis(1000)); assert!(net.peer(0).client.backend().as_in_memory().blockchain() .equals_to(net.peer(1).client.backend().as_in_memory().blockchain())); } @@ -149,6 +143,7 @@ fn own_blocks_are_announced() { let header = net.peer(0).client().header(&BlockId::Number(1)).unwrap().unwrap(); net.peer(0).on_block_imported(header.hash(), &header); net.sync(); + assert_eq!(net.peer(0).client.backend().blockchain().info().unwrap().best_number, 1); assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1); let peer0_chain = net.peer(0).client.backend().as_in_memory().blockchain().clone(); diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index 953bb8c375..487c7ac0a6 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -131,7 +131,7 @@ pub fn import_blocks( let (wait_send, wait_recv) = std::sync::mpsc::channel(); let wait_link = WaitLink::new(wait_send); - queue.start(wait_link)?; + queue.start(Box::new(wait_link))?; let (exit_send, exit_recv) = std::sync::mpsc::channel(); ::std::thread::spawn(move || { diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index f4d8daead0..6f7e5a5e9c 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -123,7 +123,7 @@ impl Service { }; let (client, on_demand) = Components::build_client(&config, executor)?; - let import_queue = Arc::new(Components::build_import_queue(&mut config, client.clone())?); + let import_queue = Box::new(Components::build_import_queue(&mut config, client.clone())?); let best_header = client.best_block_header()?; let version = config.full_version(); diff --git a/substrate/node-template/src/service.rs b/substrate/node-template/src/service.rs index b1fb0faf7f..cc51fba51c 100644 --- a/substrate/node-template/src/service.rs +++ b/substrate/node-template/src/service.rs @@ -84,8 +84,6 @@ construct_service_factory! { { |config, executor| >::new(config, executor) }, FullImportQueue = AuraImportQueue< Self::Block, - FullClient, - NothingExtra, > { |config: &mut FactoryFullConfiguration , client: Arc>| import_queue( @@ -99,8 +97,6 @@ construct_service_factory! { }, LightImportQueue = AuraImportQueue< Self::Block, - LightClient, - NothingExtra, > { |config: &mut FactoryFullConfiguration, client: Arc>| import_queue( diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index a773e3cf95..ff241a4c98 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -120,12 +120,8 @@ construct_service_factory! { }, LightService = LightComponents { |config, executor| >::new(config, executor) }, - FullImportQueue = AuraImportQueue< - Self::Block, - FullClient, - NothingExtra, - > - { |config: &mut FactoryFullConfiguration, client: Arc>| { + FullImportQueue = AuraImportQueue + { |config: &mut FactoryFullConfiguration , client: Arc>| { let slot_duration = SlotDuration::get_or_compute(&*client)?; let (block_import, link_half) = grandpa::block_import::<_, _, _, RuntimeApi, FullClient>( @@ -145,11 +141,7 @@ construct_service_factory! { config.custom.inherent_data_providers.clone(), ).map_err(Into::into) }}, - LightImportQueue = AuraImportQueue< - Self::Block, - LightClient, - NothingExtra, - > + LightImportQueue = AuraImportQueue { |config: &FactoryFullConfiguration, client: Arc>| { import_queue( SlotDuration::get_or_compute(&*client)?,