mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 08:41:02 +00:00
Fix multiple parachain headers submission for single message delivery (#1916)
* switch off parachains relay when we don't need to relay parachains (temp solution) * parachains relay now only works with single parachain * fix usages of parachains relay * revert hacky fix * fixes * fixed Westmint parachain ID * fixed metrics * fixed compilation * fmt * clippy * call -> typed_state_call
This commit is contained in:
committed by
Bastian Köcher
parent
ba155f990f
commit
e7f5560951
@@ -36,9 +36,7 @@ use bp_runtime::HeaderIdProvider;
|
||||
use futures::{select, FutureExt};
|
||||
use num_traits::Zero;
|
||||
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
|
||||
use parachains_relay::parachains_loop::{
|
||||
AvailableHeader, ParachainSyncParams, SourceClient, TargetClient,
|
||||
};
|
||||
use parachains_relay::parachains_loop::{AvailableHeader, SourceClient, TargetClient};
|
||||
use relay_substrate_client::{
|
||||
is_ancient_block, AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client,
|
||||
Error as SubstrateError, HashOf, HeaderIdOf, ParachainBase,
|
||||
@@ -183,7 +181,7 @@ where
|
||||
let mut proved_parachain_block = selected_parachain_block;
|
||||
if proved_relay_block != selected_relay_block {
|
||||
proved_parachain_block = parachains_source
|
||||
.on_chain_para_head_id(proved_relay_block, para_id)
|
||||
.on_chain_para_head_id(proved_relay_block)
|
||||
.await?
|
||||
// this could happen e.g. if parachain has been offboarded?
|
||||
.ok_or_else(|| {
|
||||
@@ -209,11 +207,11 @@ where
|
||||
}
|
||||
|
||||
// and finally - prove parachain head
|
||||
let (para_proof, para_hashes) =
|
||||
parachains_source.prove_parachain_heads(proved_relay_block, &[para_id]).await?;
|
||||
let (para_proof, para_hash) =
|
||||
parachains_source.prove_parachain_head(proved_relay_block).await?;
|
||||
calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
|
||||
proved_relay_block,
|
||||
para_hashes.into_iter().map(|h| (para_id, h)).collect(),
|
||||
vec![(para_id, para_hash)],
|
||||
para_proof,
|
||||
));
|
||||
|
||||
@@ -241,16 +239,14 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
|
||||
let mut relay_state = RelayState::Idle;
|
||||
let mut required_parachain_header_number = Zero::zero();
|
||||
let required_para_header_number_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));
|
||||
let required_para_header_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));
|
||||
|
||||
let mut restart_relay = true;
|
||||
let parachains_relay_task = futures::future::Fuse::terminated();
|
||||
futures::pin_mut!(parachains_relay_task);
|
||||
|
||||
let mut parachains_source = ParachainsSource::<P>::new(
|
||||
source_relay_client.clone(),
|
||||
required_para_header_number_ref.clone(),
|
||||
);
|
||||
let mut parachains_source =
|
||||
ParachainsSource::<P>::new(source_relay_client.clone(), required_para_header_ref.clone());
|
||||
let mut parachains_target =
|
||||
ParachainsTarget::<P>::new(target_client.clone(), target_transaction_params.clone());
|
||||
|
||||
@@ -271,13 +267,20 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
},
|
||||
};
|
||||
|
||||
// keep in mind that we are not updating `required_para_header_number_ref` here, because
|
||||
// keep in mind that we are not updating `required_para_header_ref` here, because
|
||||
// then we'll be submitting all previous headers as well (while required relay headers are
|
||||
// delivered) and we want to avoid that (to reduce cost)
|
||||
required_parachain_header_number = std::cmp::max(
|
||||
required_parachain_header_number,
|
||||
new_required_parachain_header_number,
|
||||
);
|
||||
if new_required_parachain_header_number > required_parachain_header_number {
|
||||
log::trace!(
|
||||
target: "bridge",
|
||||
"[{}] More {} headers required. Going to sync up to the {}",
|
||||
relay_task_name,
|
||||
P::SourceParachain::NAME,
|
||||
new_required_parachain_header_number,
|
||||
);
|
||||
|
||||
required_parachain_header_number = new_required_parachain_header_number;
|
||||
}
|
||||
},
|
||||
_ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
|
||||
_ = parachains_relay_task => {
|
||||
@@ -351,7 +354,7 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
.await;
|
||||
},
|
||||
RelayState::RelayingParaHeader(required_para_header) => {
|
||||
*required_para_header_number_ref.lock().await =
|
||||
*required_para_header_ref.lock().await =
|
||||
AvailableHeader::Available(required_para_header);
|
||||
},
|
||||
}
|
||||
@@ -379,11 +382,6 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
parachains_relay::parachains_loop::run(
|
||||
parachains_source.clone(),
|
||||
parachains_target.clone(),
|
||||
ParachainSyncParams {
|
||||
parachains: vec![P::SourceParachain::PARACHAIN_ID.into()],
|
||||
stall_timeout: std::time::Duration::from_secs(60),
|
||||
strategy: parachains_relay::parachains_loop::ParachainSyncStrategy::Any,
|
||||
},
|
||||
MetricsParams::disabled(),
|
||||
futures::future::pending(),
|
||||
)
|
||||
@@ -489,10 +487,7 @@ where
|
||||
source.client().best_finalized_header().await.map_err(map_source_err)?;
|
||||
let best_finalized_relay_block_id = best_finalized_relay_header.id();
|
||||
let para_header_at_source = source
|
||||
.on_chain_para_head_id(
|
||||
best_finalized_relay_block_id,
|
||||
P::SourceParachain::PARACHAIN_ID.into(),
|
||||
)
|
||||
.on_chain_para_head_id(best_finalized_relay_block_id)
|
||||
.await
|
||||
.map_err(map_source_err)?;
|
||||
|
||||
@@ -515,10 +510,7 @@ where
|
||||
let para_header_at_relay_header_at_target =
|
||||
if let Some(available_relay_header_at_target) = available_relay_header_at_target {
|
||||
source
|
||||
.on_chain_para_head_id(
|
||||
available_relay_header_at_target,
|
||||
P::SourceParachain::PARACHAIN_ID.into(),
|
||||
)
|
||||
.on_chain_para_head_id(available_relay_header_at_target)
|
||||
.await
|
||||
.map_err(map_source_err)?
|
||||
} else {
|
||||
@@ -669,7 +661,7 @@ impl<'a, P: SubstrateParachainsPipeline>
|
||||
&self,
|
||||
at_relay_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
|
||||
self.1.on_chain_para_head_id(at_relay_block, self.parachain_id()).await
|
||||
self.1.on_chain_para_head_id(at_relay_block).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -56,7 +56,8 @@ pub struct ParachainsPipelineAdapter<P: SubstrateParachainsPipeline> {
|
||||
}
|
||||
|
||||
impl<P: SubstrateParachainsPipeline> ParachainsPipeline for ParachainsPipelineAdapter<P> {
|
||||
type SourceChain = P::SourceRelayChain;
|
||||
type SourceParachain = P::SourceParachain;
|
||||
type SourceRelayChain = P::SourceRelayChain;
|
||||
type TargetChain = P::TargetChain;
|
||||
}
|
||||
|
||||
|
||||
@@ -24,10 +24,7 @@ use bp_parachains::parachain_head_storage_key_at_source;
|
||||
use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
|
||||
use bp_runtime::HeaderIdProvider;
|
||||
use codec::Decode;
|
||||
use parachains_relay::{
|
||||
parachains_loop::{AvailableHeader, SourceClient},
|
||||
parachains_loop_metrics::ParachainsLoopMetrics,
|
||||
};
|
||||
use parachains_relay::parachains_loop::{AvailableHeader, SourceClient};
|
||||
use relay_substrate_client::{
|
||||
is_ancient_block, Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, ParachainBase,
|
||||
RelayChain,
|
||||
@@ -63,8 +60,8 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
|
||||
pub async fn on_chain_para_head_id(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
para_id: ParaId,
|
||||
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
|
||||
let para_id = ParaId(P::SourceParachain::PARACHAIN_ID);
|
||||
let storage_key =
|
||||
parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id);
|
||||
let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?;
|
||||
@@ -104,18 +101,7 @@ where
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<AvailableHeader<ParaHash>, Self::Error> {
|
||||
// we don't need to support many parachains now
|
||||
if para_id.0 != P::SourceParachain::PARACHAIN_ID {
|
||||
return Err(SubstrateError::Custom(format!(
|
||||
"Parachain id {} is not matching expected {}",
|
||||
para_id.0,
|
||||
P::SourceParachain::PARACHAIN_ID,
|
||||
)))
|
||||
}
|
||||
|
||||
) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, Self::Error> {
|
||||
// if requested relay header is ancient, then we don't even want to try to read the
|
||||
// parachain head - we simply return `Unavailable`
|
||||
let best_block_number = self.client.best_finalized_header_number().await?;
|
||||
@@ -125,7 +111,7 @@ where
|
||||
|
||||
// else - try to read head from the source client
|
||||
let mut para_head_id = AvailableHeader::Missing;
|
||||
if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? {
|
||||
if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block).await? {
|
||||
// Never return head that is larger than requested. This way we'll never sync
|
||||
// headers past `max_header_id`.
|
||||
para_head_id = match *self.max_head_id.lock().await {
|
||||
@@ -141,26 +127,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if let (Some(metrics), AvailableHeader::Available(para_head_id)) = (metrics, para_head_id) {
|
||||
metrics.update_best_parachain_block_at_source(para_id, para_head_id.0);
|
||||
}
|
||||
|
||||
Ok(para_head_id.map(|para_head_id| para_head_id.1))
|
||||
Ok(para_head_id)
|
||||
}
|
||||
|
||||
async fn prove_parachain_heads(
|
||||
async fn prove_parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
parachains: &[ParaId],
|
||||
) -> Result<(ParaHeadsProof, Vec<ParaHash>), Self::Error> {
|
||||
) -> Result<(ParaHeadsProof, ParaHash), Self::Error> {
|
||||
let parachain = ParaId(P::SourceParachain::PARACHAIN_ID);
|
||||
if parachains != [parachain] {
|
||||
return Err(SubstrateError::Custom(format!(
|
||||
"Trying to prove unexpected parachains {parachains:?}. Expected {parachain:?}",
|
||||
)))
|
||||
}
|
||||
|
||||
let parachain = parachains[0];
|
||||
let storage_key =
|
||||
parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, parachain);
|
||||
let parachain_heads_proof = self
|
||||
@@ -190,6 +164,6 @@ where
|
||||
})?;
|
||||
let parachain_head_hash = parachain_head.hash();
|
||||
|
||||
Ok((ParaHeadsProof(parachain_heads_proof), vec![parachain_head_hash]))
|
||||
Ok((ParaHeadsProof(parachain_heads_proof), parachain_head_hash))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,18 +24,15 @@ use crate::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bp_parachains::{BestParaHeadHash, ImportedParaHeadsKeyProvider, ParasInfoKeyProvider};
|
||||
use bp_polkadot_core::parachains::{ParaHash, ParaHeadsProof, ParaId};
|
||||
use bp_runtime::HeaderIdProvider;
|
||||
use codec::Decode;
|
||||
use parachains_relay::{
|
||||
parachains_loop::TargetClient, parachains_loop_metrics::ParachainsLoopMetrics,
|
||||
};
|
||||
use parachains_relay::parachains_loop::TargetClient;
|
||||
use relay_substrate_client::{
|
||||
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
|
||||
HeaderIdOf, ParachainBase, RelayChain, TransactionEra, TransactionTracker, UnsignedTransaction,
|
||||
AccountIdOf, AccountKeyPairOf, Chain, Client, Error as SubstrateError, HeaderIdOf,
|
||||
ParachainBase, TransactionEra, TransactionTracker, UnsignedTransaction,
|
||||
};
|
||||
use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
|
||||
use relay_utils::relay_loop::Client as RelayClient;
|
||||
use sp_core::{Bytes, Pair};
|
||||
|
||||
/// Substrate client as parachain heads source.
|
||||
@@ -92,93 +89,50 @@ where
|
||||
Ok(best_id)
|
||||
}
|
||||
|
||||
async fn best_finalized_source_block(
|
||||
async fn best_finalized_source_relay_chain_block(
|
||||
&self,
|
||||
at_block: &HeaderIdOf<P::TargetChain>,
|
||||
) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error> {
|
||||
let encoded_best_finalized_source_block = self
|
||||
.client
|
||||
.state_call(
|
||||
self.client
|
||||
.typed_state_call::<_, Option<HeaderIdOf<P::SourceRelayChain>>>(
|
||||
P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD.into(),
|
||||
Bytes(Vec::new()),
|
||||
(),
|
||||
Some(at_block.1),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Option::<HeaderId<HashOf<P::SourceRelayChain>, BlockNumberOf<P::SourceRelayChain>>>::decode(
|
||||
&mut &encoded_best_finalized_source_block.0[..],
|
||||
)
|
||||
.map_err(SubstrateError::ResponseParseFailed)?
|
||||
.map(Ok)
|
||||
.unwrap_or(Err(SubstrateError::NoParachainHeadAtTarget(
|
||||
P::SourceParachain::PARACHAIN_ID,
|
||||
P::TargetChain::NAME.into(),
|
||||
)))
|
||||
.await?
|
||||
.map(Ok)
|
||||
.unwrap_or(Err(SubstrateError::BridgePalletIsNotInitialized))
|
||||
}
|
||||
|
||||
async fn parachain_head(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::TargetChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<Option<BestParaHeadHash>, Self::Error> {
|
||||
let best_para_head_hash: Option<BestParaHeadHash> = self
|
||||
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, Self::Error> {
|
||||
let encoded_best_finalized_source_para_block = self
|
||||
.client
|
||||
.storage_map_value::<ParasInfoKeyProvider>(
|
||||
P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME,
|
||||
¶_id,
|
||||
.state_call(
|
||||
P::SourceParachain::BEST_FINALIZED_HEADER_ID_METHOD.into(),
|
||||
Bytes(Vec::new()),
|
||||
Some(at_block.1),
|
||||
)
|
||||
.await?
|
||||
.map(|para_info| para_info.best_head_hash);
|
||||
.await?;
|
||||
|
||||
if let (Some(metrics), Some(best_para_head_hash)) = (metrics, &best_para_head_hash) {
|
||||
let imported_para_head_number = self
|
||||
.client
|
||||
.storage_double_map_value::<ImportedParaHeadsKeyProvider>(
|
||||
P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME,
|
||||
¶_id,
|
||||
&best_para_head_hash.head_hash,
|
||||
Some(at_block.1),
|
||||
)
|
||||
.await
|
||||
.and_then(|maybe_encoded_head| match maybe_encoded_head {
|
||||
Some(encoded_head) => encoded_head
|
||||
.decode_parachain_head_data::<P::SourceParachain>()
|
||||
.map(|head| head.number)
|
||||
.map(Some)
|
||||
.map_err(Self::Error::ResponseParseFailed),
|
||||
None => Ok(None),
|
||||
})
|
||||
.map_err(|e| {
|
||||
log::error!(
|
||||
target: "bridge-metrics",
|
||||
"Failed to read or decode {} parachain header at {}: {:?}. Metric will have obsolete value",
|
||||
P::SourceParachain::NAME,
|
||||
P::TargetChain::NAME,
|
||||
e,
|
||||
);
|
||||
e
|
||||
})
|
||||
.unwrap_or(None);
|
||||
if let Some(imported_para_head_number) = imported_para_head_number {
|
||||
metrics.update_best_parachain_block_at_target(para_id, imported_para_head_number);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(best_para_head_hash)
|
||||
Ok(Option::<HeaderIdOf<P::SourceParachain>>::decode(
|
||||
&mut &encoded_best_finalized_source_para_block.0[..],
|
||||
)
|
||||
.map_err(SubstrateError::ResponseParseFailed)?)
|
||||
}
|
||||
|
||||
async fn submit_parachain_heads_proof(
|
||||
async fn submit_parachain_head_proof(
|
||||
&self,
|
||||
at_relay_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
updated_parachains: Vec<(ParaId, ParaHash)>,
|
||||
updated_head_hash: ParaHash,
|
||||
proof: ParaHeadsProof,
|
||||
) -> Result<Self::TransactionTracker, Self::Error> {
|
||||
let transaction_params = self.transaction_params.clone();
|
||||
let call = P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
|
||||
at_relay_block,
|
||||
updated_parachains,
|
||||
vec![(ParaId(P::SourceParachain::PARACHAIN_ID), updated_head_hash)],
|
||||
proof,
|
||||
);
|
||||
self.client
|
||||
|
||||
Reference in New Issue
Block a user