Update import metrics and add verification time (#6170)

* refactor import reporting and add time

* Update primitives/consensus/common/src/metrics.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* remove (crate)

* fix longer lines

* swap names to avoid api breaking

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Nikolay Volf
2020-06-01 12:08:59 +03:00
committed by GitHub
parent 9cac359f44
commit 5759cee2e9
4 changed files with 115 additions and 56 deletions
@@ -27,13 +27,17 @@
//! queues to be instantiated simply.
use std::collections::HashMap;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as _, NumberFor}};
use crate::error::Error as ConsensusError;
use crate::block_import::{
BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult,
BlockCheckParams, FinalityProofImport,
};
use sp_runtime::{Justification, traits::{Block as BlockT, Header as _, NumberFor}};
use crate::{
error::Error as ConsensusError,
block_import::{
BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult,
BlockCheckParams, FinalityProofImport,
},
metrics::Metrics,
};
pub use basic_queue::BasicQueue;
mod basic_queue;
@@ -186,6 +190,17 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
import_single_block_metered(import_handle, block_origin, block, verifier, None)
}
/// Single block import function with metering.
pub(crate) fn import_single_block_metered<B: BlockT, V: Verifier<B>, Transaction>(
import_handle: &mut dyn BlockImport<B, Transaction = Transaction, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
metrics: Option<Metrics>,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
let peer = block.origin;
@@ -207,8 +222,8 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
let hash = header.hash();
let parent_hash = header.parent_hash().clone();
let import_error = |e| {
match e {
let import_handler = |import| {
match import {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(number))
@@ -232,7 +247,8 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
}
}
};
match import_error(import_handle.check_block(BlockCheckParams {
match import_handler(import_handle.check_block(BlockCheckParams {
hash,
number,
parent_hash,
@@ -243,6 +259,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
r => return Ok(r), // Any other successful result means that the block is already imported.
}
let started = std::time::Instant::now();
let (mut import_block, maybe_keys) = verifier.verify(block_origin, header, justification, block.body)
.map_err(|msg| {
if let Some(ref peer) = peer {
@@ -250,14 +267,21 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
} else {
trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg);
}
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification(false, started.elapsed());
}
BlockImportError::VerificationFailed(peer.clone(), msg)
})?;
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification(true, started.elapsed());
}
let mut cache = HashMap::new();
if let Some(keys) = maybe_keys {
cache.extend(keys.into_iter());
}
import_block.allow_missing_state = block.allow_missing_state;
import_error(import_handle.import_block(import_block.convert_transaction(), cache))
import_handler(import_handle.import_block(import_block.convert_transaction(), cache))
}
@@ -22,13 +22,15 @@ use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, Num
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use prometheus_endpoint::Registry;
use crate::block_import::BlockOrigin;
use crate::metrics::Metrics;
use crate::import_queue::{
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxJustificationImport, ImportQueue, Link, Origin,
IncomingBlock, import_single_block,
buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver}
use crate::{
block_import::BlockOrigin,
import_queue::{
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxJustificationImport, ImportQueue, Link, Origin,
IncomingBlock, import_single_block_metered,
buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver},
},
metrics::Metrics,
};
/// Interface to a basic block import queue that is importing blocks sequentially in a separate
@@ -146,11 +148,6 @@ struct BlockImportWorker<B: BlockT, Transaction> {
_phantom: PhantomData<Transaction>,
}
const METRIC_SUCCESS_FIELDS: [&'static str; 8] = [
"success", "incomplete_header", "verification_failed", "bad_block",
"missing_state", "unknown_parent", "cancelled", "failed"
];
impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
fn new<V: 'static + Verifier<B>>(
result_sender: BufferedLinkSender<B>,
@@ -228,7 +225,7 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
// a `Future` into `importing`.
let (bi, verif) = block_import_verifier.take()
.expect("block_import_verifier is always Some; qed");
importing = Some(worker.import_a_batch_of_blocks(bi, verif, origin, blocks));
importing = Some(worker.import_batch(bi, verif, origin, blocks));
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
let (_, verif) = block_import_verifier.as_mut()
@@ -250,39 +247,18 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
///
/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is
/// yielded back in the output once the import is finished.
fn import_a_batch_of_blocks<V: 'static + Verifier<B>>(
fn import_batch<V: 'static + Verifier<B>>(
&mut self,
block_import: BoxBlockImport<B, Transaction>,
verifier: V,
origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>
blocks: Vec<IncomingBlock<B>>,
) -> impl Future<Output = (BoxBlockImport<B, Transaction>, V)> {
let mut result_sender = self.result_sender.clone();
let metrics = self.metrics.clone();
import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks)
import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks, metrics)
.then(move |(imported, count, results, block_import, verifier)| {
if let Some(metrics) = metrics {
let amounts = results.iter().fold([0u64; 8], |mut acc, result| {
match result.0 {
Ok(_) => acc[0] += 1,
Err(BlockImportError::IncompleteHeader(_)) => acc[1] += 1,
Err(BlockImportError::VerificationFailed(_,_)) => acc[2] += 1,
Err(BlockImportError::BadBlock(_)) => acc[3] += 1,
Err(BlockImportError::MissingState) => acc[4] += 1,
Err(BlockImportError::UnknownParent) => acc[5] += 1,
Err(BlockImportError::Cancelled) => acc[6] += 1,
Err(BlockImportError::Other(_)) => acc[7] += 1,
};
acc
});
for (idx, field) in METRIC_SUCCESS_FIELDS.iter().enumerate() {
let amount = amounts[idx];
if amount > 0 {
metrics.import_queue_processed.with_label_values(&[&field]).inc_by(amount)
}
};
}
result_sender.blocks_processed(imported, count, results);
future::ready((block_import, verifier))
})
@@ -352,6 +328,7 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
blocks: Vec<IncomingBlock<B>>,
verifier: V,
delay_between_blocks: Duration,
metrics: Option<Metrics>,
) -> impl Future<
Output = (
usize,
@@ -401,9 +378,9 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
None => {
// No block left to import, success!
let import_handle = import_handle.take()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (import handle is None)");
let verifier = verifier.take()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (verifier handle is None)");
let results = mem::replace(&mut results, Vec::new());
return Poll::Ready((imported, count, results, import_handle, verifier));
},
@@ -413,9 +390,9 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
// therefore `import_handle` and `verifier` are always `Some` here. It is illegal to poll
// a `Future` again after it has ended.
let import_handle = import_handle.as_mut()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (import handle is None)");
let verifier = verifier.as_mut()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (verifier handle is None)");
let block_number = block.header.as_ref().map(|h| h.number().clone());
let block_hash = block.hash;
@@ -423,14 +400,19 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
Err(BlockImportError::Cancelled)
} else {
// The actual import.
import_single_block(
import_single_block_metered(
&mut **import_handle,
blocks_origin.clone(),
block,
verifier,
metrics.clone(),
)
};
if let Some(metrics) = metrics.as_ref() {
metrics.report_import::<B>(&import_result);
}
if import_result.is_ok() {
trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash);
imported += 1;
@@ -16,12 +16,17 @@
//! Metering tools for consensus
use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec};
use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec, HistogramVec, HistogramOpts};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use crate::import_queue::{BlockImportResult, BlockImportError};
/// Generic Prometheus metrics for common consensus functionality.
#[derive(Clone)]
pub(crate) struct Metrics {
pub import_queue_processed: CounterVec<U64>,
pub block_verification_time: HistogramVec,
}
impl Metrics {
@@ -34,6 +39,42 @@ impl Metrics {
)?,
registry,
)?,
block_verification_time: register(
HistogramVec::new(
HistogramOpts::new(
"block_verification_time",
"Histogram of time taken to import blocks",
),
&["result"],
)?,
registry,
)?,
})
}
pub fn report_import<B: BlockT>(
&self,
result: &Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
) {
let label = match result {
Ok(_) => "success",
Err(BlockImportError::IncompleteHeader(_)) => "incomplete_header",
Err(BlockImportError::VerificationFailed(_,_)) => "verification_failed",
Err(BlockImportError::BadBlock(_)) => "bad_block",
Err(BlockImportError::MissingState) => "missing_state",
Err(BlockImportError::UnknownParent) => "unknown_parent",
Err(BlockImportError::Cancelled) => "cancelled",
Err(BlockImportError::Other(_)) => "failed",
};
self.import_queue_processed.with_label_values(
&[label]
).inc();
}
pub fn report_verification(&self, success: bool, time: std::time::Duration) {
self.block_verification_time.with_label_values(
&[if success { "success" } else { "verification_failed" }]
).observe(time.as_secs_f64());
}
}