Fix on-different-forks metrics during initialization (#1468)

* fix on-different-forks metrics during initialization

* "initialize" parachain finality pallet in on-demand parachains relay

* decrease converstion rate requests count

* more error logging

* fix compilation

* clippy
This commit is contained in:
Svyatoslav Nikolsky
2022-06-21 14:52:07 +03:00
committed by Bastian Köcher
parent 418942826d
commit 31a2be845c
25 changed files with 173 additions and 105 deletions
@@ -58,4 +58,9 @@ pub enum Error<Hash: Debug + MaybeDisplay, HeaderNumber: Debug + MaybeDisplay> {
/// Failed to retrieve best finalized source header hash from the target chain.
#[error("Failed to retrieve best finalized {0} header from the target chain: {1}")]
RetrieveBestFinalizedHeaderHash(&'static str, client::Error),
/// Failed to submit signed extrinsic from to the target chain.
#[error(
"Failed to retrieve `is_initialized` flag of the with-{0} finality pallet at {1}: {2:?}"
)]
IsInitializedRetrieve(&'static str, &'static str, client::Error),
}
@@ -37,7 +37,7 @@ use std::marker::PhantomData;
/// Finality enfine, used by the Substrate chain.
#[async_trait]
pub trait Engine<C: Chain> {
pub trait Engine<C: Chain>: Send {
/// Unique consensus engine identifier.
const ID: ConsensusEngineId;
/// Type of finality proofs, used by consensus engine.
@@ -59,6 +59,23 @@ pub trait Engine<C: Chain> {
async fn prepare_initialization_data(
client: Client<C>,
) -> Result<Self::InitializationData, Error<HashOf<C>, BlockNumberOf<C>>>;
/// Returns `Ok(true)` if finality pallet at the bridged chain has already been initialized.
async fn is_initialized<TargetChain: Chain>(
target_client: &Client<TargetChain>,
) -> Result<bool, SubstrateError> {
Ok(target_client
.raw_storage_value(Self::is_initialized_key(), None)
.await?
.is_some())
}
/// Returns `Ok(true)` if finality pallet at the bridged chain is halted.
async fn is_halted<TargetChain: Chain>(
target_client: &Client<TargetChain>,
) -> Result<bool, SubstrateError> {
Ok(target_client.storage_value(Self::is_halted_key(), None).await?.unwrap_or(false))
}
}
/// GRANDPA finality engine.
@@ -23,7 +23,7 @@
use crate::{error::Error, finality::engine::Engine};
use relay_substrate_client::{BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf};
use relay_substrate_client::{Chain, Client, Error as SubstrateError};
use sp_core::Bytes;
use sp_runtime::traits::Header as HeaderT;
@@ -80,7 +80,9 @@ where
+ Send
+ 'static,
{
let is_initialized = is_initialized::<E, SourceChain, TargetChain>(&target_client).await?;
let is_initialized = E::is_initialized(&target_client)
.await
.map_err(|e| Error::IsInitializedRetrieve(SourceChain::NAME, TargetChain::NAME, e))?;
if is_initialized {
log::info!(
target: "bridge",
@@ -108,18 +110,3 @@ where
.map_err(|err| Error::SubmitTransaction(TargetChain::NAME, err))?;
Ok(Some(initialization_tx_hash))
}
/// Returns `Ok(true)` if bridge has already been initialized.
pub(crate) async fn is_initialized<
E: Engine<SourceChain>,
SourceChain: Chain,
TargetChain: Chain,
>(
target_client: &Client<TargetChain>,
) -> Result<bool, Error<HashOf<SourceChain>, BlockNumberOf<SourceChain>>> {
Ok(target_client
.raw_storage_value(E::is_initialized_key(), None)
.await
.map_err(|err| Error::RetrieveBestFinalizedHeaderHash(SourceChain::NAME, err))?
.is_some())
}
@@ -51,15 +51,12 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalityTarget<P> {
/// Ensure that the bridge pallet at target chain is active.
pub async fn ensure_pallet_active(&self) -> Result<(), Error> {
let is_halted = self.client.storage_value(P::FinalityEngine::is_halted_key(), None).await?;
if is_halted.unwrap_or(false) {
let is_halted = P::FinalityEngine::is_halted(&self.client).await?;
if is_halted {
return Err(Error::BridgePalletIsHalted)
}
let is_initialized =
super::initialize::is_initialized::<P::FinalityEngine, P::SourceChain, P::TargetChain>(
&self.client,
)
let is_initialized = P::FinalityEngine::is_initialized(&self.client)
.await
.map_err(|e| Error::Custom(e.to_string()))?;
if !is_initialized {
@@ -146,7 +146,11 @@ where
async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
// it may have already received confirmations that we're going to deliver
//
// we can't continue to deliver messages if target node is out of sync, because
// it may have already received (some of) messages that we're going to deliver
self.source_client.ensure_synced().await?;
self.target_client.ensure_synced().await?;
// we can't relay confirmations if messages pallet at source chain is halted
self.ensure_pallet_active().await?;
@@ -562,9 +566,13 @@ where
Some(at_self_hash),
)
.await?;
let decoded_best_finalized_peer_on_self: (BlockNumberOf<PeerChain>, HashOf<PeerChain>) =
Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..])
.map_err(SubstrateError::ResponseParseFailed)?;
let decoded_best_finalized_peer_on_self =
Option::<(BlockNumberOf<PeerChain>, HashOf<PeerChain>)>::decode(
&mut &encoded_best_finalized_peer_on_self.0[..],
)
.map_err(SubstrateError::ResponseParseFailed)?
.map(Ok)
.unwrap_or(Err(SubstrateError::BridgePalletIsNotInitialized))?;
let peer_on_self_best_finalized_id =
HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1);
@@ -146,8 +146,12 @@ where
BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>,
{
async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
// it may have already received confirmations that we're going to deliver
//
// we can't continue to deliver messages if target node is out of sync, because
// it may have already received (some of) messages that we're going to deliver
self.source_client.ensure_synced().await?;
self.target_client.ensure_synced().await?;
// we can't relay messages if messages pallet at target chain is halted
self.ensure_pallet_active().await?;
@@ -253,7 +253,7 @@ async fn background_task<P: SubstrateFinalitySyncPipeline>(
log::info!(
target: "bridge",
"[{}] Starting on-demand relay task\n\t\
"[{}] Starting on-demand headers relay task\n\t\
Only mandatory headers: {}\n\t\
Tx mortality: {:?} (~{}m)\n\t\
Stall timeout: {:?}",
@@ -181,6 +181,7 @@ async fn background_task<P: SubstrateParachainsPipeline>(
new_required_parachain_header_number,
);
},
_ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
_ = parachains_relay_task => {
// this should never happen in practice given the current code
restart_relay = true;
@@ -266,7 +267,7 @@ async fn background_task<P: SubstrateParachainsPipeline>(
log::info!(
target: "bridge",
"[{}] Starting on-demand-relay task\n\t\
"[{}] Starting on-demand-parachains relay task\n\t\
Tx mortality: {:?} (~{}m)\n\t\
Stall timeout: {:?}",
relay_task_name,
@@ -317,7 +318,7 @@ struct RelayData<ParaHash, ParaNumber, RelayNumber> {
/// Parachain header number that is required at the target chain.
pub required_para_header: ParaNumber,
/// Parachain header number, known to the target chain.
pub para_header_at_target: ParaNumber,
pub para_header_at_target: Option<ParaNumber>,
/// Parachain header id, known to the source (relay) chain.
pub para_header_at_source: Option<HeaderId<ParaHash, ParaNumber>>,
/// Parachain header, that is available at the source relay chain at `relay_header_at_target`
@@ -374,9 +375,15 @@ where
best_target_block_hash,
P::SourceParachain::BEST_FINALIZED_HEADER_ID_METHOD,
)
.await
.map_err(map_target_err)?
.0;
.await;
// if there are no parachain heads at the target (`BridgePalletIsNotInitialized`), we'll need
// to submit at least one. Otherwise the pallet will be treated as uninitialized and messages
// sync will stall.
let para_header_at_target = match para_header_at_target {
Ok(para_header_at_target) => Some(para_header_at_target.0),
Err(SubstrateError::BridgePalletIsNotInitialized) => None,
Err(e) => return Err(map_target_err(e)),
};
let best_finalized_relay_header =
source.client().best_finalized_header().await.map_err(map_source_err)?;
@@ -424,7 +431,7 @@ fn select_headers_to_relay<ParaHash, ParaNumber, RelayNumber>(
) -> RelayState<ParaHash, ParaNumber, RelayNumber>
where
ParaHash: Clone,
ParaNumber: Copy + PartialOrd,
ParaNumber: Copy + PartialOrd + Zero,
RelayNumber: Copy + Debug + Ord,
{
// this switch is responsible for processing `RelayingRelayHeader` state
@@ -450,30 +457,38 @@ where
}
// this switch is responsible for processing `RelayingParaHeader` state
let para_header_at_target_or_zero = data.para_header_at_target.unwrap_or_else(Zero::zero);
match state {
RelayState::Idle => (),
RelayState::RelayingRelayHeader(_) => unreachable!("processed by previous match; qed"),
RelayState::RelayingParaHeader(para_header_id) => {
if data.para_header_at_target < para_header_id.0 {
if para_header_at_target_or_zero < para_header_id.0 {
// parachain header hasn't yet been relayed
return RelayState::RelayingParaHeader(para_header_id)
}
},
}
// if we have already satisfied our "customer", do nothing
if data.required_para_header <= data.para_header_at_target {
return RelayState::Idle
}
// if we haven't read para head from the source, we can't yet do anyhting
let para_header_at_source = match data.para_header_at_source {
Some(ref para_header_at_source) => para_header_at_source.clone(),
None => return RelayState::Idle,
};
// if we have parachain head at the source, but no parachain heads at the target, we'll need
// to deliver at least one parachain head
let (required_para_header, para_header_at_target) = match data.para_header_at_target {
Some(para_header_at_target) => (data.required_para_header, para_header_at_target),
None => (para_header_at_source.0, Zero::zero()),
};
// if we have already satisfied our "customer", do nothing
if required_para_header <= para_header_at_target {
return RelayState::Idle
}
// if required header is not available even at the source chain, let's wait
if data.required_para_header > para_header_at_source.0 {
if required_para_header > para_header_at_source.0 {
return RelayState::Idle
}
@@ -499,7 +514,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: 50,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: 700,
@@ -517,7 +532,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: 50,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: 750,
@@ -535,7 +550,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: 50,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: 780,
@@ -552,7 +567,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: 50,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: 780,
@@ -570,7 +585,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: 105,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: 780,
@@ -588,7 +603,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: 105,
para_header_at_target: Some(105),
para_header_at_source: None,
relay_header_at_source: 800,
relay_header_at_target: 780,
@@ -606,7 +621,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: 105,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: 780,
@@ -624,7 +639,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: 105,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: 780,
@@ -642,7 +657,7 @@ mod tests {
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: 105,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: 800,
@@ -660,7 +675,7 @@ mod tests {
select_headers_to_relay::<i32, _, _>(
&RelayData {
required_para_header: 120,
para_header_at_target: 105,
para_header_at_target: Some(105),
para_header_at_source: None,
relay_header_at_source: 800,
relay_header_at_target: 800,
@@ -671,4 +686,40 @@ mod tests {
RelayState::Idle,
);
}
#[test]
fn relay_starts_relaying_first_parachain_header() {
assert_eq!(
select_headers_to_relay::<i32, _, _>(
&RelayData {
required_para_header: 0,
para_header_at_target: None,
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: 800,
para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
},
RelayState::Idle,
),
RelayState::RelayingParaHeader(HeaderId(125, 125)),
);
}
#[test]
fn relay_starts_relaying_relay_header_to_relay_first_parachain_header() {
assert_eq!(
select_headers_to_relay::<i32, _, _>(
&RelayData {
required_para_header: 0,
para_header_at_target: None,
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: 700,
para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
},
RelayState::Idle,
),
RelayState::RelayingRelayHeader(800),
);
}
}
@@ -102,11 +102,13 @@ where
Some(at_block.1),
)
.await?;
let decoded_best_finalized_source_block: (
BlockNumberOf<P::SourceRelayChain>,
HashOf<P::SourceRelayChain>,
) = Decode::decode(&mut &encoded_best_finalized_source_block.0[..])
.map_err(SubstrateError::ResponseParseFailed)?;
let decoded_best_finalized_source_block =
Option::<(BlockNumberOf<P::SourceRelayChain>, HashOf<P::SourceRelayChain>)>::decode(
&mut &encoded_best_finalized_source_block.0[..],
)
.map_err(SubstrateError::ResponseParseFailed)?
.map(Ok)
.unwrap_or(Err(SubstrateError::BridgePalletIsNotInitialized))?;
Ok(HeaderId(decoded_best_finalized_source_block.0, decoded_best_finalized_source_block.1))
}