Improvements to the import queue (#3101)

* Remove block_imported

* Move blocks results processing to sync

* Remove methods from Link

* Better errors

* Allow cancelling the import queue

* Restore the import trace

* Fix network tests

* Line widths

* Use has_error instead

* Minor style
This commit is contained in:
Pierre Krieger
2019-07-14 14:22:32 +02:00
committed by Gavin Wood
parent 7ae6556a02
commit 5bd806bd9b
8 changed files with 212 additions and 194 deletions
@@ -113,14 +113,15 @@ pub trait ImportQueue<B: BlockT>: Send {
/// Hooks that the verification queue can use to influence the synchronization
/// algorithm.
pub trait Link<B: BlockT>: Send {
/// Block imported.
fn block_imported(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
/// Batch of blocks imported, with or without error.
fn blocks_processed(&mut self, _processed_blocks: Vec<B::Hash>, _has_error: bool) {}
fn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
_results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {}
/// Justification import result.
fn justification_imported(&mut self, _who: Origin, _hash: &B::Hash, _number: NumberFor<B>, _success: bool) {}
/// Clear all pending justification requests.
fn clear_justification_requests(&mut self) {}
/// Request a justification for the given block.
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
/// Finality proof import result.
@@ -136,10 +137,6 @@ pub trait Link<B: BlockT>: Send {
) {}
/// Request a finality proof for the given block.
fn request_finality_proof(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
/// Adjusts the reputation of the given peer.
fn report_peer(&mut self, _who: Origin, _reputation_change: i32) {}
/// Restart sync.
fn restart(&mut self) {}
}
/// Block import successful result.
@@ -152,7 +149,7 @@ pub enum BlockImportResult<N: ::std::fmt::Debug + PartialEq> {
}
/// Block import error.
#[derive(Debug, PartialEq)]
#[derive(Debug)]
pub enum BlockImportError {
/// Block missed header, can't be imported
IncompleteHeader(Option<Origin>),
@@ -162,8 +159,10 @@ pub enum BlockImportError {
BadBlock(Option<Origin>),
/// Block has an unknown parent
UnknownParent,
/// Other Error.
Error,
/// Block import has been cancelled. This can happen if the parent block fails to be imported.
Cancelled,
/// Other error.
Other(ConsensusError),
}
/// Single block import function.
@@ -210,7 +209,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
},
Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
Err(BlockImportError::Error)
Err(BlockImportError::Other(e))
}
}
};
@@ -27,15 +27,6 @@ use crate::import_queue::{
buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver}
};
/// Reputation change for peers which send us a block with an incomplete header.
const INCOMPLETE_HEADER_REPUTATION_CHANGE: i32 = -(1 << 20);
/// Reputation change for peers which send us a block which we fail to verify.
const VERIFICATION_FAIL_REPUTATION_CHANGE: i32 = -(1 << 20);
/// Reputation change for peers which send us a bad block.
const BAD_BLOCK_REPUTATION_CHANGE: i32 = -(1 << 29);
/// Reputation change for peers which send us a block with bad justifications.
const BAD_JUSTIFICATION_REPUTATION_CHANGE: i32 = -(1 << 16);
/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// task, with pluggable verification.
pub struct BasicQueue<B: BlockT> {
@@ -202,88 +193,16 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
}
fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
let result_sender = &self.result_sender;
let (imported, count, results) = import_many_blocks(
&mut *self.block_import,
origin,
blocks,
self.verifier.clone(),
|| !result_sender.is_closed(),
);
trace!(target: "sync", "Imported {} of {}", imported, count);
let mut has_error = false;
let mut hashes = vec![];
for (result, hash) in results {
hashes.push(hash);
if has_error {
continue;
}
if result.is_err() {
has_error = true;
}
match result {
Ok(BlockImportResult::ImportedKnown(number)) => self.result_sender.block_imported(&hash, number),
Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => {
self.result_sender.block_imported(&hash, number);
if aux.clear_justification_requests {
trace!(
target: "sync",
"Block imported clears all pending justification requests {}: {:?}",
number,
hash
);
self.result_sender.clear_justification_requests();
}
if aux.needs_justification {
trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash);
self.result_sender.request_justification(&hash, number);
}
if aux.bad_justification {
if let Some(peer) = who {
info!("Sent block with bad justification to import");
self.result_sender.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE);
}
}
if aux.needs_finality_proof {
trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash);
self.result_sender.request_finality_proof(&hash, number);
}
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
info!("Peer sent block with incomplete header to import");
self.result_sender.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE);
self.result_sender.restart();
}
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
info!("Verification failed from peer: {}", e);
self.result_sender.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE);
self.result_sender.restart();
}
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!("Bad block");
self.result_sender.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE);
self.result_sender.restart();
}
},
Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => {
self.result_sender.restart();
},
};
}
self.result_sender.blocks_processed(hashes, has_error);
self.result_sender.blocks_processed(imported, count, results);
}
fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
@@ -332,11 +251,15 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
}
/// Import several blocks at once, returning import result for each block.
///
/// The `keep_going` closure will be called regularly. If it returns false, then the function will
/// end prematurely.
fn import_many_blocks<B: BlockT, V: Verifier<B>>(
import_handle: &mut dyn BlockImport<B, Error = ConsensusError>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: Arc<V>,
keep_going: impl Fn() -> bool,
) -> (usize, usize, Vec<(
Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
B::Hash,
@@ -361,9 +284,15 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>>(
// Blocks in the response/drain should be in ascending order.
for block in blocks {
if !keep_going() {
// Setting `has_error` to true cancels the rest of the import.
has_error = true;
}
let block_number = block.header.as_ref().map(|h| h.number().clone());
let block_hash = block.hash;
let import_result = if has_error {
Err(BlockImportError::Error)
Err(BlockImportError::Cancelled)
} else {
import_single_block(
import_handle,
@@ -373,12 +302,13 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>>(
)
};
let was_ok = import_result.is_ok();
results.push((import_result, block_hash));
if was_ok {
trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash);
imported += 1;
} else {
has_error = true;
}
results.push((import_result, block_hash));
}
(imported, count, results)
@@ -27,14 +27,14 @@
//! # struct DummyLink; impl Link<Block> for DummyLink {}
//! # let mut my_link = DummyLink;
//! let (mut tx, mut rx) = buffered_link::<Block>();
//! tx.blocks_processed(vec![], false);
//! rx.poll_actions(&mut my_link); // Calls `my_link.blocks_processed(vec![], false)`
//! tx.blocks_processed(0, 0, vec![]);
//! rx.poll_actions(&mut my_link); // Calls `my_link.blocks_processed(0, 0, vec![])`
//! ```
//!
use futures::{prelude::*, sync::mpsc};
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use crate::import_queue::{Origin, Link};
use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError};
/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
@@ -51,26 +51,32 @@ pub struct BufferedLinkSender<B: BlockT> {
tx: mpsc::UnboundedSender<BlockImportWorkerMsg<B>>,
}
impl<B: BlockT> BufferedLinkSender<B> {
/// Returns true if the sender points to nowhere.
///
/// Once `true` is returned, it is pointless to use the sender anymore.
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
}
/// Internal buffered message.
enum BlockImportWorkerMsg<B: BlockT> {
BlockImported(B::Hash, NumberFor<B>),
BlocksProcessed(Vec<B::Hash>, bool),
BlocksProcessed(usize, usize, Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>),
JustificationImported(Origin, B::Hash, NumberFor<B>, bool),
ClearJustificationRequests,
RequestJustification(B::Hash, NumberFor<B>),
FinalityProofImported(Origin, (B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
RequestFinalityProof(B::Hash, NumberFor<B>),
ReportPeer(Origin, i32),
Restart,
}
impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlockImported(hash.clone(), number));
}
fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(processed_blocks, has_error));
fn blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
}
fn justification_imported(
@@ -84,10 +90,6 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
let _ = self.tx.unbounded_send(msg);
}
fn clear_justification_requests(&mut self) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ClearJustificationRequests);
}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestJustification(hash.clone(), number));
}
@@ -105,14 +107,6 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number));
}
fn report_peer(&mut self, who: Origin, reputation_change: i32) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ReportPeer(who, reputation_change));
}
fn restart(&mut self) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Restart);
}
}
/// See [`buffered_link`].
@@ -136,25 +130,30 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
};
match msg {
BlockImportWorkerMsg::BlockImported(hash, number) =>
link.block_imported(&hash, number),
BlockImportWorkerMsg::BlocksProcessed(blocks, has_error) =>
link.blocks_processed(blocks, has_error),
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::ClearJustificationRequests =>
link.clear_justification_requests(),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
BlockImportWorkerMsg::FinalityProofImported(who, block, result) =>
link.finality_proof_imported(who, block, result),
BlockImportWorkerMsg::RequestFinalityProof(hash, number) =>
link.request_finality_proof(&hash, number),
BlockImportWorkerMsg::ReportPeer(who, reput) =>
link.report_peer(who, reput),
BlockImportWorkerMsg::Restart =>
link.restart(),
}
}
}
}
#[cfg(test)]
mod tests {
use test_client::runtime::Block;
#[test]
fn is_closed() {
let (tx, rx) = super::buffered_link::<Block>();
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());
}
}