mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 23:21:02 +00:00
Parachain loop metrics (#1484)
* parachain loop metrics * some fixes * mini refactoring * add tests
This commit is contained in:
committed by
Bastian Köcher
parent
f8ff3c9142
commit
d11d9fd0b7
@@ -23,7 +23,10 @@ use async_trait::async_trait;
|
||||
use bp_parachains::parachain_head_storage_key_at_source;
|
||||
use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
|
||||
use codec::Decode;
|
||||
use parachains_relay::parachains_loop::{ParaHashAtSource, SourceClient};
|
||||
use parachains_relay::{
|
||||
parachains_loop::{ParaHashAtSource, SourceClient},
|
||||
parachains_loop_metrics::ParachainsLoopMetrics,
|
||||
};
|
||||
use relay_substrate_client::{
|
||||
Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain,
|
||||
};
|
||||
@@ -100,6 +103,7 @@ where
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<ParaHashAtSource, Self::Error> {
|
||||
// we don't need to support many parachains now
|
||||
@@ -111,9 +115,11 @@ where
|
||||
)))
|
||||
}
|
||||
|
||||
Ok(match self.on_chain_parachain_header(at_block, para_id).await? {
|
||||
let mut para_header_number_at_source = None;
|
||||
let para_hash_at_source = match self.on_chain_parachain_header(at_block, para_id).await? {
|
||||
Some(parachain_header) => {
|
||||
let mut parachain_head = ParaHashAtSource::Some(parachain_header.hash());
|
||||
para_header_number_at_source = Some(*parachain_header.number());
|
||||
// never return head that is larger than requested. This way we'll never sync
|
||||
// headers past `maximal_header_id`
|
||||
if let Some(ref maximal_header_id) = self.maximal_header_id {
|
||||
@@ -125,11 +131,13 @@ where
|
||||
// we don't want this header yet => let's report previously requested
|
||||
// header
|
||||
parachain_head = ParaHashAtSource::Some(maximal_header_id.1);
|
||||
para_header_number_at_source = Some(maximal_header_id.0);
|
||||
},
|
||||
Some(_) => (),
|
||||
None => {
|
||||
// on-demand relay has not yet asked us to sync anything let's do that
|
||||
parachain_head = ParaHashAtSource::Unavailable;
|
||||
para_header_number_at_source = None;
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -137,7 +145,15 @@ where
|
||||
parachain_head
|
||||
},
|
||||
None => ParaHashAtSource::None,
|
||||
})
|
||||
};
|
||||
|
||||
if let (Some(metrics), Some(para_header_number_at_source)) =
|
||||
(metrics, para_header_number_at_source)
|
||||
{
|
||||
metrics.update_best_parachain_block_at_source(para_id, para_header_number_at_source);
|
||||
}
|
||||
|
||||
Ok(para_hash_at_source)
|
||||
}
|
||||
|
||||
async fn prove_parachain_heads(
|
||||
|
||||
@@ -24,13 +24,19 @@ use crate::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bp_parachains::{parachain_head_storage_key_at_target, BestParaHeadHash};
|
||||
use bp_polkadot_core::parachains::{ParaHeadsProof, ParaId};
|
||||
use bp_parachains::{
|
||||
best_parachain_head_hash_storage_key_at_target, imported_parachain_head_storage_key_at_target,
|
||||
BestParaHeadHash,
|
||||
};
|
||||
use bp_polkadot_core::parachains::{ParaHead, ParaHeadsProof, ParaId};
|
||||
use codec::{Decode, Encode};
|
||||
use parachains_relay::parachains_loop::TargetClient;
|
||||
use parachains_relay::{
|
||||
parachains_loop::TargetClient, parachains_loop_metrics::ParachainsLoopMetrics,
|
||||
};
|
||||
use relay_substrate_client::{
|
||||
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
|
||||
HeaderIdOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme, UnsignedTransaction,
|
||||
HeaderIdOf, HeaderOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme,
|
||||
UnsignedTransaction,
|
||||
};
|
||||
use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
|
||||
use sp_core::{Bytes, Pair};
|
||||
@@ -115,15 +121,46 @@ where
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::TargetChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<Option<BestParaHeadHash>, Self::Error> {
|
||||
let storage_key = parachain_head_storage_key_at_target(
|
||||
let best_para_head_hash_key = best_parachain_head_hash_storage_key_at_target(
|
||||
P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME,
|
||||
para_id,
|
||||
);
|
||||
let para_head = self.client.storage_value(storage_key, Some(at_block.1)).await?;
|
||||
let best_para_head_hash: Option<BestParaHeadHash> =
|
||||
self.client.storage_value(best_para_head_hash_key, Some(at_block.1)).await?;
|
||||
if let (Some(metrics), &Some(ref best_para_head_hash)) = (metrics, &best_para_head_hash) {
|
||||
let imported_para_head_key = imported_parachain_head_storage_key_at_target(
|
||||
P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME,
|
||||
para_id,
|
||||
best_para_head_hash.head_hash,
|
||||
);
|
||||
let imported_para_header = self
|
||||
.client
|
||||
.storage_value::<ParaHead>(imported_para_head_key, Some(at_block.1))
|
||||
.await?
|
||||
.and_then(|h| match HeaderOf::<P::SourceParachain>::decode(&mut &h.0[..]) {
|
||||
Ok(header) => Some(header),
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
target: "bridge-metrics",
|
||||
"Failed to decode {} parachain header at {}: {:?}. Metric will have obsolete value",
|
||||
P::SourceParachain::NAME,
|
||||
P::TargetChain::NAME,
|
||||
e,
|
||||
);
|
||||
|
||||
Ok(para_head)
|
||||
None
|
||||
},
|
||||
});
|
||||
if let Some(imported_para_header) = imported_para_header {
|
||||
metrics
|
||||
.update_best_parachain_block_at_target(para_id, *imported_para_header.number());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(best_para_head_hash)
|
||||
}
|
||||
|
||||
async fn submit_parachain_heads_proof(
|
||||
|
||||
@@ -76,9 +76,13 @@ pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
|
||||
async fn ensure_synced(&self) -> Result<bool, Self::Error>;
|
||||
|
||||
/// Get parachain head hash at given block.
|
||||
///
|
||||
/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source`
|
||||
/// on provided `metrics` object to update corresponding metric value.
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::SourceChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<ParaHashAtSource, Self::Error>;
|
||||
|
||||
@@ -103,9 +107,13 @@ pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
|
||||
) -> Result<HeaderIdOf<P::SourceChain>, Self::Error>;
|
||||
|
||||
/// Get parachain head hash at given block.
|
||||
///
|
||||
/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target`
|
||||
/// on provided `metrics` object to update corresponding metric value.
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::TargetChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<Option<BestParaHeadHash>, Self::Error>;
|
||||
|
||||
@@ -158,7 +166,7 @@ async fn run_until_connection_lost<P: ParachainsPipeline>(
|
||||
source_client: impl SourceClient<P>,
|
||||
target_client: impl TargetClient<P>,
|
||||
sync_params: ParachainSyncParams,
|
||||
_metrics: Option<ParachainsLoopMetrics>,
|
||||
metrics: Option<ParachainsLoopMetrics>,
|
||||
exit_signal: impl Future<Output = ()> + Send,
|
||||
) -> Result<(), FailedClient>
|
||||
where
|
||||
@@ -213,9 +221,13 @@ where
|
||||
log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceChain::NAME, e);
|
||||
FailedClient::Target
|
||||
})?;
|
||||
let heads_at_target =
|
||||
read_heads_at_target(&target_client, &best_target_block, &sync_params.parachains)
|
||||
.await?;
|
||||
let heads_at_target = read_heads_at_target(
|
||||
&target_client,
|
||||
metrics.as_ref(),
|
||||
&best_target_block,
|
||||
&sync_params.parachains,
|
||||
)
|
||||
.await?;
|
||||
tx_tracker = tx_tracker.take().and_then(|tx_tracker| tx_tracker.update(&heads_at_target));
|
||||
if tx_tracker.is_some() {
|
||||
continue
|
||||
@@ -238,6 +250,7 @@ where
|
||||
})?;
|
||||
let heads_at_source = read_heads_at_source(
|
||||
&source_client,
|
||||
metrics.as_ref(),
|
||||
&best_finalized_relay_block,
|
||||
&sync_params.parachains,
|
||||
)
|
||||
@@ -398,12 +411,13 @@ fn is_update_required(sync_params: &ParachainSyncParams, updated_ids: &[ParaId])
|
||||
/// Guarantees that the returning map will have an entry for every parachain from `parachains`.
|
||||
async fn read_heads_at_source<P: ParachainsPipeline>(
|
||||
source_client: &impl SourceClient<P>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
at_relay_block: &HeaderIdOf<P::SourceChain>,
|
||||
parachains: &[ParaId],
|
||||
) -> Result<BTreeMap<ParaId, ParaHashAtSource>, FailedClient> {
|
||||
let mut para_head_hashes = BTreeMap::new();
|
||||
for para in parachains {
|
||||
let para_head = source_client.parachain_head(*at_relay_block, *para).await;
|
||||
let para_head = source_client.parachain_head(*at_relay_block, metrics, *para).await;
|
||||
match para_head {
|
||||
Ok(para_head) => {
|
||||
para_head_hashes.insert(*para, para_head);
|
||||
@@ -428,12 +442,13 @@ async fn read_heads_at_source<P: ParachainsPipeline>(
|
||||
/// Guarantees that the returning map will have an entry for every parachain from `parachains`.
|
||||
async fn read_heads_at_target<P: ParachainsPipeline>(
|
||||
target_client: &impl TargetClient<P>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
at_block: &HeaderIdOf<P::TargetChain>,
|
||||
parachains: &[ParaId],
|
||||
) -> Result<BTreeMap<ParaId, Option<BestParaHeadHash>>, FailedClient> {
|
||||
let mut para_best_head_hashes = BTreeMap::new();
|
||||
for para in parachains {
|
||||
let para_best_head = target_client.parachain_head(*at_block, *para).await;
|
||||
let para_best_head = target_client.parachain_head(*at_block, metrics, *para).await;
|
||||
match para_best_head {
|
||||
Ok(para_best_head) => {
|
||||
para_best_head_hashes.insert(*para, para_best_head);
|
||||
@@ -638,6 +653,7 @@ mod tests {
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
_at_block: HeaderIdOf<TestChain>,
|
||||
_metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<ParaHashAtSource, TestError> {
|
||||
match self.data.lock().await.source_heads.get(¶_id.0).cloned() {
|
||||
@@ -684,6 +700,7 @@ mod tests {
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
_at_block: HeaderIdOf<TestChain>,
|
||||
_metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<Option<BestParaHeadHash>, TestError> {
|
||||
self.data.lock().await.target_heads.get(¶_id.0).cloned().transpose()
|
||||
|
||||
@@ -14,21 +14,85 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use relay_utils::metrics::{Metric, PrometheusError, Registry};
|
||||
use bp_polkadot_core::parachains::ParaId;
|
||||
use relay_utils::metrics::{
|
||||
metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64,
|
||||
};
|
||||
|
||||
/// Parachains sync metrics.
|
||||
#[derive(Clone)]
|
||||
pub struct ParachainsLoopMetrics;
|
||||
pub struct ParachainsLoopMetrics {
|
||||
/// Best parachains header numbers at the source.
|
||||
best_source_block_numbers: GaugeVec<U64>,
|
||||
/// Best parachains header numbers at the target.
|
||||
best_target_block_numbers: GaugeVec<U64>,
|
||||
}
|
||||
|
||||
impl ParachainsLoopMetrics {
|
||||
/// Create and register parachains loop metrics.
|
||||
pub fn new(_prefix: Option<&str>) -> Result<Self, PrometheusError> {
|
||||
Ok(ParachainsLoopMetrics)
|
||||
pub fn new(prefix: Option<&str>) -> Result<Self, PrometheusError> {
|
||||
Ok(ParachainsLoopMetrics {
|
||||
best_source_block_numbers: GaugeVec::new(
|
||||
Opts::new(
|
||||
metric_name(prefix, "best_parachain_block_number_at_source"),
|
||||
"Best parachain block numbers at the source relay chain".to_string(),
|
||||
),
|
||||
&["parachain"],
|
||||
)?,
|
||||
best_target_block_numbers: GaugeVec::new(
|
||||
Opts::new(
|
||||
metric_name(prefix, "best_parachain_block_number_at_target"),
|
||||
"Best parachain block numbers at the target chain".to_string(),
|
||||
),
|
||||
&["parachain"],
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Update best block number at source.
|
||||
pub fn update_best_parachain_block_at_source<Number: Into<u64>>(
|
||||
&self,
|
||||
parachain: ParaId,
|
||||
block_number: Number,
|
||||
) {
|
||||
let block_number = block_number.into();
|
||||
let label = parachain_label(¶chain);
|
||||
log::trace!(
|
||||
target: "bridge-metrics",
|
||||
"Updated value of metric 'best_parachain_block_number_at_source[{}]': {:?}",
|
||||
label,
|
||||
block_number,
|
||||
);
|
||||
self.best_source_block_numbers.with_label_values(&[&label]).set(block_number);
|
||||
}
|
||||
|
||||
/// Update best block number at target.
|
||||
pub fn update_best_parachain_block_at_target<Number: Into<u64>>(
|
||||
&self,
|
||||
parachain: ParaId,
|
||||
block_number: Number,
|
||||
) {
|
||||
let block_number = block_number.into();
|
||||
let label = parachain_label(¶chain);
|
||||
log::trace!(
|
||||
target: "bridge-metrics",
|
||||
"Updated value of metric 'best_parachain_block_number_at_target[{}]': {:?}",
|
||||
label,
|
||||
block_number,
|
||||
);
|
||||
self.best_target_block_numbers.with_label_values(&[&label]).set(block_number);
|
||||
}
|
||||
}
|
||||
|
||||
impl Metric for ParachainsLoopMetrics {
|
||||
fn register(&self, _registry: &Registry) -> Result<(), PrometheusError> {
|
||||
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
|
||||
register(self.best_source_block_numbers.clone(), registry)?;
|
||||
register(self.best_target_block_numbers.clone(), registry)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Return metric label for the parachain.
|
||||
fn parachain_label(parachain: &ParaId) -> String {
|
||||
format!("para_{}", parachain.0)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user