mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 19:51:02 +00:00
* Parachains source cosmetic changes - Make `ParaHashAtSource` more generic - Modify `on_chain_parachain_header` to return `HeaderId` - Shortening variable names Signed-off-by: Serban Iorga <serban@parity.io> * Change ParachainsSource::max_head_id type Change ParachainsSource::max_head_id to Arc<Mutex<NoopOption>> Signed-off-by: Serban Iorga <serban@parity.io> * code review changes
This commit is contained in:
committed by
Bastian Köcher
parent
dd9debed3c
commit
77af92b17b
@@ -27,7 +27,6 @@ use relay_substrate_client::{
|
||||
BlockNumberOf, BlockWithJustification, Chain, Client, Error, HeaderOf,
|
||||
};
|
||||
use relay_utils::relay_loop::Client as RelayClient;
|
||||
use sp_runtime::traits::Header as HeaderT;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Shared updatable reference to the maximal header number that we want to sync from the source.
|
||||
@@ -76,9 +75,7 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalitySource<P> {
|
||||
) -> Result<BlockNumberOf<P::SourceChain>, Error> {
|
||||
// we **CAN** continue to relay finality proofs if source node is out of sync, because
|
||||
// target node may be missing proofs that are already available at the source
|
||||
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
|
||||
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
|
||||
Ok(*finalized_header.number())
|
||||
self.client.best_finalized_header_number().await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ use bp_runtime::HeaderIdProvider;
|
||||
use futures::{select, FutureExt};
|
||||
use num_traits::Zero;
|
||||
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
|
||||
use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient};
|
||||
use parachains_relay::parachains_loop::{AvailableHeader, ParachainSyncParams, TargetClient};
|
||||
use relay_substrate_client::{
|
||||
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
|
||||
TransactionSignScheme,
|
||||
@@ -143,7 +143,7 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
|
||||
let mut relay_state = RelayState::Idle;
|
||||
let mut required_parachain_header_number = Zero::zero();
|
||||
let required_para_header_number_ref = Arc::new(Mutex::new(None));
|
||||
let required_para_header_number_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));
|
||||
|
||||
let mut restart_relay = true;
|
||||
let parachains_relay_task = futures::future::Fuse::terminated();
|
||||
@@ -151,7 +151,7 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
|
||||
let mut parachains_source = ParachainsSource::<P>::new(
|
||||
source_relay_client.clone(),
|
||||
Some(required_para_header_number_ref.clone()),
|
||||
required_para_header_number_ref.clone(),
|
||||
);
|
||||
let mut parachains_target =
|
||||
ParachainsTarget::<P>::new(target_client.clone(), target_transaction_params.clone());
|
||||
@@ -253,7 +253,8 @@ async fn background_task<P: SubstrateParachainsPipeline>(
|
||||
.await;
|
||||
},
|
||||
RelayState::RelayingParaHeader(required_para_header) => {
|
||||
*required_para_header_number_ref.lock().await = Some(required_para_header);
|
||||
*required_para_header_number_ref.lock().await =
|
||||
AvailableHeader::Available(required_para_header);
|
||||
},
|
||||
}
|
||||
|
||||
@@ -389,13 +390,9 @@ where
|
||||
source.client().best_finalized_header().await.map_err(map_source_err)?;
|
||||
let best_finalized_relay_block_id = best_finalized_relay_header.id();
|
||||
let para_header_at_source = source
|
||||
.on_chain_parachain_header(
|
||||
best_finalized_relay_block_id,
|
||||
P::SOURCE_PARACHAIN_PARA_ID.into(),
|
||||
)
|
||||
.on_chain_para_head_id(best_finalized_relay_block_id, P::SOURCE_PARACHAIN_PARA_ID.into())
|
||||
.await
|
||||
.map_err(map_source_err)?
|
||||
.map(|h| h.id());
|
||||
.map_err(map_source_err)?;
|
||||
|
||||
let relay_header_at_source = best_finalized_relay_block_id.0;
|
||||
let relay_header_at_target =
|
||||
@@ -408,10 +405,9 @@ where
|
||||
.map_err(map_target_err)?;
|
||||
|
||||
let para_header_at_relay_header_at_target = source
|
||||
.on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
|
||||
.on_chain_para_head_id(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
|
||||
.await
|
||||
.map_err(map_source_err)?
|
||||
.map(|h| h.id());
|
||||
.map_err(map_source_err)?;
|
||||
|
||||
Ok(RelayData {
|
||||
required_para_header: required_header_number,
|
||||
|
||||
@@ -22,35 +22,35 @@ use async_std::sync::{Arc, Mutex};
|
||||
use async_trait::async_trait;
|
||||
use bp_parachains::parachain_head_storage_key_at_source;
|
||||
use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
|
||||
use bp_runtime::HeaderIdProvider;
|
||||
use codec::Decode;
|
||||
use parachains_relay::{
|
||||
parachains_loop::{ParaHashAtSource, SourceClient},
|
||||
parachains_loop::{AvailableHeader, SourceClient},
|
||||
parachains_loop_metrics::ParachainsLoopMetrics,
|
||||
};
|
||||
use relay_substrate_client::{
|
||||
Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain,
|
||||
};
|
||||
use relay_utils::relay_loop::Client as RelayClient;
|
||||
use sp_runtime::traits::Header as HeaderT;
|
||||
|
||||
/// Shared updatable reference to the maximal parachain header id that we want to sync from the
|
||||
/// source.
|
||||
pub type RequiredHeaderIdRef<C> = Arc<Mutex<Option<HeaderIdOf<C>>>>;
|
||||
pub type RequiredHeaderIdRef<C> = Arc<Mutex<AvailableHeader<HeaderIdOf<C>>>>;
|
||||
|
||||
/// Substrate client as parachain heads source.
|
||||
#[derive(Clone)]
|
||||
pub struct ParachainsSource<P: SubstrateParachainsPipeline> {
|
||||
client: Client<P::SourceRelayChain>,
|
||||
maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
|
||||
max_head_id: RequiredHeaderIdRef<P::SourceParachain>,
|
||||
}
|
||||
|
||||
impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
|
||||
/// Creates new parachains source client.
|
||||
pub fn new(
|
||||
client: Client<P::SourceRelayChain>,
|
||||
maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
|
||||
max_head_id: RequiredHeaderIdRef<P::SourceParachain>,
|
||||
) -> Self {
|
||||
ParachainsSource { client, maximal_header_id }
|
||||
ParachainsSource { client, max_head_id }
|
||||
}
|
||||
|
||||
/// Returns reference to the underlying RPC client.
|
||||
@@ -59,11 +59,11 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
|
||||
}
|
||||
|
||||
/// Return decoded head of given parachain.
|
||||
pub async fn on_chain_parachain_header(
|
||||
pub async fn on_chain_para_head_id(
|
||||
&self,
|
||||
at_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
para_id: ParaId,
|
||||
) -> Result<Option<HeaderOf<P::SourceParachain>>, SubstrateError> {
|
||||
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
|
||||
let storage_key =
|
||||
parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id);
|
||||
let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?;
|
||||
@@ -72,8 +72,8 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
|
||||
Some(para_head) => para_head,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
Ok(Some(Decode::decode(&mut ¶_head.0[..])?))
|
||||
let para_head: HeaderOf<P::SourceParachain> = Decode::decode(&mut ¶_head.0[..])?;
|
||||
Ok(Some(para_head.id()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ where
|
||||
at_block: HeaderIdOf<P::SourceRelayChain>,
|
||||
metrics: Option<&ParachainsLoopMetrics>,
|
||||
para_id: ParaId,
|
||||
) -> Result<ParaHashAtSource, Self::Error> {
|
||||
) -> Result<AvailableHeader<ParaHash>, Self::Error> {
|
||||
// we don't need to support many parachains now
|
||||
if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID {
|
||||
return Err(SubstrateError::Custom(format!(
|
||||
@@ -115,44 +115,28 @@ where
|
||||
)))
|
||||
}
|
||||
|
||||
let mut para_hash_at_source = ParaHashAtSource::None;
|
||||
let mut para_header_number_at_source = None;
|
||||
match self.on_chain_parachain_header(at_block, para_id).await? {
|
||||
Some(parachain_header) => {
|
||||
para_hash_at_source = ParaHashAtSource::Some(parachain_header.hash());
|
||||
para_header_number_at_source = Some(*parachain_header.number());
|
||||
// never return head that is larger than requested. This way we'll never sync
|
||||
// headers past `maximal_header_id`
|
||||
if let Some(ref maximal_header_id) = self.maximal_header_id {
|
||||
let maximal_header_id = *maximal_header_id.lock().await;
|
||||
match maximal_header_id {
|
||||
Some(maximal_header_id)
|
||||
if *parachain_header.number() > maximal_header_id.0 =>
|
||||
{
|
||||
// we don't want this header yet => let's report previously requested
|
||||
// header
|
||||
para_hash_at_source = ParaHashAtSource::Some(maximal_header_id.1);
|
||||
para_header_number_at_source = Some(maximal_header_id.0);
|
||||
},
|
||||
Some(_) => (),
|
||||
None => {
|
||||
// on-demand relay has not yet asked us to sync anything let's do that
|
||||
para_hash_at_source = ParaHashAtSource::Unavailable;
|
||||
para_header_number_at_source = None;
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {},
|
||||
};
|
||||
|
||||
if let (Some(metrics), Some(para_header_number_at_source)) =
|
||||
(metrics, para_header_number_at_source)
|
||||
{
|
||||
metrics.update_best_parachain_block_at_source(para_id, para_header_number_at_source);
|
||||
let mut para_head_id = AvailableHeader::Missing;
|
||||
if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? {
|
||||
// Never return head that is larger than requested. This way we'll never sync
|
||||
// headers past `max_header_id`.
|
||||
para_head_id = match *self.max_head_id.lock().await {
|
||||
AvailableHeader::Unavailable => AvailableHeader::Unavailable,
|
||||
AvailableHeader::Missing => {
|
||||
// `max_header_id` is not set. There is no limit.
|
||||
AvailableHeader::Available(on_chain_para_head_id)
|
||||
},
|
||||
AvailableHeader::Available(max_head_id) => {
|
||||
// We report at most `max_header_id`.
|
||||
AvailableHeader::Available(std::cmp::min(on_chain_para_head_id, max_head_id))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Ok(para_hash_at_source)
|
||||
if let (Some(metrics), AvailableHeader::Available(para_head_id)) = (metrics, para_head_id) {
|
||||
metrics.update_best_parachain_block_at_source(para_id, para_head_id.0);
|
||||
}
|
||||
|
||||
Ok(para_head_id.map(|para_head_id| para_head_id.1))
|
||||
}
|
||||
|
||||
async fn prove_parachain_heads(
|
||||
|
||||
Reference in New Issue
Block a user