Add prometheus registry to transaction pool, with couple of initial metrics (#5657)

* make new contructor

* add metrics to txpool

* fix review

* fix doc comment

* change to counters

* Update client/transaction-pool/src/metrics.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Update client/transaction-pool/src/metrics.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Update client/transaction-pool/src/metrics.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* use dedicated wrapper

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Nikolay Volf
2020-04-17 12:02:45 +03:00
committed by GitHub
parent 7dcff4c1e2
commit fea626ca84
15 changed files with 157 additions and 26 deletions
+1
View File
@@ -6766,6 +6766,7 @@ dependencies = [
"sp-runtime",
"sp-transaction-pool",
"sp-utils",
"substrate-prometheus-endpoint",
"substrate-test-runtime-client",
"substrate-test-runtime-transaction-pool",
"tracing",
@@ -35,9 +35,9 @@ macro_rules! new_full_start {
.with_select_chain(|_config, backend| {
Ok(sc_client::LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, _fetcher| {
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api)))
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
@@ -183,13 +183,13 @@ pub fn new_light(config: Configuration)
.with_select_chain(|_config, backend| {
Ok(LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, fetcher| {
.with_transaction_pool(|config, client, fetcher, prometheus_registry| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light,
config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
+4 -4
View File
@@ -56,9 +56,9 @@ macro_rules! new_full_start {
.with_select_chain(|_config, backend| {
Ok(sc_client::LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, _fetcher| {
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api)))
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
@@ -312,12 +312,12 @@ pub fn new_light(config: Configuration)
.with_select_chain(|_config, backend| {
Ok(LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, fetcher| {
.with_transaction_pool(|config, client, fetcher, prometheus_registry| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light,
config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
@@ -360,7 +360,11 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);
futures::executor::block_on(
@@ -408,7 +412,11 @@ mod tests {
fn should_not_panic_when_deadline_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);
let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
@@ -440,8 +448,13 @@ mod tests {
.build_with_backend();
let client = Arc::new(client);
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);
let genesis_hash = client.info().best_hash;
let block_id = BlockId::Hash(genesis_hash);
@@ -493,7 +506,11 @@ mod tests {
// given
let mut client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);
futures::executor::block_on(
+1 -1
View File
@@ -26,7 +26,7 @@
//! # use substrate_test_runtime_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
//! # use sc_transaction_pool::{BasicPool, FullChainApi};
//! # let client = Arc::new(substrate_test_runtime_client::new());
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0);
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0);
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
//!
@@ -217,7 +217,7 @@ mod tests {
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone()
@@ -281,7 +281,7 @@ mod tests {
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone()
@@ -349,7 +349,7 @@ mod tests {
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool_api = api();
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0);
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone(),
+1
View File
@@ -206,6 +206,7 @@ mod tests {
let pool = Arc::new(TestPool(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0));
client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&pool.clone()) as _);
+1
View File
@@ -65,6 +65,7 @@ impl Default for TestSetup {
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0);
TestSetup {
runtime: runtime::Runtime::new().expect("Failed to create runtime in test setup"),
+3
View File
@@ -53,6 +53,7 @@ use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent};
use sp_blockchain;
use prometheus_endpoint::Registry as PrometheusRegistry;
pub type BackgroundTask = Pin<Box<dyn Future<Output=()> + Send>>;
@@ -585,6 +586,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
sc_transaction_pool::txpool::Options,
Arc<TCl>,
Option<TFchr>,
Option<&PrometheusRegistry>,
) -> Result<(UExPool, Option<BackgroundTask>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
UExPool, TRpc, Backend>, Error>
@@ -593,6 +595,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.config.transaction_pool.clone(),
self.client.clone(),
self.fetcher.clone(),
self.config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
if let Some(background_task) = background_task{
+1
View File
@@ -696,6 +696,7 @@ mod tests {
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0);
let source = sp_runtime::transaction_validity::TransactionSource::External;
let best = longest_chain.best_chain().unwrap();
@@ -20,6 +20,7 @@ intervalier = "0.4.0"
log = "0.4.8"
parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] }
parking_lot = "0.10.0"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-dev"}
sc-client-api = { version = "2.0.0-dev", path = "../api" }
sc-transaction-graph = { version = "2.0.0-dev", path = "./graph" }
sp-api = { version = "2.0.0-dev", path = "../../primitives/api" }
+39 -6
View File
@@ -21,8 +21,10 @@
#![warn(unused_extern_crates)]
mod api;
pub mod error;
mod revalidation;
mod metrics;
pub mod error;
#[cfg(any(feature = "test-helpers", test))]
pub mod testing;
@@ -45,6 +47,9 @@ use sp_transaction_pool::{
};
use wasm_timer::Instant;
use prometheus_endpoint::Registry as PrometheusRegistry;
use crate::metrics::MetricsLink as PrometheusMetrics;
type BoxedReadyIterator<Hash, Data> = Box<dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send>;
type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<sc_transaction_graph::ExHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>>;
@@ -62,6 +67,7 @@ pub struct BasicPool<PoolApi, Block>
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
metrics: PrometheusMetrics,
}
struct ReadyPoll<T, Block: BlockT> {
@@ -147,8 +153,9 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn new(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
Self::with_revalidation_type(options, pool_api, prometheus, RevalidationType::Full)
}
/// Create new basic transaction pool with provided api, for tests.
@@ -166,6 +173,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
revalidation_queue: Arc::new(revalidation_queue),
revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
ready_poll: Default::default(),
metrics: Default::default(),
},
background_task,
notifier,
@@ -177,6 +185,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn with_revalidation_type(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, pool_api.clone()));
@@ -187,6 +196,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
(queue, Some(background))
},
};
(
BasicPool {
api: pool_api,
@@ -199,6 +209,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
}
)),
ready_poll: Default::default(),
metrics: PrometheusMetrics::new(prometheus),
},
background_task,
)
@@ -228,8 +239,15 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
let pool = self.pool.clone();
let at = *at;
self.metrics.report(|metrics| metrics.validations_scheduled.inc_by(xts.len() as u64));
let metrics = self.metrics.clone();
async move {
pool.submit_at(&at, source, xts, false).await
let tx_count = xts.len();
let res = pool.submit_at(&at, source, xts, false).await;
metrics.report(|metrics| metrics.validations_finished.inc_by(tx_count as u64));
res
}.boxed()
}
@@ -241,8 +259,16 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
) -> PoolFuture<TxHash<Self>, Self::Error> {
let pool = self.pool.clone();
let at = *at;
self.metrics.report(|metrics| metrics.validations_scheduled.inc());
let metrics = self.metrics.clone();
async move {
pool.submit_one(&at, source, xt).await
let res = pool.submit_one(&at, source, xt).await;
metrics.report(|metrics| metrics.validations_finished.inc());
res
}.boxed()
}
@@ -255,10 +281,17 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
let at = *at;
let pool = self.pool.clone();
self.metrics.report(|metrics| metrics.validations_scheduled.inc());
let metrics = self.metrics.clone();
async move {
pool.submit_and_watch(&at, source, xt)
let result = pool.submit_and_watch(&at, source, xt)
.map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _))
.await
.await;
metrics.report(|metrics| metrics.validations_finished.inc());
result
}.boxed()
}
@@ -0,0 +1,69 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Transaction pool Prometheus metrics.
use std::sync::Arc;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
#[derive(Clone, Default)]
pub struct MetricsLink(Arc<Option<Metrics>>);
impl MetricsLink {
pub fn new(registry: Option<&Registry>) -> Self {
Self(Arc::new(
registry.and_then(|registry|
Metrics::register(registry)
.map_err(|err| { log::warn!("Failed to register prometheus metrics: {}", err); })
.ok()
)
))
}
pub fn report(&self, do_this: impl FnOnce(&Metrics)) {
if let Some(metrics) = self.0.as_ref() {
do_this(metrics);
}
}
}
/// Transaction pool Prometheus metrics.
pub struct Metrics {
pub validations_scheduled: Counter<U64>,
pub validations_finished: Counter<U64>,
}
impl Metrics {
pub fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
validations_scheduled: register(
Counter::new(
"sub_txpool_validations_scheduled",
"Total number of transactions scheduled for validation",
)?,
registry,
)?,
validations_finished: register(
Counter::new(
"sub_txpool_validations_finished",
"Total number of transactions that finished validation",
)?,
registry,
)?,
})
}
}
@@ -415,7 +415,7 @@ fn finalization() {
let xt = uxt(Alice, 209);
let api = TestApi::with_alice_nonce(209);
api.push_block(1, vec![]);
let (pool, _background) = BasicPool::new(Default::default(), api.into());
let (pool, _background, _) = BasicPool::new_test(api.into());
let watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, xt.clone())
).expect("1. Imported");
@@ -446,7 +446,7 @@ fn fork_aware_finalization() {
// starting block A1 (last finalized.)
api.push_block(1, vec![]);
let (pool, _background) = BasicPool::new(Default::default(), api.into());
let (pool, _background, _) = BasicPool::new_test(api.into());
let mut canon_watchers = vec![];
let from_alice = uxt(Alice, 1);
@@ -677,7 +677,7 @@ fn should_not_accept_old_signatures() {
let client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client))).0
BasicPool::new_test(Arc::new(FullChainApi::new(client))).0
);
let transfer = Transfer {
+5 -1
View File
@@ -236,7 +236,11 @@ mod tests {
let _ = env_logger::try_init();
let client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);
let source = sp_runtime::transaction_validity::TransactionSource::External;