diff --git a/substrate/core/consensus/common/src/block_import.rs b/substrate/core/consensus/common/src/block_import.rs index 363ba18713..6ce4acdf39 100644 --- a/substrate/core/consensus/common/src/block_import.rs +++ b/substrate/core/consensus/common/src/block_import.rs @@ -195,7 +195,7 @@ pub trait JustificationImport { type Error: ::std::error::Error + Send + 'static; /// Called by the import queue when it is started. - fn on_start(&self, _link: &dyn crate::import_queue::Link) { } + fn on_start(&self, _link: &mut dyn crate::import_queue::Link) { } /// Import a Block justification and finalize the given block. fn import_justification( @@ -211,7 +211,7 @@ pub trait FinalityProofImport { type Error: std::error::Error + Send + 'static; /// Called by the import queue when it is started. - fn on_start(&self, _link: &dyn crate::import_queue::Link) { } + fn on_start(&self, _link: &mut dyn crate::import_queue::Link) { } /// Import a Block justification and finalize the given block. Returns finalized block or error. fn import_finality_proof( diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index c2ffcd4239..1753d63ff0 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -26,8 +26,8 @@ //! queues to be instantiated simply. use std::{sync::Arc, thread, collections::HashMap}; -use crossbeam_channel::{self as channel, Receiver, Sender}; -use parking_lot::Mutex; +use crossbeam_channel::{self as channel, Sender}; +use futures::{prelude::*, sync::mpsc}; use runtime_primitives::{Justification, traits::{ Block as BlockT, Header as HeaderT, NumberFor, }}; @@ -90,154 +90,63 @@ pub trait Verifier: Send + Sync { } /// Blocks import queue API. -pub trait ImportQueue: Send + Sync { - /// Start background work for the queue as necessary. - /// - /// This is called automatically by the network service when synchronization - /// begins. - fn start(&self, _link: Box>) -> Result<(), std::io::Error> { - Ok(()) - } +/// +/// The `import_*` methods can be called in order to send elements for the import queue to verify. +/// Afterwards, call `poll_actions` to determine how to respond to these elements. +pub trait ImportQueue: Send { /// Import bunch of blocks. - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>); /// Import a block justification. - fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification); + fn import_justification( + &mut self, + who: Origin, + hash: B::Hash, + number: NumberFor, + justification: Justification + ); /// Import block finality proof. - fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec); + fn import_finality_proof( + &mut self, + who: Origin, + hash: B::Hash, + number: NumberFor, + finality_proof: Vec + ); + /// Polls for actions to perform on the network. + /// + /// This method should behave in a way similar to `Future::poll`. It can register the current + /// task and notify later when more actions are ready to be polled. To continue the comparison, + /// it is as if this method always returned `Ok(Async::NotReady)`. + fn poll_actions(&mut self, link: &mut dyn Link); } -/// Basic block import queue that performs import in the caller thread. -pub struct BasicSyncQueue> { - data: Arc>, -} - -struct BasicSyncQueueData> { - link: Mutex>>>, - block_import: SharedBlockImport, - verifier: Arc, - justification_import: Option>, - finality_proof_import: Option>, -} - -impl> BasicSyncQueue { - pub fn new( - block_import: SharedBlockImport, - verifier: Arc, - justification_import: Option>, - finality_proof_import: Option>, - ) -> Self { - BasicSyncQueue { - data: Arc::new(BasicSyncQueueData { - link: Mutex::new(None), - block_import, - verifier, - justification_import, - finality_proof_import, - }), - } - } -} - -impl> ImportQueue for BasicSyncQueue { - fn start(&self, link: Box>) -> Result<(), std::io::Error> { - if let Some(justification_import) = self.data.justification_import.as_ref() { - justification_import.on_start(&*link); - } - *self.data.link.lock() = Some(link); - Ok(()) - } - - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { - if blocks.is_empty() { - return; - } - - let (imported, count, results) = import_many_blocks( - &*self.data.block_import, - origin, - blocks, - self.data.verifier.clone(), - ); - - let link_ref = self.data.link.lock(); - let link = match link_ref.as_ref() { - Some(link) => link, - None => { - trace!(target: "sync", "Trying to import blocks before starting import queue"); - return; - }, - }; - - process_import_results(&**link, results); - - trace!(target: "sync", "Imported {} of {}", imported, count); - } - - fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { - import_single_justification( - &*self.data.link.lock(), - &self.data.justification_import, - who, - hash, - number, - justification, - ) - } - - fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - let result = import_single_finality_proof( - &self.data.finality_proof_import, - &*self.data.verifier, - &who, - hash, - number, - finality_proof, - ); - if let Some(link) = self.data.link.lock().as_ref() { - link.finality_proof_imported(who, (hash, number), result); - } - } -} - -/// Interface to a basic block import queue that is importing blocks -/// sequentially in a separate thread, with pluggable verification. -#[derive(Clone)] +/// Interface to a basic block import queue that is importing blocks sequentially in a separate +/// thread, with pluggable verification. pub struct BasicQueue { - sender: Option>>, + /// Channel to send messages to the background thread. + sender: Option>>, + /// Results coming from the worker thread. + result_port: BufferedLinkReceiver, + /// Sent through the link as soon as possible. + finality_proof_request_builder: Option>, } impl Drop for BasicQueue { fn drop(&mut self) { if let Some(sender) = self.sender.take() { let (shutdown_sender, shutdown_receiver) = channel::unbounded(); - if sender.send(BlockImportMsg::Shutdown(shutdown_sender)).is_ok() { + if sender.send(ToWorkerMsg::Shutdown(shutdown_sender)).is_ok() { let _ = shutdown_receiver.recv(); } } } } -/// "BasicQueue" is a wrapper around a channel sender to the "BlockImporter". -/// "BasicQueue" itself does not keep any state or do any importing work, and -/// can therefore be send to other threads. -/// -/// "BasicQueue" implements "ImportQueue" by sending messages to the -/// "BlockImporter", which runs in it's own thread. -/// -/// The "BlockImporter" is responsible for handling incoming requests from the -/// "BasicQueue". Some of these requests are handled by the "BlockImporter" -/// itself, such as "is_importing", "status", and justifications. -/// -/// The "import block" work will be offloaded to a single "BlockImportWorker", -/// running in another thread. Offloading the work is done via a channel, -/// ensuring blocks in this implementation are imported sequentially and in -/// order (as received by the "BlockImporter"). -/// -/// As long as the "BasicQueue" is not dropped, the "BlockImporter" will keep -/// running. The "BlockImporter" owns a sender to the "BlockImportWorker", -/// ensuring that the worker is kept alive until that sender is dropped. impl BasicQueue { /// Instantiate a new basic queue, with given verifier. + /// + /// This creates a background thread, and calls `on_start` on the justification importer and + /// finality proof importer. pub fn new>( verifier: Arc, block_import: SharedBlockImport, @@ -245,24 +154,19 @@ impl BasicQueue { finality_proof_import: Option>, finality_proof_request_builder: Option>, ) -> Self { - let (result_sender, result_port) = channel::unbounded(); + let (result_sender, result_port) = buffered_link(); let worker_sender = BlockImportWorker::new( result_sender, - verifier.clone(), - block_import, - finality_proof_import.clone(), - ); - let importer_sender = BlockImporter::new( - result_port, - worker_sender, verifier, + block_import, justification_import, finality_proof_import, - finality_proof_request_builder, ); Self { - sender: Some(importer_sender), + sender: Some(worker_sender), + result_port, + finality_proof_request_builder, } } @@ -273,291 +177,116 @@ impl BasicQueue { #[cfg(any(test, feature = "test-helpers"))] pub fn synchronize(&self) { if let Some(ref sender) = self.sender { - let _ = sender.send(BlockImportMsg::Synchronize); + let _ = sender.send(ToWorkerMsg::Synchronize); } } } impl ImportQueue for BasicQueue { - fn start(&self, link: Box>) -> Result<(), std::io::Error> { - let connect_err = || Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to connect import queue threads", - )); - if let Some(ref sender) = self.sender { - let (start_sender, start_port) = channel::unbounded(); - let _ = sender.send(BlockImportMsg::Start(link, start_sender)); - start_port.recv().unwrap_or_else(|_| connect_err()) - } else { - connect_err() - } - } - - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { return; } if let Some(ref sender) = self.sender { - let _ = sender.send(BlockImportMsg::ImportBlocks(origin, blocks)); + trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); + let _ = sender.send(ToWorkerMsg::ImportBlocks(origin, blocks)); } } - fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { + fn import_justification( + &mut self, + who: Origin, + hash: B::Hash, + number: NumberFor, + justification: Justification + ) { if let Some(ref sender) = self.sender { - let _ = sender.send(BlockImportMsg::ImportJustification(who.clone(), hash, number, justification)); + let _ = sender.send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification)); } } - fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { + fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { if let Some(ref sender) = self.sender { - let _ = sender.send(BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof)); + trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); + let _ = sender.send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); } } + + fn poll_actions(&mut self, link: &mut dyn Link) { + if let Some(fprb) = self.finality_proof_request_builder.take() { + link.set_finality_proof_request_builder(fprb); + } + + self.result_port.poll_actions(link); + } } -pub enum BlockImportMsg { +/// Message destinated to the background worker. +#[derive(Debug)] +enum ToWorkerMsg { ImportBlocks(BlockOrigin, Vec>), ImportJustification(Origin, B::Hash, NumberFor, Justification), ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), - Start(Box>, Sender>), Shutdown(Sender<()>), #[cfg(any(test, feature = "test-helpers"))] Synchronize, } -#[cfg_attr(test, derive(Debug))] -pub enum BlockImportWorkerMsg { - ImportBlocks(BlockOrigin, Vec>), - ImportedBlocks( - Vec<( - Result>, BlockImportError>, - B::Hash, - )>, - ), - ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), - ImportedFinalityProof(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), - Shutdown(Sender<()>), - #[cfg(any(test, feature = "test-helpers"))] - Synchronize, -} - -enum ImportMsgType { - FromWorker(BlockImportWorkerMsg), - FromNetwork(BlockImportMsg), -} - -struct BlockImporter { - port: Receiver>, - result_port: Receiver>, - worker_sender: Option>>, - link: Option>>, - verifier: Arc>, - justification_import: Option>, - finality_proof_import: Option>, - finality_proof_request_builder: Option>, -} - -impl BlockImporter { - fn new( - result_port: Receiver>, - worker_sender: Sender>, - verifier: Arc>, - justification_import: Option>, - finality_proof_import: Option>, - finality_proof_request_builder: Option>, - ) -> Sender> { - trace!(target: "block_import", "Creating new Block Importer!"); - let (sender, port) = channel::bounded(4); - let _ = thread::Builder::new() - .name("ImportQueue".into()) - .spawn(move || { - let mut importer = BlockImporter { - port, - result_port, - worker_sender: Some(worker_sender), - link: None, - verifier, - justification_import, - finality_proof_import, - finality_proof_request_builder, - }; - while importer.run() { - // Importing until all senders have been dropped... - } - }) - .expect("ImportQueue thread spawning failed"); - sender - } - - fn run(&mut self) -> bool { - trace!(target: "import_queue", "Running import queue"); - let msg = select! { - recv(self.port) -> msg => { - match msg { - // Our sender has been dropped, quitting. - Err(_) => return false, - Ok(msg) => ImportMsgType::FromNetwork(msg) - } - }, - recv(self.result_port) -> msg => { - match msg { - Err(_) => unreachable!("1. We hold a sender to the Worker, 2. it should not quit until that sender is dropped; qed"), - Ok(msg) => ImportMsgType::FromWorker(msg), - } - } - }; - match msg { - ImportMsgType::FromNetwork(msg) => self.handle_network_msg(msg), - ImportMsgType::FromWorker(msg) => self.handle_worker_msg(msg), - } - } - - fn handle_network_msg(&mut self, msg: BlockImportMsg) -> bool { - match msg { - BlockImportMsg::ImportBlocks(origin, incoming_blocks) => { - self.handle_import_blocks(origin, incoming_blocks) - }, - BlockImportMsg::ImportJustification(who, hash, number, justification) => { - import_single_justification( - &self.link, - &self.justification_import, - who, - hash, - number, - justification, - ); - }, - BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof) => { - self.handle_import_finality_proof(who, hash, number, finality_proof) - }, - BlockImportMsg::Start(link, sender) => { - if let Some(finality_proof_request_builder) = self.finality_proof_request_builder.take() { - link.set_finality_proof_request_builder(finality_proof_request_builder); - } - if let Some(justification_import) = self.justification_import.as_ref() { - justification_import.on_start(&*link); - } - if let Some(finality_proof_import) = self.finality_proof_import.as_ref() { - finality_proof_import.on_start(&*link); - } - self.link = Some(link); - let _ = sender.send(Ok(())); - }, - BlockImportMsg::Shutdown(result_sender) => { - // stop worker thread - if let Some(worker_sender) = self.worker_sender.take() { - let (sender, receiver) = channel::unbounded(); - if worker_sender.send(BlockImportWorkerMsg::Shutdown(sender)).is_ok() { - let _ = receiver.recv(); - } - } - // send shutdown notification - let _ = result_sender.send(()); - return false; - }, - #[cfg(any(test, feature = "test-helpers"))] - BlockImportMsg::Synchronize => { - trace!(target: "sync", "Received synchronization message"); - if let Some(ref worker_sender) = self.worker_sender { - let _ = worker_sender.send(BlockImportWorkerMsg::Synchronize); - } - }, - } - true - } - - fn handle_worker_msg(&mut self, msg: BlockImportWorkerMsg) -> bool { - let link = match self.link.as_ref() { - Some(link) => link, - None => { - trace!(target: "sync", "Received import result while import-queue has no link"); - return true; - }, - }; - - let results = match msg { - BlockImportWorkerMsg::ImportedBlocks(results) => (results), - BlockImportWorkerMsg::ImportedFinalityProof(who, request_block, finalization_result) => { - link.finality_proof_imported(who, request_block, finalization_result); - return true; - }, - #[cfg(any(test, feature = "test-helpers"))] - BlockImportWorkerMsg::Synchronize => { - trace!(target: "sync", "Synchronizing link"); - link.synchronized(); - return true; - }, - BlockImportWorkerMsg::ImportBlocks(_, _) - | BlockImportWorkerMsg::ImportFinalityProof(_, _, _, _) - | BlockImportWorkerMsg::Shutdown(_) - => unreachable!("Import Worker does not send Import*/Shutdown messages; qed"), - }; - - process_import_results(&**link, results); - true - } - - fn handle_import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - if let Some(ref worker_sender) = self.worker_sender { - trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let _ = worker_sender.send(BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); - } - } - - fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - if let Some(ref worker_sender) = self.worker_sender { - trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let _ = worker_sender.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks)); - } - } -} - struct BlockImportWorker> { - result_sender: Sender>, + result_sender: BufferedLinkSender, block_import: SharedBlockImport, + justification_import: Option>, finality_proof_import: Option>, verifier: Arc, } impl> BlockImportWorker { - pub fn new( - result_sender: Sender>, + fn new( + result_sender: BufferedLinkSender, verifier: Arc, block_import: SharedBlockImport, + justification_import: Option>, finality_proof_import: Option>, - ) -> Sender> { + ) -> Sender> { let (sender, port) = channel::bounded(4); let _ = thread::Builder::new() .name("ImportQueueWorker".into()) .spawn(move || { - let worker = BlockImportWorker { + let mut worker = BlockImportWorker { result_sender, verifier, + justification_import, block_import, finality_proof_import, }; + if let Some(justification_import) = worker.justification_import.as_ref() { + justification_import.on_start(&mut worker.result_sender); + } + if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() { + finality_proof_import.on_start(&mut worker.result_sender); + } for msg in port.iter() { // Working until all senders have been dropped... match msg { - BlockImportWorkerMsg::ImportBlocks(origin, blocks) => { + ToWorkerMsg::ImportBlocks(origin, blocks) => { worker.import_a_batch_of_blocks(origin, blocks); }, - BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { + ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { worker.import_finality_proof(who, hash, number, proof); }, - BlockImportWorkerMsg::Shutdown(result_sender) => { + ToWorkerMsg::ImportJustification(who, hash, number, justification) => { + worker.import_justification(who, hash, number, justification); + } + ToWorkerMsg::Shutdown(result_sender) => { let _ = result_sender.send(()); break; }, #[cfg(any(test, feature = "test-helpers"))] - BlockImportWorkerMsg::Synchronize => { + ToWorkerMsg::Synchronize => { trace!(target: "sync", "Sending sync message"); - let _ = worker.result_sender.send(BlockImportWorkerMsg::Synchronize); + worker.result_sender.synchronized(); }, - BlockImportWorkerMsg::ImportedBlocks(_) - | BlockImportWorkerMsg::ImportedFinalityProof(_, _, _) - => unreachable!("Import Worker does not receive the Imported* messages; qed"), } } }) @@ -565,7 +294,7 @@ impl> BlockImportWorker { sender } - fn import_a_batch_of_blocks(&self, origin: BlockOrigin, blocks: Vec>) { + fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { let (imported, count, results) = import_many_blocks( &*self.block_import, origin, @@ -573,26 +302,124 @@ impl> BlockImportWorker { self.verifier.clone(), ); - let _ = self - .result_sender - .send(BlockImportWorkerMsg::ImportedBlocks(results)); - trace!(target: "sync", "Imported {} of {}", imported, count); + + let mut has_error = false; + let mut hashes = vec![]; + for (result, hash) in results { + hashes.push(hash); + + if has_error { + continue; + } + + if result.is_err() { + has_error = true; + } + + match result { + Ok(BlockImportResult::ImportedKnown(number)) => self.result_sender.block_imported(&hash, number), + Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { + self.result_sender.block_imported(&hash, number); + + if aux.clear_justification_requests { + trace!( + target: "sync", + "Block imported clears all pending justification requests {}: {:?}", + number, + hash + ); + self.result_sender.clear_justification_requests(); + } + + if aux.needs_justification { + trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); + self.result_sender.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(peer) = who { + info!("Sent block with bad justification to import"); + self.result_sender.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); + } + } + + if aux.needs_finality_proof { + trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); + self.result_sender.request_finality_proof(&hash, number); + } + }, + Err(BlockImportError::IncompleteHeader(who)) => { + if let Some(peer) = who { + info!("Peer sent block with incomplete header to import"); + self.result_sender.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); + self.result_sender.restart(); + } + }, + Err(BlockImportError::VerificationFailed(who, e)) => { + if let Some(peer) = who { + info!("Verification failed from peer: {}", e); + self.result_sender.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); + self.result_sender.restart(); + } + }, + Err(BlockImportError::BadBlock(who)) => { + if let Some(peer) = who { + info!("Bad block"); + self.result_sender.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); + self.result_sender.restart(); + } + }, + Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { + self.result_sender.restart(); + }, + }; + } + + self.result_sender.blocks_processed(hashes, has_error); } - fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - let result = import_single_finality_proof( - &self.finality_proof_import, - &*self.verifier, - &who, - hash, - number, - finality_proof, - ); + fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { + let result = self.finality_proof_import.as_ref().map(|finality_proof_import| { + finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier) + .map_err(|e| { + debug!( + "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", + e, + hash, + number, + who, + ); + }) + }).unwrap_or(Err(())); - let _ = self - .result_sender - .send(BlockImportWorkerMsg::ImportedFinalityProof(who, (hash, number), result)); + trace!(target: "sync", "Imported finality proof for {}/{}", number, hash); + self.result_sender.finality_proof_imported(who, (hash, number), result); + } + + fn import_justification( + &mut self, + who: Origin, + hash: B::Hash, + number: NumberFor, + justification: Justification + ) { + let success = self.justification_import.as_ref().map(|justification_import| { + justification_import.import_justification(hash, number, justification) + .map_err(|e| { + debug!( + target: "sync", + "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", + e, + hash, + number, + who, + ); + e + }).is_ok() + }).unwrap_or(false); + + self.result_sender.justification_imported(who, &hash, number, success); } } @@ -600,37 +427,177 @@ impl> BlockImportWorker { /// algorithm. pub trait Link: Send { /// Block imported. - fn block_imported(&self, _hash: &B::Hash, _number: NumberFor) {} + fn block_imported(&mut self, _hash: &B::Hash, _number: NumberFor) {} /// Batch of blocks imported, with or without error. - fn blocks_processed(&self, _processed_blocks: Vec, _has_error: bool) {} + fn blocks_processed(&mut self, _processed_blocks: Vec, _has_error: bool) {} /// Justification import result. - fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor, _success: bool) {} + fn justification_imported(&mut self, _who: Origin, _hash: &B::Hash, _number: NumberFor, _success: bool) {} /// Clear all pending justification requests. - fn clear_justification_requests(&self) {} + fn clear_justification_requests(&mut self) {} /// Request a justification for the given block. - fn request_justification(&self, _hash: &B::Hash, _number: NumberFor) {} + fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor) {} /// Finality proof import result. /// /// Even though we have asked for finality proof of block A, provider could return proof of /// some earlier block B, if the proof for A was too large. The sync module should continue /// asking for proof of A in this case. fn finality_proof_imported( - &self, + &mut self, _who: Origin, _request_block: (B::Hash, NumberFor), _finalization_result: Result<(B::Hash, NumberFor), ()>, ) {} /// Request a finality proof for the given block. - fn request_finality_proof(&self, _hash: &B::Hash, _number: NumberFor) {} + fn request_finality_proof(&mut self, _hash: &B::Hash, _number: NumberFor) {} /// Remember finality proof request builder on start. - fn set_finality_proof_request_builder(&self, _request_builder: SharedFinalityProofRequestBuilder) {} + fn set_finality_proof_request_builder(&mut self, _request_builder: SharedFinalityProofRequestBuilder) {} /// Adjusts the reputation of the given peer. - fn report_peer(&self, _who: Origin, _reputation_change: i32) {} + fn report_peer(&mut self, _who: Origin, _reputation_change: i32) {} /// Restart sync. - fn restart(&self) {} + fn restart(&mut self) {} /// Synchronization request has been processed. #[cfg(any(test, feature = "test-helpers"))] - fn synchronized(&self) {} + fn synchronized(&mut self) {} +} + +/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and +/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer +/// them to another link. +pub fn buffered_link() -> (BufferedLinkSender, BufferedLinkReceiver) { + let (tx, rx) = mpsc::unbounded(); + let tx = BufferedLinkSender { tx }; + let rx = BufferedLinkReceiver { rx }; + (tx, rx) +} + +/// See [`buffered_link`]. +pub struct BufferedLinkSender { + tx: mpsc::UnboundedSender>, +} + +/// Internal buffered message. +enum BlockImportWorkerMsg { + BlockImported(B::Hash, NumberFor), + BlocksProcessed(Vec, bool), + JustificationImported(Origin, B::Hash, NumberFor, bool), + ClearJustificationRequests, + RequestJustification(B::Hash, NumberFor), + FinalityProofImported(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), + RequestFinalityProof(B::Hash, NumberFor), + SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), + ReportPeer(Origin, i32), + Restart, + #[cfg(any(test, feature = "test-helpers"))] + Synchronized, +} + +impl Link for BufferedLinkSender { + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlockImported(hash.clone(), number)); + } + + fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(processed_blocks, has_error)); + } + + fn justification_imported( + &mut self, + who: Origin, + hash: &B::Hash, + number: NumberFor, + success: bool + ) { + let msg = BlockImportWorkerMsg::JustificationImported(who, hash.clone(), number, success); + let _ = self.tx.unbounded_send(msg); + } + + fn clear_justification_requests(&mut self) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ClearJustificationRequests); + } + + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestJustification(hash.clone(), number)); + } + + fn finality_proof_imported( + &mut self, + who: Origin, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + let msg = BlockImportWorkerMsg::FinalityProofImported(who, request_block, finalization_result); + let _ = self.tx.unbounded_send(msg); + } + + fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number)); + } + + fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::SetFinalityProofRequestBuilder(request_builder)); + } + + fn report_peer(&mut self, who: Origin, reputation_change: i32) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ReportPeer(who, reputation_change)); + } + + fn restart(&mut self) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Restart); + } + + #[cfg(any(test, feature = "test-helpers"))] + fn synchronized(&mut self) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Synchronized); + } +} + +/// See [`buffered_link`]. +pub struct BufferedLinkReceiver { + rx: mpsc::UnboundedReceiver>, +} + +impl BufferedLinkReceiver { + /// Polls for the buffered link actions. Any enqueued action will be propagated to the link + /// passed as parameter. + /// + /// This method should behave in a way similar to `Future::poll`. It can register the current + /// task and notify later when more actions are ready to be polled. To continue the comparison, + /// it is as if this method always returned `Ok(Async::NotReady)`. + pub fn poll_actions(&mut self, link: &mut dyn Link) { + loop { + let msg = if let Ok(Async::Ready(Some(msg))) = self.rx.poll() { + msg + } else { + break + }; + + match msg { + BlockImportWorkerMsg::BlockImported(hash, number) => + link.block_imported(&hash, number), + BlockImportWorkerMsg::BlocksProcessed(blocks, has_error) => + link.blocks_processed(blocks, has_error), + BlockImportWorkerMsg::JustificationImported(who, hash, number, success) => + link.justification_imported(who, &hash, number, success), + BlockImportWorkerMsg::ClearJustificationRequests => + link.clear_justification_requests(), + BlockImportWorkerMsg::RequestJustification(hash, number) => + link.request_justification(&hash, number), + BlockImportWorkerMsg::FinalityProofImported(who, block, result) => + link.finality_proof_imported(who, block, result), + BlockImportWorkerMsg::RequestFinalityProof(hash, number) => + link.request_finality_proof(&hash, number), + BlockImportWorkerMsg::SetFinalityProofRequestBuilder(builder) => + link.set_finality_proof_request_builder(builder), + BlockImportWorkerMsg::ReportPeer(who, reput) => + link.report_peer(who, reput), + BlockImportWorkerMsg::Restart => + link.restart(), + #[cfg(any(test, feature = "test-helpers"))] + BlockImportWorkerMsg::Synchronized => + link.synchronized(), + } + } + } } /// Block import successful result. @@ -657,140 +624,6 @@ pub enum BlockImportError { Error, } -/// Imports single notification and send notification to the link (if provided). -fn import_single_justification( - link: &Option>>, - justification_import: &Option>, - who: Origin, - hash: B::Hash, - number: NumberFor, - justification: Justification, -) { - let success = justification_import.as_ref().map(|justification_import| { - justification_import.import_justification(hash, number, justification) - .map_err(|e| { - debug!( - target: "sync", - "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", - e, - hash, - number, - who, - ); - e - }).is_ok() - }).unwrap_or(false); - - if let Some(ref link) = link { - link.justification_imported(who, &hash, number, success); - } -} - -/// Imports single finality_proof. -fn import_single_finality_proof>( - finality_proof_import: &Option>, - verifier: &V, - who: &Origin, - hash: B::Hash, - number: NumberFor, - finality_proof: Vec, -) -> Result<(B::Hash, NumberFor), ()> { - let result = finality_proof_import.as_ref().map(|finality_proof_import| { - finality_proof_import.import_finality_proof(hash, number, finality_proof, verifier) - .map_err(|e| { - debug!( - "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", - e, - hash, - number, - who, - ); - }) - }).unwrap_or(Err(())); - - trace!(target: "sync", "Imported finality proof for {}/{}", number, hash); - - result -} - -/// Process result of block(s) import. -fn process_import_results( - link: &dyn Link, - results: Vec<( - Result>, BlockImportError>, - B::Hash, - )>, -) -{ - let mut has_error = false; - let mut hashes = vec![]; - for (result, hash) in results { - hashes.push(hash); - - if has_error { - continue; - } - - if result.is_err() { - has_error = true; - } - - match result { - Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number), - Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { - link.block_imported(&hash, number); - - if aux.clear_justification_requests { - trace!(target: "sync", "Block imported clears all pending justification requests {}: {:?}", number, hash); - link.clear_justification_requests(); - } - - if aux.needs_justification { - trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); - link.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(peer) = who { - info!("Sent block with bad justification to import"); - link.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); - } - } - - if aux.needs_finality_proof { - trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); - link.request_finality_proof(&hash, number); - } - }, - Err(BlockImportError::IncompleteHeader(who)) => { - if let Some(peer) = who { - info!("Peer sent block with incomplete header to import"); - link.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); - link.restart(); - } - }, - Err(BlockImportError::VerificationFailed(who, e)) => { - if let Some(peer) = who { - info!("Verification failed from peer: {}", e); - link.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); - link.restart(); - } - }, - Err(BlockImportError::BadBlock(who)) => { - if let Some(peer) = who { - info!("Bad block"); - link.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); - link.restart(); - } - }, - Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { - link.restart(); - }, - }; - } - link.blocks_processed(hashes, has_error); -} - /// Import several blocks at once, returning import result for each block. fn import_many_blocks>( import_handle: &dyn BlockImport, @@ -915,193 +748,3 @@ pub fn import_single_block>( import_error(import_handle.import_block(import_block, cache)) } - -#[cfg(test)] -mod tests { - use super::*; - use crate::block_import::ForkChoiceStrategy; - use libp2p::PeerId; - use test_client::runtime::{Block, Hash}; - - #[derive(Debug, PartialEq)] - enum LinkMsg { - BlockImported, - FinalityProofImported, - Disconnected, - Restarted, - } - - #[derive(Clone)] - struct TestLink { - sender: Sender, - } - - impl TestLink { - fn new(sender: Sender) -> TestLink { - TestLink { - sender, - } - } - } - - impl Link for TestLink { - fn block_imported(&self, _hash: &Hash, _number: NumberFor) { - let _ = self.sender.send(LinkMsg::BlockImported); - } - fn finality_proof_imported( - &self, - _: Origin, - _: (Hash, NumberFor), - _: Result<(Hash, NumberFor), ()>, - ) { - let _ = self.sender.send(LinkMsg::FinalityProofImported); - } - fn report_peer(&self, _: Origin, _: i32) { - let _ = self.sender.send(LinkMsg::Disconnected); - } - fn restart(&self) { - let _ = self.sender.send(LinkMsg::Restarted); - } - } - - impl Verifier for () { - fn verify( - &self, - origin: BlockOrigin, - header: B::Header, - justification: Option, - body: Option>, - ) -> Result<(ImportBlock, Option)>>), String> { - Ok((ImportBlock { - origin, - header, - body, - finalized: false, - justification, - post_digests: vec![], - auxiliary: Vec::new(), - fork_choice: ForkChoiceStrategy::LongestChain, - }, None)) - } - } - - #[test] - fn process_import_result_works() { - let (result_sender, result_port) = channel::unbounded(); - let (worker_sender, _) = channel::unbounded(); - let (link_sender, link_port) = channel::unbounded(); - let importer_sender = BlockImporter::::new(result_port, worker_sender, Arc::new(()), None, None, None); - let link = TestLink::new(link_sender); - let (ack_sender, start_ack_port) = channel::bounded(4); - let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone()), ack_sender)); - - // Ensure the importer handles Start before any result messages. - let _ = start_ack_port.recv(); - - // Send a known - let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); - - // Send a second known - let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); - - // Send an unknown - let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default(), Default::default(), None)), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); - - // Send an unknown with peer and bad justification - let peer_id = PeerId::random(); - let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default(), - ImportedAux { - needs_justification: true, - clear_justification_requests: false, - bad_justification: true, - needs_finality_proof: false, - }, - Some(peer_id.clone()))), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); - assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); - - // Send an incomplete header - let results = vec![(Err(BlockImportError::IncompleteHeader(Some(peer_id.clone()))), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); - assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); - - // Send an unknown parent - let results = vec![(Err(BlockImportError::UnknownParent), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); - - // Send a verification failed - let results = vec![(Err(BlockImportError::VerificationFailed(Some(peer_id.clone()), String::new())), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); - assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); - - // Send an error - let results = vec![(Err(BlockImportError::Error), Default::default())]; - let _ = result_sender.send(BlockImportWorkerMsg::ImportedBlocks(results)).ok().unwrap(); - assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); - - // Drop the importer sender first, ensuring graceful shutdown. - drop(importer_sender); - } - - #[test] - fn process_finality_proof_import_result_works() { - let (result_sender, result_port) = channel::unbounded(); - let (worker_sender, worker_receiver) = channel::unbounded(); - let (link_sender, link_port) = channel::unbounded(); - let importer_sender = BlockImporter::::new(result_port, worker_sender, Arc::new(()), None, None, None); - let link = TestLink::new(link_sender); - let (ack_sender, start_ack_port) = channel::bounded(4); - let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone()), ack_sender)); - let who = Origin::random(); - - // Ensure the importer handles Start before any result messages. - start_ack_port.recv().unwrap().unwrap(); - - // Send finality proof import request to BlockImporter - importer_sender.send(BlockImportMsg::ImportFinalityProof( - who.clone(), - Default::default(), - 1, - vec![42], - )).unwrap(); - - // Wait until this request is redirected to the BlockImportWorker - match worker_receiver.recv().unwrap() { - BlockImportWorkerMsg::ImportFinalityProof( - cwho, - chash, - 1, - cproof, - ) => { - assert_eq!(cwho, who); - assert_eq!(chash, Default::default()); - assert_eq!(cproof, vec![42]); - }, - _ => unreachable!("Unexpected work request received"), - } - - // Send ack of proof import from BlockImportWorker to BlockImporter - result_sender.send(BlockImportWorkerMsg::ImportedFinalityProof( - who.clone(), - (Default::default(), 0), - Ok((Default::default(), 0)), - )).unwrap(); - - // Wait for finality proof import result - assert_eq!(link_port.recv(), Ok(LinkMsg::FinalityProofImported)); - - // Drop the importer sender first, ensuring graceful shutdown. - drop(importer_sender); - } -} - diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index e29c4b2089..5982003c15 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -26,7 +26,7 @@ // our error-chain could potentially blow up otherwise #![recursion_limit="128"] -#[macro_use] extern crate crossbeam_channel; +extern crate crossbeam_channel; #[macro_use] extern crate log; use std::sync::Arc; diff --git a/substrate/core/finality-grandpa/src/import.rs b/substrate/core/finality-grandpa/src/import.rs index ed57c68250..227daff552 100644 --- a/substrate/core/finality-grandpa/src/import.rs +++ b/substrate/core/finality-grandpa/src/import.rs @@ -76,7 +76,7 @@ impl, RA, PRA, SC> JustificationImport { type Error = ConsensusError; - fn on_start(&self, link: &dyn consensus_common::import_queue::Link) { + fn on_start(&self, link: &mut dyn consensus_common::import_queue::Link) { let chain_info = self.inner.info().chain; // request justifications for all pending changes for which change blocks have already been imported diff --git a/substrate/core/finality-grandpa/src/light_import.rs b/substrate/core/finality-grandpa/src/light_import.rs index 4fb93ae303..25a3f84f6d 100644 --- a/substrate/core/finality-grandpa/src/light_import.rs +++ b/substrate/core/finality-grandpa/src/light_import.rs @@ -144,7 +144,7 @@ impl, RA> FinalityProofImport { type Error = ConsensusError; - fn on_start(&self, link: &dyn consensus_common::import_queue::Link) { + fn on_start(&self, link: &mut dyn consensus_common::import_queue::Link) { let chain_info = self.client.info().chain; let data = self.data.read(); @@ -572,7 +572,7 @@ pub mod tests { { type Error = ConsensusError; - fn on_start(&self, link: &dyn consensus_common::import_queue::Link) { + fn on_start(&self, link: &mut dyn consensus_common::import_queue::Link) { self.0.on_start(link) } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index a825becbe5..83ac5c9778 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -74,79 +74,6 @@ pub trait TransactionPool: Send + Sync { fn on_broadcasted(&self, propagations: HashMap>); } -/// A link implementation that connects to the network. -#[derive(Clone)] -pub struct NetworkLink> { - /// The protocol sender - pub(crate) protocol_sender: mpsc::UnboundedSender>, - /// The network sender - pub(crate) network_sender: mpsc::UnboundedSender>, -} - -impl> Link for NetworkLink { - fn block_imported(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); - } - - fn blocks_processed(&self, processed_blocks: Vec, has_error: bool) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); - } - - fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); - if !success { - info!("Invalid justification provided by {} for #{}", who, hash); - let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); - } - } - - fn clear_justification_requests(&self) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::ClearJustificationRequests); - } - - fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number)); - } - - fn request_finality_proof(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestFinalityProof( - hash.clone(), - number, - )); - } - - fn finality_proof_imported( - &self, - who: PeerId, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - let success = finalization_result.is_ok(); - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::FinalityProofImportResult( - request_block, - finalization_result, - )); - if !success { - info!("Invalid finality proof provided by {} for #{}", who, request_block.0); - let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); - } - } - - fn report_peer(&self, who: PeerId, reputation_change: i32) { - let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who, reputation_change)); - } - - fn restart(&self) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RestartSync); - } - - fn set_finality_proof_request_builder(&self, request_builder: SharedFinalityProofRequestBuilder) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::SetFinalityProofRequestBuilder(request_builder)); - } -} - /// A cloneable handle for reporting cost/benefits of peers. #[derive(Clone)] pub struct ReportHandle { @@ -197,13 +124,6 @@ impl, H: ExHashT> NetworkWorker let (protocol_sender, protocol_rx) = mpsc::unbounded(); let status_sinks = Arc::new(Mutex::new(Vec::new())); - // connect the import-queue to the network service. - let link = NetworkLink { - protocol_sender: protocol_sender.clone(), - network_sender: network_chan.clone(), - }; - params.import_queue.start(Box::new(link))?; - // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); @@ -592,7 +512,7 @@ impl, H: ExHashT> Future for Ne type Error = io::Error; fn poll(&mut self) -> Poll { - // Implementation of `protocol::NetworkOut` using the available local variables. + // Implementation of `protocol::NetworkOut` trait using the available local variables. struct Context<'a, B: BlockT>(&'a mut Swarm, &'a PeersetHandle); impl<'a, B: BlockT> NetworkOut for Context<'a, B> { fn report_peer(&mut self, who: PeerId, reputation: i32) { @@ -606,11 +526,74 @@ impl, H: ExHashT> Future for Ne } } + // Implementation of `import_queue::Link` trait using the available local variables. + struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> { + protocol: &'a mut Protocol, + context: Context<'a, B>, + } + impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for NetworkLink<'a, B, S, H> { + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.block_imported(&hash, number) + } + fn blocks_processed(&mut self, hashes: Vec, has_error: bool) { + self.protocol.blocks_processed(&mut self.context, hashes, has_error) + } + fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { + self.protocol.justification_import_result(hash.clone(), number, success); + if !success { + info!("Invalid justification provided by {} for #{}", who, hash); + self.context.0.user_protocol_mut().disconnect_peer(&who); + self.context.1.report_peer(who, i32::min_value()); + } + } + fn clear_justification_requests(&mut self) { + self.protocol.clear_justification_requests() + } + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.request_justification(&mut self.context, hash, number) + } + fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.request_finality_proof(&mut self.context, hash, number) + } + fn finality_proof_imported( + &mut self, + who: PeerId, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + let success = finalization_result.is_ok(); + self.protocol.finality_proof_import_result(request_block, finalization_result); + if !success { + info!("Invalid finality proof provided by {} for #{}", who, request_block.0); + self.context.0.user_protocol_mut().disconnect_peer(&who); + self.context.1.report_peer(who, i32::min_value()); + } + } + fn report_peer(&mut self, who: PeerId, reputation_change: i32) { + self.context.1.report_peer(who, reputation_change) + } + fn restart(&mut self) { + self.protocol.restart(&mut self.context) + } + fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder) { + self.protocol.set_finality_proof_request_builder(builder) + } + } + while let Ok(Async::Ready(_)) = self.status_interval.poll() { let status = self.protocol.status(); self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); } + { + let mut network_service = self.network_service.lock(); + let mut link = NetworkLink { + protocol: &mut self.protocol, + context: Context(&mut network_service, &self.peerset), + }; + self.import_queue.poll_actions(&mut link); + } + while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() { let infos = self.protocol.peers_info().map(|(id, info)| { (id.clone(), ConnectedPeer { peer_info: info.clone() }) @@ -618,10 +601,14 @@ impl, H: ExHashT> Future for Ne *self.peers.write() = infos; } - match self.protocol.poll(&mut Context(&mut self.network_service.lock(), &self.peerset), &*self.transaction_pool) { - Ok(Async::Ready(v)) => void::unreachable(v), - Ok(Async::NotReady) => {} - Err(err) => void::unreachable(err), + { + let mut network_service = self.network_service.lock(); + let mut ctxt = Context(&mut *network_service, &self.peerset); + match self.protocol.poll(&mut ctxt, &*self.transaction_pool) { + Ok(Async::Ready(v)) => void::unreachable(v), + Ok(Async::NotReady) => {} + Err(err) => void::unreachable(err), + } } // Check for new incoming on-demand requests. diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index f10338a62d..b5a03ae23a 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -77,8 +77,7 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = Arc::new(PassThroughVerifier(true)); - let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None); - queue.start(Box::new(TestLink{})).unwrap(); + let mut queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None); drop(queue); } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 216944acc2..95646cd74b 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -41,6 +41,7 @@ use consensus::{Error as ConsensusError, well_known_cache_keys::{self, Id as Cac use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; use futures::{prelude::*, sync::{mpsc, oneshot}}; +use log::info; use crate::message::Message; use libp2p::PeerId; use parking_lot::{Mutex, RwLock}; @@ -49,7 +50,7 @@ use crate::protocol::{Context, Protocol, ProtocolConfig, ProtocolStatus, CustomM use runtime_primitives::generic::{BlockId, OpaqueDigestItemId}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; -use crate::service::{NetworkLink, NetworkMsg, ProtocolMsg, TransactionPool}; +use crate::service::{NetworkMsg, ProtocolMsg, TransactionPool}; use crate::specialization::NetworkSpecialization; use test_client::{self, AccountKeyring}; @@ -97,6 +98,79 @@ pub struct NoopLink { } impl Link for NoopLink { } +/// A link implementation that connects to the network. +#[derive(Clone)] +pub struct NetworkLink> { + /// The protocol sender + pub(crate) protocol_sender: mpsc::UnboundedSender>, + /// The network sender + pub(crate) network_sender: mpsc::UnboundedSender>, +} + +impl> Link for NetworkLink { + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); + } + + fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); + } + + fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); + if !success { + info!("Invalid justification provided by {} for #{}", who, hash); + let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); + } + } + + fn clear_justification_requests(&mut self) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::ClearJustificationRequests); + } + + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number)); + } + + fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestFinalityProof( + hash.clone(), + number, + )); + } + + fn finality_proof_imported( + &mut self, + who: PeerId, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + let success = finalization_result.is_ok(); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::FinalityProofImportResult( + request_block, + finalization_result, + )); + if !success { + info!("Invalid finality proof provided by {} for #{}", who, request_block.0); + let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); + } + } + + fn report_peer(&mut self, who: PeerId, reputation_change: i32) { + let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who, reputation_change)); + } + + fn restart(&mut self) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RestartSync); + } + + fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::SetFinalityProofRequestBuilder(request_builder)); + } +} + /// The test specialization. #[derive(Clone)] pub struct DummySpecialization; @@ -232,24 +306,24 @@ impl> TestLink { } impl> Link for TestLink { - fn block_imported(&self, hash: &Hash, number: NumberFor) { + fn block_imported(&mut self, hash: &Hash, number: NumberFor) { self.link.block_imported(hash, number); } - fn blocks_processed(&self, processed_blocks: Vec, has_error: bool) { + fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { self.link.blocks_processed(processed_blocks, has_error); } - fn justification_imported(&self, who: PeerId, hash: &Hash, number:NumberFor, success: bool) { + fn justification_imported(&mut self, who: PeerId, hash: &Hash, number:NumberFor, success: bool) { self.link.justification_imported(who, hash, number, success); } - fn request_justification(&self, hash: &Hash, number: NumberFor) { + fn request_justification(&mut self, hash: &Hash, number: NumberFor) { self.link.request_justification(hash, number); } fn finality_proof_imported( - &self, + &mut self, who: PeerId, request_block: (Hash, NumberFor), finalization_result: Result<(Hash, NumberFor), ()>, @@ -257,19 +331,19 @@ impl> Link for TestLink { self.link.finality_proof_imported(who, request_block, finalization_result); } - fn request_finality_proof(&self, hash: &Hash, number: NumberFor) { + fn request_finality_proof(&mut self, hash: &Hash, number: NumberFor) { self.link.request_finality_proof(hash, number); } - fn set_finality_proof_request_builder(&self, request_builder: SharedFinalityProofRequestBuilder) { + fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { self.link.set_finality_proof_request_builder(request_builder); } - fn report_peer(&self, who: PeerId, reputation_change: i32) { + fn report_peer(&mut self, who: PeerId, reputation_change: i32) { self.link.report_peer(who, reputation_change); } - fn restart(&self) { + fn restart(&mut self) { self.link.restart(); } @@ -278,7 +352,7 @@ impl> Link for TestLink { /// The caller should wait for the `Link::synchronized` call to ensure that it has synchronized /// with `ImportQueue`. #[cfg(any(test, feature = "test-helpers"))] - fn synchronized(&self) { + fn synchronized(&mut self) { drop(self.network_to_protocol_sender.unbounded_send(FromNetworkMsg::Synchronize)) } } @@ -292,7 +366,7 @@ pub struct Peer> { /// we allow it to be unused. #[cfg_attr(not(test), allow(unused))] protocol_status: Arc>>, - import_queue: Box>, + import_queue: Arc>>>, pub data: D, best_hash: Mutex>, finalized_hash: Mutex>, @@ -437,7 +511,7 @@ impl> Peer { fn new( protocol_status: Arc>>, client: PeersClient, - import_queue: Box>, + import_queue: Arc>>>, use_tokio: bool, network_to_protocol_sender: mpsc::UnboundedSender>, protocol_sender: mpsc::UnboundedSender>, @@ -451,12 +525,6 @@ impl> Peer { protocol_sender.clone(), network_port, ); - let network_link = TestLink::new( - protocol_sender.clone(), - network_to_protocol_sender.clone(), - network_sender.clone(), - ); - import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts"); Peer { protocol_status, peer_id: PeerId::random(), @@ -535,7 +603,7 @@ impl> Peer { /// Synchronize with import queue. #[cfg(any(test, feature = "test-helpers"))] pub fn import_queue_sync(&self) { - self.import_queue.synchronize(); + self.import_queue.lock().synchronize(); let _ = self.net_proto_channel.wait_sync(); } @@ -663,7 +731,7 @@ impl> Peer { ); let header = block.header.clone(); at = hash; - self.import_queue.import_blocks( + self.import_queue.lock().import_blocks( origin, vec![IncomingBlock { origin: None, @@ -806,10 +874,12 @@ pub trait TestNetFactory: Sized { fn add_peer( &mut self, protocol_status: Arc>>, - import_queue: Box>, + import_queue: Arc>>>, tx_pool: EmptyTransactionPool, finality_proof_provider: Option>>, mut protocol: Protocol, + protocol_sender: mpsc::UnboundedSender>, + network_to_protocol_sender: mpsc::UnboundedSender>, network_sender: mpsc::UnboundedSender>, mut network_to_protocol_rx: mpsc::UnboundedReceiver>, mut protocol_rx: mpsc::UnboundedReceiver>, @@ -831,6 +901,12 @@ pub trait TestNetFactory: Sized { } tokio::runtime::current_thread::run(futures::future::poll_fn(move || { + import_queue.lock().poll_actions(&mut TestLink::new( + protocol_sender.clone(), + network_to_protocol_sender.clone(), + network_sender.clone(), + )); + while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() { let outcome = match msg { Some(FromNetworkMsg::PeerConnected(peer_id)) => { @@ -858,11 +934,11 @@ pub trait TestNetFactory: Sized { match outcome { CustomMessageOutcome::BlockImport(origin, blocks) => - import_queue.import_blocks(origin, blocks), + import_queue.lock().import_blocks(origin, blocks), CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => - import_queue.import_justification(origin, hash, nb, justification), + import_queue.lock().import_justification(origin, hash, nb, justification), CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => - import_queue.import_finality_proof(origin, hash, nb, proof), + import_queue.lock().import_finality_proof(origin, hash, nb, proof), CustomMessageOutcome::None => {} } } @@ -959,13 +1035,13 @@ pub trait TestNetFactory: Sized { = self.make_block_import(PeersClient::Full(client.clone())); let (network_sender, network_port) = mpsc::unbounded(); - let import_queue = Box::new(BasicQueue::new( + let import_queue = Arc::new(Mutex::new(Box::new(BasicQueue::new( verifier, block_import, justification_import, finality_proof_import, finality_proof_request_builder, - )); + )))); let specialization = self::SpecializationFactory::create(); let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); @@ -985,6 +1061,8 @@ pub trait TestNetFactory: Sized { EmptyTransactionPool, self.make_finality_proof_provider(PeersClient::Full(client.clone())), protocol, + protocol_sender.clone(), + network_to_protocol_sender.clone(), network_sender.clone(), network_to_protocol_rx, protocol_rx, @@ -1013,13 +1091,13 @@ pub trait TestNetFactory: Sized { = self.make_block_import(PeersClient::Light(client.clone())); let (network_sender, network_port) = mpsc::unbounded(); - let import_queue = Box::new(BasicQueue::new( + let import_queue = Arc::new(Mutex::new(Box::new(BasicQueue::new( verifier, block_import, justification_import, finality_proof_import, finality_proof_request_builder, - )); + )))); let specialization = self::SpecializationFactory::create(); let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); @@ -1039,6 +1117,8 @@ pub trait TestNetFactory: Sized { EmptyTransactionPool, self.make_finality_proof_provider(PeersClient::Light(client.clone())), protocol, + protocol_sender.clone(), + network_to_protocol_sender.clone(), network_sender.clone(), network_to_protocol_rx, protocol_rx, diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index a0d17bd7a5..6c4a03ee7b 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -17,7 +17,7 @@ //! Chain utilities. use std::{self, io::{Read, Write}}; -use futures::Future; +use futures::prelude::*; use log::{info, warn}; use runtime_primitives::generic::{SignedBlock, BlockId}; @@ -99,21 +99,20 @@ pub fn export_blocks( } struct WaitLink { - wait_send: std::sync::mpsc::Sender<()>, + imported_blocks: u64, } impl WaitLink { - fn new(wait_send: std::sync::mpsc::Sender<()>) -> WaitLink { + fn new() -> WaitLink { WaitLink { - wait_send, + imported_blocks: 0, } } } impl Link for WaitLink { - fn block_imported(&self, _hash: &B::Hash, _number: NumberFor) { - self.wait_send.send(()) - .expect("Unable to notify main process; if the main process panicked then this thread would already be dead as well. qed."); + fn block_imported(&mut self, _hash: &B::Hash, _number: NumberFor) { + self.imported_blocks += 1; } } @@ -128,11 +127,7 @@ pub fn import_blocks( let client = new_client::(&config)?; // FIXME #1134 this shouldn't need a mutable config. let select_chain = components::FullComponents::::build_select_chain(&mut config, client.clone())?; - let queue = components::FullComponents::::build_import_queue(&mut config, client.clone(), select_chain)?; - - let (wait_send, wait_recv) = std::sync::mpsc::channel(); - let wait_link = WaitLink::new(wait_send); - queue.start(Box::new(wait_link))?; + let mut queue = components::FullComponents::::build_import_queue(&mut config, client.clone(), select_chain)?; let (exit_send, exit_recv) = std::sync::mpsc::channel(); ::std::thread::spawn(move || { @@ -179,19 +174,23 @@ pub fn import_blocks( } } - let mut blocks_imported = 0; - while blocks_imported < count { - wait_recv.recv() - .expect("Importing thread has panicked. Then the main process will die before this can be reached. qed."); - blocks_imported += 1; - if blocks_imported % 1000 == 0 { + let mut link = WaitLink::new(); + tokio::run(futures::future::poll_fn(move || { + let blocks_before = link.imported_blocks; + queue.poll_actions(&mut link); + if link.imported_blocks / 1000 != blocks_before / 1000 { info!( "#{} blocks were imported (#{} left)", - blocks_imported, - count - blocks_imported + link.imported_blocks, + count - link.imported_blocks ); } - } + if link.imported_blocks >= count { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + })); info!("Imported {} blocks. Best: #{}", block_count, client.info().chain.best_number);