diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 16a4a624d1..f8041ae99b 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -113,14 +113,15 @@ pub trait ImportQueue: Send { /// Hooks that the verification queue can use to influence the synchronization /// algorithm. pub trait Link: Send { - /// Block imported. - fn block_imported(&mut self, _hash: &B::Hash, _number: NumberFor) {} /// Batch of blocks imported, with or without error. - fn blocks_processed(&mut self, _processed_blocks: Vec, _has_error: bool) {} + fn blocks_processed( + &mut self, + _imported: usize, + _count: usize, + _results: Vec<(Result>, BlockImportError>, B::Hash)> + ) {} /// Justification import result. fn justification_imported(&mut self, _who: Origin, _hash: &B::Hash, _number: NumberFor, _success: bool) {} - /// Clear all pending justification requests. - fn clear_justification_requests(&mut self) {} /// Request a justification for the given block. fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor) {} /// Finality proof import result. @@ -136,10 +137,6 @@ pub trait Link: Send { ) {} /// Request a finality proof for the given block. fn request_finality_proof(&mut self, _hash: &B::Hash, _number: NumberFor) {} - /// Adjusts the reputation of the given peer. - fn report_peer(&mut self, _who: Origin, _reputation_change: i32) {} - /// Restart sync. - fn restart(&mut self) {} } /// Block import successful result. @@ -152,7 +149,7 @@ pub enum BlockImportResult { } /// Block import error. -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub enum BlockImportError { /// Block missed header, can't be imported IncompleteHeader(Option), @@ -162,8 +159,10 @@ pub enum BlockImportError { BadBlock(Option), /// Block has an unknown parent UnknownParent, - /// Other Error. - Error, + /// Block import has been cancelled. This can happen if the parent block fails to be imported. + Cancelled, + /// Other error. + Other(ConsensusError), } /// Single block import function. @@ -210,7 +209,7 @@ pub fn import_single_block>( }, Err(e) => { debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); - Err(BlockImportError::Error) + Err(BlockImportError::Other(e)) } } }; diff --git a/substrate/core/consensus/common/src/import_queue/basic_queue.rs b/substrate/core/consensus/common/src/import_queue/basic_queue.rs index d8dcc1fa13..51d30cddbb 100644 --- a/substrate/core/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/core/consensus/common/src/import_queue/basic_queue.rs @@ -27,15 +27,6 @@ use crate::import_queue::{ buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver} }; -/// Reputation change for peers which send us a block with an incomplete header. -const INCOMPLETE_HEADER_REPUTATION_CHANGE: i32 = -(1 << 20); -/// Reputation change for peers which send us a block which we fail to verify. -const VERIFICATION_FAIL_REPUTATION_CHANGE: i32 = -(1 << 20); -/// Reputation change for peers which send us a bad block. -const BAD_BLOCK_REPUTATION_CHANGE: i32 = -(1 << 29); -/// Reputation change for peers which send us a block with bad justifications. -const BAD_JUSTIFICATION_REPUTATION_CHANGE: i32 = -(1 << 16); - /// Interface to a basic block import queue that is importing blocks sequentially in a separate /// task, with pluggable verification. pub struct BasicQueue { @@ -202,88 +193,16 @@ impl> BlockImportWorker { } fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { + let result_sender = &self.result_sender; let (imported, count, results) = import_many_blocks( &mut *self.block_import, origin, blocks, self.verifier.clone(), + || !result_sender.is_closed(), ); - trace!(target: "sync", "Imported {} of {}", imported, count); - - let mut has_error = false; - let mut hashes = vec![]; - for (result, hash) in results { - hashes.push(hash); - - if has_error { - continue; - } - - if result.is_err() { - has_error = true; - } - - match result { - Ok(BlockImportResult::ImportedKnown(number)) => self.result_sender.block_imported(&hash, number), - Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { - self.result_sender.block_imported(&hash, number); - - if aux.clear_justification_requests { - trace!( - target: "sync", - "Block imported clears all pending justification requests {}: {:?}", - number, - hash - ); - self.result_sender.clear_justification_requests(); - } - - if aux.needs_justification { - trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); - self.result_sender.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(peer) = who { - info!("Sent block with bad justification to import"); - self.result_sender.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); - } - } - - if aux.needs_finality_proof { - trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); - self.result_sender.request_finality_proof(&hash, number); - } - }, - Err(BlockImportError::IncompleteHeader(who)) => { - if let Some(peer) = who { - info!("Peer sent block with incomplete header to import"); - self.result_sender.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); - self.result_sender.restart(); - } - }, - Err(BlockImportError::VerificationFailed(who, e)) => { - if let Some(peer) = who { - info!("Verification failed from peer: {}", e); - self.result_sender.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); - self.result_sender.restart(); - } - }, - Err(BlockImportError::BadBlock(who)) => { - if let Some(peer) = who { - info!("Bad block"); - self.result_sender.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); - self.result_sender.restart(); - } - }, - Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { - self.result_sender.restart(); - }, - }; - } - - self.result_sender.blocks_processed(hashes, has_error); + self.result_sender.blocks_processed(imported, count, results); } fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { @@ -332,11 +251,15 @@ impl> BlockImportWorker { } /// Import several blocks at once, returning import result for each block. +/// +/// The `keep_going` closure will be called regularly. If it returns false, then the function will +/// end prematurely. fn import_many_blocks>( import_handle: &mut dyn BlockImport, blocks_origin: BlockOrigin, blocks: Vec>, verifier: Arc, + keep_going: impl Fn() -> bool, ) -> (usize, usize, Vec<( Result>, BlockImportError>, B::Hash, @@ -361,9 +284,15 @@ fn import_many_blocks>( // Blocks in the response/drain should be in ascending order. for block in blocks { + if !keep_going() { + // Setting `has_error` to true cancels the rest of the import. + has_error = true; + } + + let block_number = block.header.as_ref().map(|h| h.number().clone()); let block_hash = block.hash; let import_result = if has_error { - Err(BlockImportError::Error) + Err(BlockImportError::Cancelled) } else { import_single_block( import_handle, @@ -373,12 +302,13 @@ fn import_many_blocks>( ) }; let was_ok = import_result.is_ok(); - results.push((import_result, block_hash)); if was_ok { + trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash); imported += 1; } else { has_error = true; } + results.push((import_result, block_hash)); } (imported, count, results) diff --git a/substrate/core/consensus/common/src/import_queue/buffered_link.rs b/substrate/core/consensus/common/src/import_queue/buffered_link.rs index d757b19c57..9c555ba9d9 100644 --- a/substrate/core/consensus/common/src/import_queue/buffered_link.rs +++ b/substrate/core/consensus/common/src/import_queue/buffered_link.rs @@ -27,14 +27,14 @@ //! # struct DummyLink; impl Link for DummyLink {} //! # let mut my_link = DummyLink; //! let (mut tx, mut rx) = buffered_link::(); -//! tx.blocks_processed(vec![], false); -//! rx.poll_actions(&mut my_link); // Calls `my_link.blocks_processed(vec![], false)` +//! tx.blocks_processed(0, 0, vec![]); +//! rx.poll_actions(&mut my_link); // Calls `my_link.blocks_processed(0, 0, vec![])` //! ``` //! use futures::{prelude::*, sync::mpsc}; use runtime_primitives::traits::{Block as BlockT, NumberFor}; -use crate::import_queue::{Origin, Link}; +use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError}; /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer @@ -51,26 +51,32 @@ pub struct BufferedLinkSender { tx: mpsc::UnboundedSender>, } +impl BufferedLinkSender { + /// Returns true if the sender points to nowhere. + /// + /// Once `true` is returned, it is pointless to use the sender anymore. + pub fn is_closed(&self) -> bool { + self.tx.is_closed() + } +} + /// Internal buffered message. enum BlockImportWorkerMsg { - BlockImported(B::Hash, NumberFor), - BlocksProcessed(Vec, bool), + BlocksProcessed(usize, usize, Vec<(Result>, BlockImportError>, B::Hash)>), JustificationImported(Origin, B::Hash, NumberFor, bool), - ClearJustificationRequests, RequestJustification(B::Hash, NumberFor), FinalityProofImported(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), RequestFinalityProof(B::Hash, NumberFor), - ReportPeer(Origin, i32), - Restart, } impl Link for BufferedLinkSender { - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlockImported(hash.clone(), number)); - } - - fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(processed_blocks, has_error)); + fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)> + ) { + let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results)); } fn justification_imported( @@ -84,10 +90,6 @@ impl Link for BufferedLinkSender { let _ = self.tx.unbounded_send(msg); } - fn clear_justification_requests(&mut self) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ClearJustificationRequests); - } - fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestJustification(hash.clone(), number)); } @@ -105,14 +107,6 @@ impl Link for BufferedLinkSender { fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number)); } - - fn report_peer(&mut self, who: Origin, reputation_change: i32) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::ReportPeer(who, reputation_change)); - } - - fn restart(&mut self) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Restart); - } } /// See [`buffered_link`]. @@ -136,25 +130,30 @@ impl BufferedLinkReceiver { }; match msg { - BlockImportWorkerMsg::BlockImported(hash, number) => - link.block_imported(&hash, number), - BlockImportWorkerMsg::BlocksProcessed(blocks, has_error) => - link.blocks_processed(blocks, has_error), + BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => + link.blocks_processed(imported, count, results), BlockImportWorkerMsg::JustificationImported(who, hash, number, success) => link.justification_imported(who, &hash, number, success), - BlockImportWorkerMsg::ClearJustificationRequests => - link.clear_justification_requests(), BlockImportWorkerMsg::RequestJustification(hash, number) => link.request_justification(&hash, number), BlockImportWorkerMsg::FinalityProofImported(who, block, result) => link.finality_proof_imported(who, block, result), BlockImportWorkerMsg::RequestFinalityProof(hash, number) => link.request_finality_proof(&hash, number), - BlockImportWorkerMsg::ReportPeer(who, reput) => - link.report_peer(who, reput), - BlockImportWorkerMsg::Restart => - link.restart(), } } } } + +#[cfg(test)] +mod tests { + use test_client::runtime::Block; + + #[test] + fn is_closed() { + let (tx, rx) = super::buffered_link::(); + assert!(!tx.is_closed()); + drop(rx); + assert!(tx.is_closed()); + } +} diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 9e876e7285..50b6ad2741 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -28,6 +28,7 @@ use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub, SaturatedConversion }; +use consensus::import_queue::{BlockImportResult, BlockImportError}; use message::{BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; use event::Event; @@ -1194,22 +1195,23 @@ impl, H: ExHashT> Protocol { self.sync.request_justification(&hash, number) } - /// Clears all pending justification requests. - pub fn clear_justification_requests(&mut self) { - self.sync.clear_justification_requests() - } - /// A batch of blocks have been processed, with or without errors. - /// Call this when a batch of blocks have been processed by the import queue, with or without + /// Call this when a batch of blocks have been processed by the importqueue, with or without /// errors. - pub fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - self.sync.on_blocks_processed(processed_blocks, has_error); - } - - /// Restart the sync process. - pub fn restart(&mut self) { + pub fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)> + ) { let peers = self.context_data.peers.clone(); - for result in self.sync.restart(|peer_id| peers.get(peer_id).map(|i| i.info.clone())) { + let results = self.sync.on_blocks_processed( + imported, + count, + results, + |peer_id| peers.get(peer_id).map(|i| i.info.clone()) + ); + for result in results { match result { Ok((id, req)) => { let msg = GenericMessage::BlockRequest(req); @@ -1223,11 +1225,6 @@ impl, H: ExHashT> Protocol { } } - /// Notify about successful import of the given block. - pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - trace!(target: "sync", "Block imported successfully {} ({})", number, hash) - } - /// Call this when a justification has been processed by the import queue, with or without /// errors. pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 75658a2f08..5405073a0e 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -29,7 +29,7 @@ use blocks::BlockCollection; use client::{BlockStatus, ClientInfo, error::Error as ClientError}; -use consensus::{BlockOrigin, import_queue::IncomingBlock}; +use consensus::{BlockOrigin, import_queue::{IncomingBlock, BlockImportResult, BlockImportError}}; use crate::{ config::{Roles, BoxFinalityProofRequestBuilder}, message::{self, generic::FinalityProofRequest, BlockAttributes, BlockRequest, BlockResponse, FinalityProofResponse}, @@ -80,6 +80,18 @@ const ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE: i32 = -(1 << 9); /// genesis than us. const GENESIS_MISMATCH_REPUTATION_CHANGE: i32 = i32::min_value() + 1; +/// Reputation change for peers which send us a block with an incomplete header. +const INCOMPLETE_HEADER_REPUTATION_CHANGE: i32 = -(1 << 20); + +/// Reputation change for peers which send us a block which we fail to verify. +const VERIFICATION_FAIL_REPUTATION_CHANGE: i32 = -(1 << 20); + +/// Reputation change for peers which send us a bad block. +const BAD_BLOCK_REPUTATION_CHANGE: i32 = -(1 << 29); + +/// Reputation change for peers which send us a block with bad justifications. +const BAD_JUSTIFICATION_REPUTATION_CHANGE: i32 = -(1 << 16); + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -414,11 +426,6 @@ impl ChainSync { }) } - /// Clears all pending justification requests. - pub fn clear_justification_requests(&mut self) { - self.extra_justifications.reset() - } - /// Schedule a finality proof request for the given block. pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { let client = &self.client; @@ -690,13 +697,99 @@ impl ChainSync { /// /// Call this when a batch of blocks have been processed by the import /// queue, with or without errors. - pub fn on_blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - for hash in processed_blocks { + /// + /// `peer_info` is passed in case of a restart. + pub fn on_blocks_processed<'a>( + &'a mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + mut peer_info: impl FnMut(&PeerId) -> Option> + ) -> impl Iterator), BadPeer>> + 'a { + trace!(target: "sync", "Imported {} of {}", imported, count); + + let mut output = Vec::new(); + + let mut has_error = false; + let mut hashes = vec![]; + for (result, hash) in results { + hashes.push(hash); + + if has_error { + continue; + } + + if result.is_err() { + has_error = true; + } + + match result { + Ok(BlockImportResult::ImportedKnown(_number)) => {} + Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { + if aux.clear_justification_requests { + trace!( + target: "sync", + "Block imported clears all pending justification requests {}: {:?}", + number, + hash + ); + self.extra_justifications.reset() + } + + if aux.needs_justification { + trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); + self.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(peer) = who { + info!("Sent block with bad justification to import"); + output.push(Err(BadPeer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE))); + } + } + + if aux.needs_finality_proof { + trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash); + self.request_finality_proof(&hash, number); + } + }, + Err(BlockImportError::IncompleteHeader(who)) => { + if let Some(peer) = who { + info!("Peer sent block with incomplete header to import"); + output.push(Err(BadPeer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE))); + output.extend(self.restart(&mut peer_info)); + } + }, + Err(BlockImportError::VerificationFailed(who, e)) => { + if let Some(peer) = who { + info!("Verification failed from peer: {}", e); + output.push(Err(BadPeer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE))); + output.extend(self.restart(&mut peer_info)); + } + }, + Err(BlockImportError::BadBlock(who)) => { + if let Some(peer) = who { + info!("Bad block"); + output.push(Err(BadPeer(peer, BAD_BLOCK_REPUTATION_CHANGE))); + output.extend(self.restart(&mut peer_info)); + } + }, + Err(BlockImportError::UnknownParent) | + Err(BlockImportError::Cancelled) | + Err(BlockImportError::Other(_)) => { + output.extend(self.restart(&mut peer_info)); + }, + }; + } + + for hash in hashes { self.queue_blocks.remove(&hash); } if has_error { self.best_importing_number = Zero::zero() } + + output.into_iter() } /// Call this when a justification has been processed by the import queue, @@ -894,7 +987,7 @@ impl ChainSync { } /// Restart the sync process. - pub fn restart<'a, F> + fn restart<'a, F> (&'a mut self, mut peer_info: F) -> impl Iterator), BadPeer>> + 'a where F: FnMut(&PeerId) -> Option> + 'a { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 6d523306f5..fbafa29d04 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -29,6 +29,7 @@ use std::{collections::HashMap, fs, marker::PhantomData, io, path::Path}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use consensus::import_queue::{ImportQueue, Link}; +use consensus::import_queue::{BlockImportResult, BlockImportError}; use futures::{prelude::*, sync::mpsc}; use log::{warn, error, info}; use libp2p::core::{swarm::NetworkBehaviour, transport::boxed::Boxed, muxing::StreamMuxerBox}; @@ -641,11 +642,13 @@ struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> { } impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for NetworkLink<'a, B, S, H> { - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.user_protocol_mut().block_imported(&hash, number) - } - fn blocks_processed(&mut self, hashes: Vec, has_error: bool) { - self.protocol.user_protocol_mut().blocks_processed(hashes, has_error) + fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)> + ) { + self.protocol.user_protocol_mut().blocks_processed(imported, count, results) } fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success); @@ -655,9 +658,6 @@ impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for Network self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); } } - fn clear_justification_requests(&mut self) { - self.protocol.user_protocol_mut().clear_justification_requests() - } fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { self.protocol.user_protocol_mut().request_justification(hash, number) } @@ -678,10 +678,4 @@ impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for Network self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); } } - fn report_peer(&mut self, who: PeerId, reputation_change: i32) { - self.protocol.user_protocol_mut().report_peer(who, reputation_change) - } - fn restart(&mut self) { - self.protocol.user_protocol_mut().restart() - } } diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index deb20ab311..eb49dbda7a 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -45,29 +45,30 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock) #[test] fn import_single_good_block_works() { let (_, _hash, number, peer_id, block) = prepare_good_block(); - assert_eq!( - import_single_block(&mut test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Ok(BlockImportResult::ImportedUnknown(number, Default::default(), Some(peer_id))) - ); + match import_single_block(&mut test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))) { + Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org)) + if *num == number && *aux == Default::default() && *org == Some(peer_id) => {} + _ => panic!() + } } #[test] fn import_single_good_known_block_is_ignored() { let (mut client, _hash, number, _, block) = prepare_good_block(); - assert_eq!( - import_single_block(&mut client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Ok(BlockImportResult::ImportedKnown(number)) - ); + match import_single_block(&mut client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))) { + Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {} + _ => panic!() + } } #[test] fn import_single_good_block_without_header_fails() { let (_, _, _, peer_id, mut block) = prepare_good_block(); block.header = None; - assert_eq!( - import_single_block(&mut test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Err(BlockImportError::IncompleteHeader(Some(peer_id))) - ); + match import_single_block(&mut test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))) { + Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {} + _ => panic!() + } } #[test] diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index e3fe2f47cf..dd2d26f8a3 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -21,8 +21,8 @@ use futures::prelude::*; use log::{info, warn}; use runtime_primitives::generic::{SignedBlock, BlockId}; -use runtime_primitives::traits::{SaturatedConversion, Zero, One, Block, Header}; -use consensus_common::import_queue::{ImportQueue, IncomingBlock, Link}; +use runtime_primitives::traits::{SaturatedConversion, Zero, One, Block, Header, NumberFor}; +use consensus_common::import_queue::{ImportQueue, IncomingBlock, Link, BlockImportError, BlockImportResult}; use network::message; use consensus_common::BlockOrigin; @@ -111,10 +111,15 @@ impl WaitLink { } impl Link for WaitLink { - fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - self.imported_blocks += processed_blocks.len() as u64; - if has_error { - warn!("There was an error importing {} blocks", processed_blocks.len()); + fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)> + ) { + self.imported_blocks += imported as u64; + if results.iter().any(|(r, _)| r.is_err()) { + warn!("There was an error importing {} blocks", count); } } }