diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index e0a576d7fa..cc090a9b91 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -26,23 +26,17 @@ //! queues to be instantiated simply. use std::{sync::Arc, collections::HashMap}; -use futures::{prelude::*, future::Executor, sync::mpsc}; -use runtime_primitives::{Justification, traits::{ - Block as BlockT, Header as HeaderT, NumberFor, -}}; -use crate::{error::Error as ConsensusError, well_known_cache_keys::Id as CacheKeyId, block_import::{ - BlockImport, BlockOrigin, ImportBlock, ImportedAux, ImportResult, JustificationImport, +use runtime_primitives::{Justification, traits::{Block as BlockT, Header as _, NumberFor}}; +use crate::{error::Error as ConsensusError, well_known_cache_keys::Id as CacheKeyId}; +use crate::block_import::{ + BlockImport, BlockOrigin, ImportBlock, ImportedAux, JustificationImport, ImportResult, FinalityProofImport, FinalityProofRequestBuilder, -}}; +}; -/// Reputation change for peers which send us a block with an incomplete header. -const INCOMPLETE_HEADER_REPUTATION_CHANGE: i32 = -(1 << 20); -/// Reputation change for peers which send us a block which we fail to verify. -const VERIFICATION_FAIL_REPUTATION_CHANGE: i32 = -(1 << 20); -/// Reputation change for peers which send us a bad block. -const BAD_BLOCK_REPUTATION_CHANGE: i32 = -(1 << 29); -/// Reputation change for peers which send us a block with bad justifications. -const BAD_JUSTIFICATION_REPUTATION_CHANGE: i32 = -(1 << 16); +pub use basic_queue::BasicQueue; + +mod basic_queue; +pub mod buffered_link; /// Shared block import struct used by the queue. pub type SharedBlockImport = Arc + Send + Sync>; @@ -119,324 +113,6 @@ pub trait ImportQueue: Send { fn poll_actions(&mut self, link: &mut dyn Link); } -/// Interface to a basic block import queue that is importing blocks sequentially in a separate -/// task, with pluggable verification. -pub struct BasicQueue { - /// Channel to send messages to the background task. - sender: mpsc::UnboundedSender>, - /// Results coming from the worker task. - result_port: BufferedLinkReceiver, - /// Sent through the link as soon as possible. - finality_proof_request_builder: Option>, - /// Since we have to be in a tokio context in order to spawn background tasks, we first store - /// the task to spawn here, then extract it as soon as we are in a tokio context. - /// If `Some`, contains the task to spawn in the background. If `None`, the future has already - /// been spawned. - future_to_spawn: Option + Send>>, - /// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in - /// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from - /// `poll_actions`. - manual_poll: Option + Send>>, -} - -impl BasicQueue { - /// Instantiate a new basic queue, with given verifier. - /// - /// This creates a background task, and calls `on_start` on the justification importer and - /// finality proof importer. - pub fn new>( - verifier: Arc, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, - finality_proof_request_builder: Option>, - ) -> Self { - let (result_sender, result_port) = buffered_link(); - let (future, worker_sender) = BlockImportWorker::new( - result_sender, - verifier, - block_import, - justification_import, - finality_proof_import, - ); - - Self { - sender: worker_sender, - result_port, - finality_proof_request_builder, - future_to_spawn: Some(Box::new(future)), - manual_poll: None, - } - } - - /// Send synchronization request to the block import channel. - /// - /// The caller should wait for Link::synchronized() call to ensure that it - /// has synchronized with ImportQueue. - #[cfg(any(test, feature = "test-helpers"))] - pub fn synchronize(&self) { - let _ = self.sender.unbounded_send(ToWorkerMsg::Synchronize); - } -} - -impl ImportQueue for BasicQueue { - fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - if blocks.is_empty() { - return; - } - - trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); - } - - fn import_justification( - &mut self, - who: Origin, - hash: B::Hash, - number: NumberFor, - justification: Justification - ) { - let _ = self.sender.unbounded_send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification)); - } - - fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let _ = self.sender.unbounded_send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); - } - - fn poll_actions(&mut self, link: &mut dyn Link) { - // Try to spawn the future in `future_to_spawn`. - if let Some(future) = self.future_to_spawn.take() { - if let Err(err) = tokio_executor::DefaultExecutor::current().execute(future) { - debug_assert!(self.manual_poll.is_none()); - self.manual_poll = Some(err.into_future()); - } - } - - // As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll - // manually here. - if let Some(manual_poll) = self.manual_poll.as_mut() { - match manual_poll.poll() { - Ok(Async::NotReady) => {} - _ => self.manual_poll = None, - } - } - - if let Some(fprb) = self.finality_proof_request_builder.take() { - link.set_finality_proof_request_builder(fprb); - } - - self.result_port.poll_actions(link); - } -} - -/// Message destinated to the background worker. -#[derive(Debug)] -enum ToWorkerMsg { - ImportBlocks(BlockOrigin, Vec>), - ImportJustification(Origin, B::Hash, NumberFor, Justification), - ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), - #[cfg(any(test, feature = "test-helpers"))] - Synchronize, -} - -struct BlockImportWorker> { - result_sender: BufferedLinkSender, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, - verifier: Arc, -} - -impl> BlockImportWorker { - fn new( - result_sender: BufferedLinkSender, - verifier: Arc, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, - ) -> (impl Future + Send, mpsc::UnboundedSender>) { - let (sender, mut port) = mpsc::unbounded(); - - let mut worker = BlockImportWorker { - result_sender, - verifier, - justification_import, - block_import, - finality_proof_import, - }; - - if let Some(justification_import) = worker.justification_import.as_ref() { - for (hash, number) in justification_import.on_start() { - worker.result_sender.request_justification(&hash, number); - } - } - - if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() { - for (hash, number) in finality_proof_import.on_start() { - worker.result_sender.request_finality_proof(&hash, number); - } - } - - let future = futures::future::poll_fn(move || { - loop { - let msg = match port.poll() { - Ok(Async::Ready(Some(msg))) => msg, - Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => return Ok(Async::NotReady), - }; - - match msg { - ToWorkerMsg::ImportBlocks(origin, blocks) => { - worker.import_a_batch_of_blocks(origin, blocks); - }, - ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { - worker.import_finality_proof(who, hash, number, proof); - }, - ToWorkerMsg::ImportJustification(who, hash, number, justification) => { - worker.import_justification(who, hash, number, justification); - } - #[cfg(any(test, feature = "test-helpers"))] - ToWorkerMsg::Synchronize => { - trace!(target: "sync", "Sending sync message"); - worker.result_sender.synchronized(); - }, - } - } - }); - - (future, sender) - } - - fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - let (imported, count, results) = import_many_blocks( - &*self.block_import, - origin, - blocks, - self.verifier.clone(), - ); - - trace!(target: "sync", "Imported {} of {}", imported, count); - - let mut has_error = false; - let mut hashes = vec![]; - for (result, hash) in results { - hashes.push(hash); - - if has_error { - continue; - } - - if result.is_err() { - has_error = true; - } - - match result { - Ok(BlockImportResult::ImportedKnown(number)) => self.result_sender.block_imported(&hash, number), - Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { - self.result_sender.block_imported(&hash, number); - - if aux.clear_justification_requests { - trace!( - target: "sync", - "Block imported clears all pending justification requests {}: {:?}", - number, - hash - ); - self.result_sender.clear_justification_requests(); - } - - if aux.needs_justification { - trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); - self.result_sender.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(peer) = who { - info!("Sent block with bad justification to import"); - self.result_sender.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); - } - } - - if aux.needs_finality_proof { - trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); - self.result_sender.request_finality_proof(&hash, number); - } - }, - Err(BlockImportError::IncompleteHeader(who)) => { - if let Some(peer) = who { - info!("Peer sent block with incomplete header to import"); - self.result_sender.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); - self.result_sender.restart(); - } - }, - Err(BlockImportError::VerificationFailed(who, e)) => { - if let Some(peer) = who { - info!("Verification failed from peer: {}", e); - self.result_sender.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); - self.result_sender.restart(); - } - }, - Err(BlockImportError::BadBlock(who)) => { - if let Some(peer) = who { - info!("Bad block"); - self.result_sender.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); - self.result_sender.restart(); - } - }, - Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { - self.result_sender.restart(); - }, - }; - } - - self.result_sender.blocks_processed(hashes, has_error); - } - - fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - let result = self.finality_proof_import.as_ref().map(|finality_proof_import| { - finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier) - .map_err(|e| { - debug!( - "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", - e, - hash, - number, - who, - ); - }) - }).unwrap_or(Err(())); - - trace!(target: "sync", "Imported finality proof for {}/{}", number, hash); - self.result_sender.finality_proof_imported(who, (hash, number), result); - } - - fn import_justification( - &mut self, - who: Origin, - hash: B::Hash, - number: NumberFor, - justification: Justification - ) { - let success = self.justification_import.as_ref().map(|justification_import| { - justification_import.import_justification(hash, number, justification) - .map_err(|e| { - debug!( - target: "sync", - "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", - e, - hash, - number, - who, - ); - e - }).is_ok() - }).unwrap_or(false); - - self.result_sender.justification_imported(who, &hash, number, success); - } -} - /// Hooks that the verification queue can use to influence the synchronization /// algorithm. pub trait Link: Send { @@ -474,146 +150,6 @@ pub trait Link: Send { fn synchronized(&mut self) {} } -/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and -/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer -/// them to another link. -pub fn buffered_link() -> (BufferedLinkSender, BufferedLinkReceiver) { - let (tx, rx) = mpsc::unbounded(); - let tx = BufferedLinkSender { tx }; - let rx = BufferedLinkReceiver { rx }; - (tx, rx) -} - -/// See [`buffered_link`]. -pub struct BufferedLinkSender { - tx: mpsc::UnboundedSender>, -} - -/// Internal buffered message. -enum BlockImportWorkerMsg { - BlockImported(B::Hash, NumberFor), - BlocksProcessed(Vec, bool), - JustificationImported(Origin, B::Hash, NumberFor, bool), - ClearJustificationRequests, - RequestJustification(B::Hash, NumberFor), - FinalityProofImported(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), - RequestFinalityProof(B::Hash, NumberFor), - SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), - ReportPeer(Origin, i32), - Restart, - #[cfg(any(test, feature = "test-helpers"))] - Synchronized, -} - -impl Link for BufferedLinkSender { - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlockImported(hash.clone(), number)); - } - - fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(processed_blocks, has_error)); - } - - fn justification_imported( - &mut self, - who: Origin, - hash: &B::Hash, - number: NumberFor, - success: bool - ) { - let msg = BlockImportWorkerMsg::JustificationImported(who, hash.clone(), number, success); - let _ = self.tx.unbounded_send(msg); - } - - fn clear_justification_requests(&mut self) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ClearJustificationRequests); - } - - fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestJustification(hash.clone(), number)); - } - - fn finality_proof_imported( - &mut self, - who: Origin, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - let msg = BlockImportWorkerMsg::FinalityProofImported(who, request_block, finalization_result); - let _ = self.tx.unbounded_send(msg); - } - - fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number)); - } - - fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::SetFinalityProofRequestBuilder(request_builder)); - } - - fn report_peer(&mut self, who: Origin, reputation_change: i32) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ReportPeer(who, reputation_change)); - } - - fn restart(&mut self) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Restart); - } - - #[cfg(any(test, feature = "test-helpers"))] - fn synchronized(&mut self) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Synchronized); - } -} - -/// See [`buffered_link`]. -pub struct BufferedLinkReceiver { - rx: mpsc::UnboundedReceiver>, -} - -impl BufferedLinkReceiver { - /// Polls for the buffered link actions. Any enqueued action will be propagated to the link - /// passed as parameter. - /// - /// This method should behave in a way similar to `Future::poll`. It can register the current - /// task and notify later when more actions are ready to be polled. To continue the comparison, - /// it is as if this method always returned `Ok(Async::NotReady)`. - pub fn poll_actions(&mut self, link: &mut dyn Link) { - loop { - let msg = if let Ok(Async::Ready(Some(msg))) = self.rx.poll() { - msg - } else { - break - }; - - match msg { - BlockImportWorkerMsg::BlockImported(hash, number) => - link.block_imported(&hash, number), - BlockImportWorkerMsg::BlocksProcessed(blocks, has_error) => - link.blocks_processed(blocks, has_error), - BlockImportWorkerMsg::JustificationImported(who, hash, number, success) => - link.justification_imported(who, &hash, number, success), - BlockImportWorkerMsg::ClearJustificationRequests => - link.clear_justification_requests(), - BlockImportWorkerMsg::RequestJustification(hash, number) => - link.request_justification(&hash, number), - BlockImportWorkerMsg::FinalityProofImported(who, block, result) => - link.finality_proof_imported(who, block, result), - BlockImportWorkerMsg::RequestFinalityProof(hash, number) => - link.request_finality_proof(&hash, number), - BlockImportWorkerMsg::SetFinalityProofRequestBuilder(builder) => - link.set_finality_proof_request_builder(builder), - BlockImportWorkerMsg::ReportPeer(who, reput) => - link.report_peer(who, reput), - BlockImportWorkerMsg::Restart => - link.restart(), - #[cfg(any(test, feature = "test-helpers"))] - BlockImportWorkerMsg::Synchronized => - link.synchronized(), - } - } - } -} - /// Block import successful result. #[derive(Debug, PartialEq)] pub enum BlockImportResult { @@ -638,59 +174,6 @@ pub enum BlockImportError { Error, } -/// Import several blocks at once, returning import result for each block. -fn import_many_blocks>( - import_handle: &dyn BlockImport, - blocks_origin: BlockOrigin, - blocks: Vec>, - verifier: Arc, -) -> (usize, usize, Vec<( - Result>, BlockImportError>, - B::Hash, -)>) { - let count = blocks.len(); - let mut imported = 0; - - let blocks_range = match ( - blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), - blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - - trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range); - - let mut results = vec![]; - - let mut has_error = false; - - // Blocks in the response/drain should be in ascending order. - for block in blocks { - let block_hash = block.hash; - let import_result = if has_error { - Err(BlockImportError::Error) - } else { - import_single_block( - import_handle, - blocks_origin.clone(), - block, - verifier.clone(), - ) - }; - let was_ok = import_result.is_ok(); - results.push((import_result, block_hash)); - if was_ok { - imported += 1; - } else { - has_error = true; - } - } - - (imported, count, results) -} - /// Single block import function. pub fn import_single_block>( import_handle: &dyn BlockImport, diff --git a/substrate/core/consensus/common/src/import_queue/basic_queue.rs b/substrate/core/consensus/common/src/import_queue/basic_queue.rs new file mode 100644 index 0000000000..74353dbbec --- /dev/null +++ b/substrate/core/consensus/common/src/import_queue/basic_queue.rs @@ -0,0 +1,408 @@ +// Copyright 2017-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::sync::Arc; +use futures::{prelude::*, future::Executor, sync::mpsc}; +use runtime_primitives::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; + +use crate::error::Error as ConsensusError; +use crate::block_import::{BlockImport, BlockOrigin}; +use crate::import_queue::{ + BlockImportResult, BlockImportError, Verifier, SharedBlockImport, SharedFinalityProofImport, + SharedFinalityProofRequestBuilder, SharedJustificationImport, ImportQueue, Link, Origin, + IncomingBlock, import_single_block, + buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver} +}; + +/// Reputation change for peers which send us a block with an incomplete header. +const INCOMPLETE_HEADER_REPUTATION_CHANGE: i32 = -(1 << 20); +/// Reputation change for peers which send us a block which we fail to verify. +const VERIFICATION_FAIL_REPUTATION_CHANGE: i32 = -(1 << 20); +/// Reputation change for peers which send us a bad block. +const BAD_BLOCK_REPUTATION_CHANGE: i32 = -(1 << 29); +/// Reputation change for peers which send us a block with bad justifications. +const BAD_JUSTIFICATION_REPUTATION_CHANGE: i32 = -(1 << 16); + +/// Interface to a basic block import queue that is importing blocks sequentially in a separate +/// task, with pluggable verification. +pub struct BasicQueue { + /// Channel to send messages to the background task. + sender: mpsc::UnboundedSender>, + /// Results coming from the worker task. + result_port: BufferedLinkReceiver, + /// Sent through the link as soon as possible. + finality_proof_request_builder: Option>, + /// Since we have to be in a tokio context in order to spawn background tasks, we first store + /// the task to spawn here, then extract it as soon as we are in a tokio context. + /// If `Some`, contains the task to spawn in the background. If `None`, the future has already + /// been spawned. + future_to_spawn: Option + Send>>, + /// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in + /// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from + /// `poll_actions`. + manual_poll: Option + Send>>, +} + +impl BasicQueue { + /// Instantiate a new basic queue, with given verifier. + /// + /// This creates a background task, and calls `on_start` on the justification importer and + /// finality proof importer. + pub fn new>( + verifier: Arc, + block_import: SharedBlockImport, + justification_import: Option>, + finality_proof_import: Option>, + finality_proof_request_builder: Option>, + ) -> Self { + let (result_sender, result_port) = buffered_link::buffered_link(); + let (future, worker_sender) = BlockImportWorker::new( + result_sender, + verifier, + block_import, + justification_import, + finality_proof_import, + ); + + Self { + sender: worker_sender, + result_port, + finality_proof_request_builder, + future_to_spawn: Some(Box::new(future)), + manual_poll: None, + } + } + + /// Send synchronization request to the block import channel. + /// + /// The caller should wait for Link::synchronized() call to ensure that it + /// has synchronized with ImportQueue. + #[cfg(any(test, feature = "test-helpers"))] + pub fn synchronize(&self) { + let _ = self.sender.unbounded_send(ToWorkerMsg::Synchronize); + } +} + +impl ImportQueue for BasicQueue { + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { + if blocks.is_empty() { + return; + } + + trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); + let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); + } + + fn import_justification( + &mut self, + who: Origin, + hash: B::Hash, + number: NumberFor, + justification: Justification + ) { + let _ = self.sender.unbounded_send(ToWorkerMsg::ImportJustification(who.clone(), hash, number, justification)); + } + + fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { + trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); + let _ = self.sender.unbounded_send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); + } + + fn poll_actions(&mut self, link: &mut dyn Link) { + // Try to spawn the future in `future_to_spawn`. + if let Some(future) = self.future_to_spawn.take() { + if let Err(err) = tokio_executor::DefaultExecutor::current().execute(future) { + debug_assert!(self.manual_poll.is_none()); + self.manual_poll = Some(err.into_future()); + } + } + + // As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll + // manually here. + if let Some(manual_poll) = self.manual_poll.as_mut() { + match manual_poll.poll() { + Ok(Async::NotReady) => {} + _ => self.manual_poll = None, + } + } + + if let Some(fprb) = self.finality_proof_request_builder.take() { + link.set_finality_proof_request_builder(fprb); + } + + self.result_port.poll_actions(link); + } +} + +/// Message destinated to the background worker. +#[derive(Debug)] +enum ToWorkerMsg { + ImportBlocks(BlockOrigin, Vec>), + ImportJustification(Origin, B::Hash, NumberFor, Justification), + ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), + #[cfg(any(test, feature = "test-helpers"))] + Synchronize, +} + +struct BlockImportWorker> { + result_sender: BufferedLinkSender, + block_import: SharedBlockImport, + justification_import: Option>, + finality_proof_import: Option>, + verifier: Arc, +} + +impl> BlockImportWorker { + fn new( + result_sender: BufferedLinkSender, + verifier: Arc, + block_import: SharedBlockImport, + justification_import: Option>, + finality_proof_import: Option>, + ) -> (impl Future + Send, mpsc::UnboundedSender>) { + let (sender, mut port) = mpsc::unbounded(); + + let mut worker = BlockImportWorker { + result_sender, + verifier, + justification_import, + block_import, + finality_proof_import, + }; + + if let Some(justification_import) = worker.justification_import.as_ref() { + for (hash, number) in justification_import.on_start() { + worker.result_sender.request_justification(&hash, number); + } + } + + if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() { + for (hash, number) in finality_proof_import.on_start() { + worker.result_sender.request_finality_proof(&hash, number); + } + } + + let future = futures::future::poll_fn(move || { + loop { + let msg = match port.poll() { + Ok(Async::Ready(Some(msg))) => msg, + Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => return Ok(Async::NotReady), + }; + + match msg { + ToWorkerMsg::ImportBlocks(origin, blocks) => { + worker.import_a_batch_of_blocks(origin, blocks); + }, + ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { + worker.import_finality_proof(who, hash, number, proof); + }, + ToWorkerMsg::ImportJustification(who, hash, number, justification) => { + worker.import_justification(who, hash, number, justification); + } + #[cfg(any(test, feature = "test-helpers"))] + ToWorkerMsg::Synchronize => { + trace!(target: "sync", "Sending sync message"); + worker.result_sender.synchronized(); + }, + } + } + }); + + (future, sender) + } + + fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { + let (imported, count, results) = import_many_blocks( + &*self.block_import, + origin, + blocks, + self.verifier.clone(), + ); + + trace!(target: "sync", "Imported {} of {}", imported, count); + + let mut has_error = false; + let mut hashes = vec![]; + for (result, hash) in results { + hashes.push(hash); + + if has_error { + continue; + } + + if result.is_err() { + has_error = true; + } + + match result { + Ok(BlockImportResult::ImportedKnown(number)) => self.result_sender.block_imported(&hash, number), + Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { + self.result_sender.block_imported(&hash, number); + + if aux.clear_justification_requests { + trace!( + target: "sync", + "Block imported clears all pending justification requests {}: {:?}", + number, + hash + ); + self.result_sender.clear_justification_requests(); + } + + if aux.needs_justification { + trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); + self.result_sender.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(peer) = who { + info!("Sent block with bad justification to import"); + self.result_sender.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); + } + } + + if aux.needs_finality_proof { + trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); + self.result_sender.request_finality_proof(&hash, number); + } + }, + Err(BlockImportError::IncompleteHeader(who)) => { + if let Some(peer) = who { + info!("Peer sent block with incomplete header to import"); + self.result_sender.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); + self.result_sender.restart(); + } + }, + Err(BlockImportError::VerificationFailed(who, e)) => { + if let Some(peer) = who { + info!("Verification failed from peer: {}", e); + self.result_sender.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); + self.result_sender.restart(); + } + }, + Err(BlockImportError::BadBlock(who)) => { + if let Some(peer) = who { + info!("Bad block"); + self.result_sender.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); + self.result_sender.restart(); + } + }, + Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { + self.result_sender.restart(); + }, + }; + } + + self.result_sender.blocks_processed(hashes, has_error); + } + + fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { + let result = self.finality_proof_import.as_ref().map(|finality_proof_import| { + finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier) + .map_err(|e| { + debug!( + "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", + e, + hash, + number, + who, + ); + }) + }).unwrap_or(Err(())); + + trace!(target: "sync", "Imported finality proof for {}/{}", number, hash); + self.result_sender.finality_proof_imported(who, (hash, number), result); + } + + fn import_justification( + &mut self, + who: Origin, + hash: B::Hash, + number: NumberFor, + justification: Justification + ) { + let success = self.justification_import.as_ref().map(|justification_import| { + justification_import.import_justification(hash, number, justification) + .map_err(|e| { + debug!( + target: "sync", + "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", + e, + hash, + number, + who, + ); + e + }).is_ok() + }).unwrap_or(false); + + self.result_sender.justification_imported(who, &hash, number, success); + } +} + +/// Import several blocks at once, returning import result for each block. +fn import_many_blocks>( + import_handle: &dyn BlockImport, + blocks_origin: BlockOrigin, + blocks: Vec>, + verifier: Arc, +) -> (usize, usize, Vec<( + Result>, BlockImportError>, + B::Hash, +)>) { + let count = blocks.len(); + let mut imported = 0; + + let blocks_range = match ( + blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + + trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range); + + let mut results = vec![]; + + let mut has_error = false; + + // Blocks in the response/drain should be in ascending order. + for block in blocks { + let block_hash = block.hash; + let import_result = if has_error { + Err(BlockImportError::Error) + } else { + import_single_block( + import_handle, + blocks_origin.clone(), + block, + verifier.clone(), + ) + }; + let was_ok = import_result.is_ok(); + results.push((import_result, block_hash)); + if was_ok { + imported += 1; + } else { + has_error = true; + } + } + + (imported, count, results) +} diff --git a/substrate/core/consensus/common/src/import_queue/buffered_link.rs b/substrate/core/consensus/common/src/import_queue/buffered_link.rs new file mode 100644 index 0000000000..f73b439a66 --- /dev/null +++ b/substrate/core/consensus/common/src/import_queue/buffered_link.rs @@ -0,0 +1,177 @@ +// Copyright 2017-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Provides the `buffered_link` utility. +//! +//! The buffered link is a channel that allows buffering the method calls on `Link`. +//! +//! # Example +//! +//! ```no_run +//! use substrate_consensus_common::import_queue::Link; +//! # use substrate_consensus_common::import_queue::buffered_link::buffered_link; +//! # use test_client::runtime::Block; +//! # struct DummyLink; impl Link for DummyLink {} +//! # let mut my_link = DummyLink; +//! let (mut tx, mut rx) = buffered_link::(); +//! tx.blocks_processed(vec![], false); +//! rx.poll_actions(&mut my_link); // Calls `my_link.blocks_processed(vec![], false)` +//! ``` +//! + +use futures::{prelude::*, sync::mpsc}; +use runtime_primitives::traits::{Block as BlockT, NumberFor}; +use crate::import_queue::{Origin, Link, SharedFinalityProofRequestBuilder}; + +/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and +/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer +/// them to another link. +pub fn buffered_link() -> (BufferedLinkSender, BufferedLinkReceiver) { + let (tx, rx) = mpsc::unbounded(); + let tx = BufferedLinkSender { tx }; + let rx = BufferedLinkReceiver { rx }; + (tx, rx) +} + +/// See [`buffered_link`]. +pub struct BufferedLinkSender { + tx: mpsc::UnboundedSender>, +} + +/// Internal buffered message. +enum BlockImportWorkerMsg { + BlockImported(B::Hash, NumberFor), + BlocksProcessed(Vec, bool), + JustificationImported(Origin, B::Hash, NumberFor, bool), + ClearJustificationRequests, + RequestJustification(B::Hash, NumberFor), + FinalityProofImported(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), + RequestFinalityProof(B::Hash, NumberFor), + SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), + ReportPeer(Origin, i32), + Restart, + #[cfg(any(test, feature = "test-helpers"))] + Synchronized, +} + +impl Link for BufferedLinkSender { + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlockImported(hash.clone(), number)); + } + + fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(processed_blocks, has_error)); + } + + fn justification_imported( + &mut self, + who: Origin, + hash: &B::Hash, + number: NumberFor, + success: bool + ) { + let msg = BlockImportWorkerMsg::JustificationImported(who, hash.clone(), number, success); + let _ = self.tx.unbounded_send(msg); + } + + fn clear_justification_requests(&mut self) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ClearJustificationRequests); + } + + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestJustification(hash.clone(), number)); + } + + fn finality_proof_imported( + &mut self, + who: Origin, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + let msg = BlockImportWorkerMsg::FinalityProofImported(who, request_block, finalization_result); + let _ = self.tx.unbounded_send(msg); + } + + fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number)); + } + + fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::SetFinalityProofRequestBuilder(request_builder)); + } + + fn report_peer(&mut self, who: Origin, reputation_change: i32) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ReportPeer(who, reputation_change)); + } + + fn restart(&mut self) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Restart); + } + + #[cfg(any(test, feature = "test-helpers"))] + fn synchronized(&mut self) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Synchronized); + } +} + +/// See [`buffered_link`]. +pub struct BufferedLinkReceiver { + rx: mpsc::UnboundedReceiver>, +} + +impl BufferedLinkReceiver { + /// Polls for the buffered link actions. Any enqueued action will be propagated to the link + /// passed as parameter. + /// + /// This method should behave in a way similar to `Future::poll`. It can register the current + /// task and notify later when more actions are ready to be polled. To continue the comparison, + /// it is as if this method always returned `Ok(Async::NotReady)`. + pub fn poll_actions(&mut self, link: &mut dyn Link) { + loop { + let msg = if let Ok(Async::Ready(Some(msg))) = self.rx.poll() { + msg + } else { + break + }; + + match msg { + BlockImportWorkerMsg::BlockImported(hash, number) => + link.block_imported(&hash, number), + BlockImportWorkerMsg::BlocksProcessed(blocks, has_error) => + link.blocks_processed(blocks, has_error), + BlockImportWorkerMsg::JustificationImported(who, hash, number, success) => + link.justification_imported(who, &hash, number, success), + BlockImportWorkerMsg::ClearJustificationRequests => + link.clear_justification_requests(), + BlockImportWorkerMsg::RequestJustification(hash, number) => + link.request_justification(&hash, number), + BlockImportWorkerMsg::FinalityProofImported(who, block, result) => + link.finality_proof_imported(who, block, result), + BlockImportWorkerMsg::RequestFinalityProof(hash, number) => + link.request_finality_proof(&hash, number), + BlockImportWorkerMsg::SetFinalityProofRequestBuilder(builder) => + link.set_finality_proof_request_builder(builder), + BlockImportWorkerMsg::ReportPeer(who, reput) => + link.report_peer(who, reput), + BlockImportWorkerMsg::Restart => + link.restart(), + #[cfg(any(test, feature = "test-helpers"))] + BlockImportWorkerMsg::Synchronized => + link.synchronized(), + } + } + } +}