Meter block import results via prometheus (#6025)

This commit is contained in:
Benjamin Kampmann
2020-05-15 17:25:51 +02:00
committed by GitHub
parent 302c543b49
commit efc4849f1a
19 changed files with 163 additions and 18 deletions
+5
View File
@@ -6096,6 +6096,7 @@ dependencies = [
"sp-runtime",
"sp-timestamp",
"sp-version",
"substrate-prometheus-endpoint",
"substrate-test-runtime-client",
"tempfile",
]
@@ -6145,6 +6146,7 @@ dependencies = [
"sp-runtime",
"sp-timestamp",
"sp-version",
"substrate-prometheus-endpoint",
"substrate-test-runtime-client",
"tempfile",
]
@@ -6209,6 +6211,7 @@ dependencies = [
"sp-inherents",
"sp-runtime",
"sp-transaction-pool",
"substrate-prometheus-endpoint",
"substrate-test-runtime-client",
"substrate-test-runtime-transaction-pool",
"tempfile",
@@ -6233,6 +6236,7 @@ dependencies = [
"sp-inherents",
"sp-runtime",
"sp-timestamp",
"substrate-prometheus-endpoint",
]
[[package]]
@@ -7404,6 +7408,7 @@ dependencies = [
"sp-test-primitives",
"sp-utils",
"sp-version",
"substrate-prometheus-endpoint",
]
[[package]]
@@ -43,7 +43,14 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
.with_import_queue(|
_config,
client,
mut select_chain,
_transaction_pool,
spawn_task_handle,
registry,
| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
@@ -65,6 +72,7 @@ macro_rules! new_full_start {
client,
inherent_data_providers.clone(),
spawn_task_handle,
registry,
)?;
import_setup = Some((grandpa_block_import, grandpa_link));
@@ -198,7 +206,16 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
.with_import_queue_and_fprb(|
_config,
client,
backend,
fetcher,
_select_chain,
_tx_pool,
spawn_task_handle,
prometheus_registry,
| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
@@ -220,6 +237,7 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
client,
inherent_data_providers.clone(),
spawn_task_handle,
prometheus_registry,
)?;
Ok((import_queue, finality_proof_request_builder))
+20 -2
View File
@@ -62,7 +62,14 @@ macro_rules! new_full_start {
prometheus_registry,
))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
.with_import_queue(|
_config,
client,
mut select_chain,
_transaction_pool,
spawn_task_handle,
prometheus_registry,
| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
let (grandpa_block_import, grandpa_link) = grandpa::block_import(
@@ -86,6 +93,7 @@ macro_rules! new_full_start {
client,
inherent_data_providers.clone(),
spawn_task_handle,
prometheus_registry,
)?;
import_setup = Some((block_import, grandpa_link, babe_link));
@@ -309,7 +317,16 @@ pub fn new_light(config: Configuration)
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
.with_import_queue_and_fprb(|
_config,
client,
backend,
fetcher,
_select_chain,
_tx_pool,
spawn_task_handle,
registry,
| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
@@ -338,6 +355,7 @@ pub fn new_light(config: Configuration)
client.clone(),
inherent_data_providers.clone(),
spawn_task_handle,
registry,
)?;
Ok((import_queue, finality_proof_request_builder))
@@ -35,6 +35,7 @@ sp-api = { version = "2.0.0-dev", path = "../../../primitives/api" }
sp-runtime = { version = "2.0.0-dev", path = "../../../primitives/runtime" }
sp-timestamp = { version = "2.0.0-dev", path = "../../../primitives/timestamp" }
sc-telemetry = { version = "2.0.0-dev", path = "../../telemetry" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-dev"}
[dev-dependencies]
sp-keyring = { version = "2.0.0-dev", path = "../../../primitives/keyring" }
@@ -38,6 +38,7 @@ use std::{
use futures::prelude::*;
use parking_lot::Mutex;
use log::{debug, info, trace};
use prometheus_endpoint::Registry;
use codec::{Encode, Decode, Codec};
@@ -816,6 +817,7 @@ pub fn import_queue<B, I, C, P, S>(
client: Arc<C>,
inherent_data_providers: InherentDataProviders,
spawner: &S,
registry: Option<&Registry>,
) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
B: BlockT,
C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
@@ -842,6 +844,7 @@ pub fn import_queue<B, I, C, P, S>(
justification_import,
finality_proof_import,
spawner,
registry,
))
}
@@ -38,6 +38,7 @@ sc-consensus-uncles = { version = "0.8.0-dev", path = "../uncles" }
sc-consensus-slots = { version = "0.8.0-dev", path = "../slots" }
sp-runtime = { version = "2.0.0-dev", path = "../../../primitives/runtime" }
fork-tree = { version = "2.0.0-dev", path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-dev"}
futures = "0.3.4"
futures-timer = "3.0.1"
parking_lot = "0.10.0"
@@ -109,6 +109,7 @@ use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::prelude::*;
use log::{debug, info, log, trace, warn};
use prometheus_endpoint::Registry;
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
};
@@ -1291,6 +1292,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnBlocking,
registry: Option<&Registry>,
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
@@ -1314,6 +1316,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
justification_import,
finality_proof_import,
spawner,
registry,
))
}
@@ -22,19 +22,20 @@ parking_lot = "0.10.0"
serde = { version = "1.0", features=["derive"] }
assert_matches = "1.3.0"
sc-client-api = { path = "../../../client/api" , version = "2.0.0-dev"}
sc-transaction-pool = { path = "../../transaction-pool" , version = "2.0.0-dev"}
sp-blockchain = { path = "../../../primitives/blockchain" , version = "2.0.0-dev"}
sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common" , version = "0.8.0-dev"}
sp-inherents = { path = "../../../primitives/inherents" , version = "2.0.0-dev"}
sp-runtime = { path = "../../../primitives/runtime" , version = "2.0.0-dev"}
sp-core = { path = "../../../primitives/core" , version = "2.0.0-dev"}
sp-transaction-pool = { path = "../../../primitives/transaction-pool" , version = "2.0.0-dev"}
sc-client-api = { path = "../../../client/api", version = "2.0.0-dev" }
sc-transaction-pool = { path = "../../transaction-pool", version = "2.0.0-dev" }
sp-blockchain = { path = "../../../primitives/blockchain", version = "2.0.0-dev" }
sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common", version = "0.8.0-dev" }
sp-inherents = { path = "../../../primitives/inherents", version = "2.0.0-dev" }
sp-runtime = { path = "../../../primitives/runtime", version = "2.0.0-dev" }
sp-core = { path = "../../../primitives/core", version = "2.0.0-dev" }
sp-transaction-pool = { path = "../../../primitives/transaction-pool", version = "2.0.0-dev" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-dev" }
[dev-dependencies]
sc-basic-authorship = { path = "../../basic-authorship" , version = "0.8.0-dev"}
substrate-test-runtime-client = { path = "../../../test-utils/runtime/client" , version = "2.0.0-dev"}
substrate-test-runtime-transaction-pool = { path = "../../../test-utils/runtime/transaction-pool" , version = "2.0.0-dev"}
sc-basic-authorship = { path = "../../basic-authorship", version = "0.8.0-dev" }
substrate-test-runtime-client = { path = "../../../test-utils/runtime/client", version = "2.0.0-dev" }
substrate-test-runtime-transaction-pool = { path = "../../../test-utils/runtime/transaction-pool", version = "2.0.0-dev" }
tokio = { version = "0.2", features = ["rt-core", "macros"] }
env_logger = "0.7.0"
tempfile = "3.1.0"
@@ -29,6 +29,7 @@ use sp_runtime::{traits::Block as BlockT, Justification};
use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use sc_transaction_pool::txpool;
use std::{sync::Arc, marker::PhantomData};
use prometheus_endpoint::Registry;
mod error;
mod finalize_block;
@@ -69,6 +70,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
pub fn import_queue<Block, Transaction>(
block_import: BoxBlockImport<Block, Transaction>,
spawner: &impl sp_core::traits::SpawnBlocking,
registry: Option<&Registry>,
) -> BasicQueue<Block, Transaction>
where
Block: BlockT,
@@ -80,6 +82,7 @@ pub fn import_queue<Block, Transaction>(
None,
None,
spawner,
registry,
)
}
@@ -26,3 +26,4 @@ log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
sp-timestamp = { version = "2.0.0-dev", path = "../../../primitives/timestamp" }
derive_more = "0.99.2"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-dev"}
@@ -54,6 +54,7 @@ use sp_consensus::import_queue::{
BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport,
};
use codec::{Encode, Decode};
use prometheus_endpoint::Registry;
use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
@@ -465,6 +466,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnBlocking,
registry: Option<&Registry>,
) -> Result<
PowImportQueue<B, Transaction>,
sp_consensus::Error
@@ -483,6 +485,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
justification_import,
finality_proof_import,
spawner,
registry,
))
}
@@ -87,6 +87,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
None,
None,
&sp_core::testing::SpawnBlockingExecutor::new(),
None,
));
let worker = NetworkWorker::new(config::Params {
@@ -93,6 +93,7 @@ fn async_import_queue_drops() {
None,
None,
&executor,
None,
);
drop(queue);
}
+2
View File
@@ -613,6 +613,7 @@ pub trait TestNetFactory: Sized {
justification_import,
finality_proof_import,
&sp_core::testing::SpawnBlockingExecutor::new(),
None,
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
@@ -691,6 +692,7 @@ pub trait TestNetFactory: Sized {
justification_import,
finality_proof_import,
&sp_core::testing::SpawnBlockingExecutor::new(),
None,
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
+7 -3
View File
@@ -485,7 +485,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
/// Defines which import queue to use.
pub fn with_import_queue<UImpQu>(
self,
builder: impl FnOnce(&Configuration, Arc<TCl>, Option<TSc>, Arc<TExPool>, &SpawnTaskHandle)
builder: impl FnOnce(&Configuration, Arc<TCl>, Option<TSc>, Arc<TExPool>, &SpawnTaskHandle, Option<&Registry>)
-> Result<UImpQu, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, TFprb, TFpp,
TExPool, TRpc, Backend>, Error>
@@ -496,6 +496,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.select_chain.clone(),
self.transaction_pool.clone(),
&self.task_manager.spawn_handle(),
self.config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
Ok(ServiceBuilder {
@@ -586,6 +587,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
Option<TSc>,
Arc<TExPool>,
&SpawnTaskHandle,
Option<&Registry>,
) -> Result<(UImpQu, Option<UFprb>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TExPool, TRpc, Backend>, Error>
@@ -598,6 +600,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.select_chain.clone(),
self.transaction_pool.clone(),
&self.task_manager.spawn_handle(),
self.config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
Ok(ServiceBuilder {
@@ -630,12 +633,13 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
Option<TSc>,
Arc<TExPool>,
&SpawnTaskHandle,
Option<&Registry>,
) -> Result<(UImpQu, UFprb), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TExPool, TRpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx, tb|
builder(cfg, cl, b, f, sc, tx, tb)
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx, tb, pr|
builder(cfg, cl, b, f, sc, tx, tb, pr)
.map(|(q, f)| (q, Some(f)))
)
}
@@ -29,6 +29,7 @@ sp-utils = { version = "2.0.0-dev", path = "../../utils" }
codec = { package = "parity-scale-codec", version = "1.3.0", features = ["derive"] }
parking_lot = "0.10.0"
serde = { version = "1.0", features = ["derive"] }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-dev"}
[dev-dependencies]
sp-test-primitives = { version = "2.0.0-dev", path = "../../test-primitives" }
@@ -20,8 +20,10 @@ use futures::{prelude::*, task::Context, task::Poll};
use futures_timer::Delay;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use prometheus_endpoint::Registry;
use crate::block_import::BlockOrigin;
use crate::metrics::Metrics;
use crate::import_queue::{
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxJustificationImport, ImportQueue, Link, Origin,
@@ -58,14 +60,21 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
spawner: &impl sp_core::traits::SpawnBlocking,
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let metrics = prometheus_registry.and_then(|r|
Metrics::register(r)
.map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); })
.ok()
);
let (future, worker_sender) = BlockImportWorker::new(
result_sender,
verifier,
block_import,
justification_import,
finality_proof_import,
metrics,
);
spawner.spawn_blocking("basic-block-import-worker", future.boxed());
@@ -133,9 +142,15 @@ struct BlockImportWorker<B: BlockT, Transaction> {
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
delay_between_blocks: Duration,
metrics: Option<Metrics>,
_phantom: PhantomData<Transaction>,
}
const METRIC_SUCCESS_FIELDS: [&'static str; 8] = [
"success", "incomplete_header", "verification_failed", "bad_block",
"missing_state", "unknown_parent", "cancelled", "failed"
];
impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
fn new<V: 'static + Verifier<B>>(
result_sender: BufferedLinkSender<B>,
@@ -143,6 +158,7 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
block_import: BoxBlockImport<B, Transaction>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
metrics: Option<Metrics>,
) -> (impl Future<Output = ()> + Send, TracingUnboundedSender<ToWorkerMsg<B>>) {
let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker");
@@ -151,6 +167,7 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
justification_import,
finality_proof_import,
delay_between_blocks: Duration::new(0, 0),
metrics,
_phantom: PhantomData,
};
@@ -241,9 +258,31 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
blocks: Vec<IncomingBlock<B>>
) -> impl Future<Output = (BoxBlockImport<B, Transaction>, V)> {
let mut result_sender = self.result_sender.clone();
let metrics = self.metrics.clone();
import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks)
.then(move |(imported, count, results, block_import, verifier)| {
if let Some(metrics) = metrics {
let amounts = results.iter().fold([0u64; 8], |mut acc, result| {
match result.0 {
Ok(_) => acc[0] += 1,
Err(BlockImportError::IncompleteHeader(_)) => acc[1] += 1,
Err(BlockImportError::VerificationFailed(_,_)) => acc[2] += 1,
Err(BlockImportError::BadBlock(_)) => acc[3] += 1,
Err(BlockImportError::MissingState) => acc[4] += 1,
Err(BlockImportError::UnknownParent) => acc[5] += 1,
Err(BlockImportError::Cancelled) => acc[6] += 1,
Err(BlockImportError::Other(_)) => acc[7] += 1,
};
acc
});
for (idx, field) in METRIC_SUCCESS_FIELDS.iter().enumerate() {
let amount = amounts[idx];
if amount > 0 {
metrics.import_queue_processed.with_label_values(&[&field]).inc_by(amount)
}
};
}
result_sender.blocks_processed(imported, count, results);
future::ready((block_import, verifier))
})
@@ -44,6 +44,7 @@ pub mod block_import;
mod select_chain;
pub mod import_queue;
pub mod evaluation;
mod metrics;
// block size limit.
const MAX_BLOCK_SIZE: usize = 4 * 1024 * 1024 + 512;
@@ -0,0 +1,39 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Metering tools for consensus
use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec};
/// Generic Prometheus metrics for common consensus functionality.
#[derive(Clone)]
pub(crate) struct Metrics {
pub import_queue_processed: CounterVec<U64>,
}
impl Metrics {
pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
import_queue_processed: register(
CounterVec::new(
Opts::new("import_queue_processed_total", "Blocks processed by import queue"),
&["result"] // 'success or failure
)?,
registry,
)?,
})
}
}