diff --git a/substrate/client/network/test/src/block_import.rs b/substrate/client/network/test/src/block_import.rs index ef6ac9268f..8636cfbc21 100644 --- a/substrate/client/network/test/src/block_import.rs +++ b/substrate/client/network/test/src/block_import.rs @@ -55,7 +55,11 @@ fn import_single_good_block_works() { let mut expected_aux = ImportedAux::default(); expected_aux.is_new_best = true; - match import_single_block(&mut substrate_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) { + match import_single_block( + &mut substrate_test_runtime_client::new(), + BlockOrigin::File, block, + &mut PassThroughVerifier(true), + ) { Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org)) if *num == number && *aux == expected_aux && *org == Some(peer_id) => {} r @ _ => panic!("{:?}", r) @@ -65,7 +69,11 @@ fn import_single_good_block_works() { #[test] fn import_single_good_known_block_is_ignored() { let (mut client, _hash, number, _, block) = prepare_good_block(); - match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier(true)) { + match import_single_block( + &mut client, BlockOrigin::File, + block, + &mut PassThroughVerifier(true) + ) { Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {} _ => panic!() } @@ -75,7 +83,11 @@ fn import_single_good_known_block_is_ignored() { fn import_single_good_block_without_header_fails() { let (_, _, _, peer_id, mut block) = prepare_good_block(); block.header = None; - match import_single_block(&mut substrate_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) { + match import_single_block( + &mut substrate_test_runtime_client::new(), + BlockOrigin::File, block, + &mut PassThroughVerifier(true), + ) { Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {} _ => panic!() } diff --git a/substrate/primitives/consensus/common/src/import_queue.rs b/substrate/primitives/consensus/common/src/import_queue.rs index 85e82cc484..1a034e11d6 100644 --- a/substrate/primitives/consensus/common/src/import_queue.rs +++ b/substrate/primitives/consensus/common/src/import_queue.rs @@ -27,13 +27,17 @@ //! queues to be instantiated simply. use std::collections::HashMap; -use sp_runtime::{Justification, traits::{Block as BlockT, Header as _, NumberFor}}; -use crate::error::Error as ConsensusError; -use crate::block_import::{ - BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult, - BlockCheckParams, FinalityProofImport, -}; +use sp_runtime::{Justification, traits::{Block as BlockT, Header as _, NumberFor}}; + +use crate::{ + error::Error as ConsensusError, + block_import::{ + BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult, + BlockCheckParams, FinalityProofImport, + }, + metrics::Metrics, +}; pub use basic_queue::BasicQueue; mod basic_queue; @@ -186,6 +190,17 @@ pub fn import_single_block, Transaction>( block_origin: BlockOrigin, block: IncomingBlock, verifier: &mut V, +) -> Result>, BlockImportError> { + import_single_block_metered(import_handle, block_origin, block, verifier, None) +} + +/// Single block import function with metering. +pub(crate) fn import_single_block_metered, Transaction>( + import_handle: &mut dyn BlockImport, + block_origin: BlockOrigin, + block: IncomingBlock, + verifier: &mut V, + metrics: Option, ) -> Result>, BlockImportError> { let peer = block.origin; @@ -207,8 +222,8 @@ pub fn import_single_block, Transaction>( let hash = header.hash(); let parent_hash = header.parent_hash().clone(); - let import_error = |e| { - match e { + let import_handler = |import| { + match import { Ok(ImportResult::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); Ok(BlockImportResult::ImportedKnown(number)) @@ -232,7 +247,8 @@ pub fn import_single_block, Transaction>( } } }; - match import_error(import_handle.check_block(BlockCheckParams { + + match import_handler(import_handle.check_block(BlockCheckParams { hash, number, parent_hash, @@ -243,6 +259,7 @@ pub fn import_single_block, Transaction>( r => return Ok(r), // Any other successful result means that the block is already imported. } + let started = std::time::Instant::now(); let (mut import_block, maybe_keys) = verifier.verify(block_origin, header, justification, block.body) .map_err(|msg| { if let Some(ref peer) = peer { @@ -250,14 +267,21 @@ pub fn import_single_block, Transaction>( } else { trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg); } + if let Some(metrics) = metrics.as_ref() { + metrics.report_verification(false, started.elapsed()); + } BlockImportError::VerificationFailed(peer.clone(), msg) })?; + if let Some(metrics) = metrics.as_ref() { + metrics.report_verification(true, started.elapsed()); + } + let mut cache = HashMap::new(); if let Some(keys) = maybe_keys { cache.extend(keys.into_iter()); } import_block.allow_missing_state = block.allow_missing_state; - import_error(import_handle.import_block(import_block.convert_transaction(), cache)) + import_handler(import_handle.import_block(import_block.convert_transaction(), cache)) } diff --git a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs index c63c73bb42..33c3da910d 100644 --- a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -22,13 +22,15 @@ use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, Num use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded}; use prometheus_endpoint::Registry; -use crate::block_import::BlockOrigin; -use crate::metrics::Metrics; -use crate::import_queue::{ - BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport, - BoxJustificationImport, ImportQueue, Link, Origin, - IncomingBlock, import_single_block, - buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver} +use crate::{ + block_import::BlockOrigin, + import_queue::{ + BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport, + BoxJustificationImport, ImportQueue, Link, Origin, + IncomingBlock, import_single_block_metered, + buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver}, + }, + metrics::Metrics, }; /// Interface to a basic block import queue that is importing blocks sequentially in a separate @@ -146,11 +148,6 @@ struct BlockImportWorker { _phantom: PhantomData, } -const METRIC_SUCCESS_FIELDS: [&'static str; 8] = [ - "success", "incomplete_header", "verification_failed", "bad_block", - "missing_state", "unknown_parent", "cancelled", "failed" -]; - impl BlockImportWorker { fn new>( result_sender: BufferedLinkSender, @@ -228,7 +225,7 @@ impl BlockImportWorker { // a `Future` into `importing`. let (bi, verif) = block_import_verifier.take() .expect("block_import_verifier is always Some; qed"); - importing = Some(worker.import_a_batch_of_blocks(bi, verif, origin, blocks)); + importing = Some(worker.import_batch(bi, verif, origin, blocks)); }, ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { let (_, verif) = block_import_verifier.as_mut() @@ -250,39 +247,18 @@ impl BlockImportWorker { /// /// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is /// yielded back in the output once the import is finished. - fn import_a_batch_of_blocks>( + fn import_batch>( &mut self, block_import: BoxBlockImport, verifier: V, origin: BlockOrigin, - blocks: Vec> + blocks: Vec>, ) -> impl Future, V)> { let mut result_sender = self.result_sender.clone(); let metrics = self.metrics.clone(); - import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks) + import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks, metrics) .then(move |(imported, count, results, block_import, verifier)| { - if let Some(metrics) = metrics { - let amounts = results.iter().fold([0u64; 8], |mut acc, result| { - match result.0 { - Ok(_) => acc[0] += 1, - Err(BlockImportError::IncompleteHeader(_)) => acc[1] += 1, - Err(BlockImportError::VerificationFailed(_,_)) => acc[2] += 1, - Err(BlockImportError::BadBlock(_)) => acc[3] += 1, - Err(BlockImportError::MissingState) => acc[4] += 1, - Err(BlockImportError::UnknownParent) => acc[5] += 1, - Err(BlockImportError::Cancelled) => acc[6] += 1, - Err(BlockImportError::Other(_)) => acc[7] += 1, - }; - acc - }); - for (idx, field) in METRIC_SUCCESS_FIELDS.iter().enumerate() { - let amount = amounts[idx]; - if amount > 0 { - metrics.import_queue_processed.with_label_values(&[&field]).inc_by(amount) - } - }; - } result_sender.blocks_processed(imported, count, results); future::ready((block_import, verifier)) }) @@ -352,6 +328,7 @@ fn import_many_blocks, Transaction>( blocks: Vec>, verifier: V, delay_between_blocks: Duration, + metrics: Option, ) -> impl Future< Output = ( usize, @@ -401,9 +378,9 @@ fn import_many_blocks, Transaction>( None => { // No block left to import, success! let import_handle = import_handle.take() - .expect("Future polled again after it has finished"); + .expect("Future polled again after it has finished (import handle is None)"); let verifier = verifier.take() - .expect("Future polled again after it has finished"); + .expect("Future polled again after it has finished (verifier handle is None)"); let results = mem::replace(&mut results, Vec::new()); return Poll::Ready((imported, count, results, import_handle, verifier)); }, @@ -413,9 +390,9 @@ fn import_many_blocks, Transaction>( // therefore `import_handle` and `verifier` are always `Some` here. It is illegal to poll // a `Future` again after it has ended. let import_handle = import_handle.as_mut() - .expect("Future polled again after it has finished"); + .expect("Future polled again after it has finished (import handle is None)"); let verifier = verifier.as_mut() - .expect("Future polled again after it has finished"); + .expect("Future polled again after it has finished (verifier handle is None)"); let block_number = block.header.as_ref().map(|h| h.number().clone()); let block_hash = block.hash; @@ -423,14 +400,19 @@ fn import_many_blocks, Transaction>( Err(BlockImportError::Cancelled) } else { // The actual import. - import_single_block( + import_single_block_metered( &mut **import_handle, blocks_origin.clone(), block, verifier, + metrics.clone(), ) }; + if let Some(metrics) = metrics.as_ref() { + metrics.report_import::(&import_result); + } + if import_result.is_ok() { trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash); imported += 1; diff --git a/substrate/primitives/consensus/common/src/metrics.rs b/substrate/primitives/consensus/common/src/metrics.rs index 90e01214d8..90df85a294 100644 --- a/substrate/primitives/consensus/common/src/metrics.rs +++ b/substrate/primitives/consensus/common/src/metrics.rs @@ -16,12 +16,17 @@ //! Metering tools for consensus -use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec}; +use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec, HistogramVec, HistogramOpts}; + +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +use crate::import_queue::{BlockImportResult, BlockImportError}; /// Generic Prometheus metrics for common consensus functionality. #[derive(Clone)] pub(crate) struct Metrics { pub import_queue_processed: CounterVec, + pub block_verification_time: HistogramVec, } impl Metrics { @@ -34,6 +39,42 @@ impl Metrics { )?, registry, )?, + block_verification_time: register( + HistogramVec::new( + HistogramOpts::new( + "block_verification_time", + "Histogram of time taken to import blocks", + ), + &["result"], + )?, + registry, + )?, }) } + + pub fn report_import( + &self, + result: &Result>, BlockImportError>, + ) { + let label = match result { + Ok(_) => "success", + Err(BlockImportError::IncompleteHeader(_)) => "incomplete_header", + Err(BlockImportError::VerificationFailed(_,_)) => "verification_failed", + Err(BlockImportError::BadBlock(_)) => "bad_block", + Err(BlockImportError::MissingState) => "missing_state", + Err(BlockImportError::UnknownParent) => "unknown_parent", + Err(BlockImportError::Cancelled) => "cancelled", + Err(BlockImportError::Other(_)) => "failed", + }; + + self.import_queue_processed.with_label_values( + &[label] + ).inc(); + } + + pub fn report_verification(&self, success: bool, time: std::time::Duration) { + self.block_verification_time.with_label_values( + &[if success { "success" } else { "verification_failed" }] + ).observe(time.as_secs_f64()); + } }