mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 01:41:03 +00:00
Always run on-demand headers relay in complex relay (#975)
* always run on-demand headers relay in complex relay * fix compilation
This commit is contained in:
committed by
Bastian Köcher
parent
cae105b55f
commit
6301ae0636
@@ -103,7 +103,6 @@ impl RelayHeaders {
|
|||||||
Finality::new(target_client.clone(), target_sign),
|
Finality::new(target_client.clone(), target_sign),
|
||||||
source_client,
|
source_client,
|
||||||
target_client,
|
target_client,
|
||||||
false,
|
|
||||||
metrics_params,
|
metrics_params,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -26,12 +26,12 @@ use sp_core::Bytes;
|
|||||||
use std::{fmt::Debug, marker::PhantomData, time::Duration};
|
use std::{fmt::Debug, marker::PhantomData, time::Duration};
|
||||||
|
|
||||||
/// Default synchronization loop timeout.
|
/// Default synchronization loop timeout.
|
||||||
const STALL_TIMEOUT: Duration = Duration::from_secs(120);
|
pub(crate) const STALL_TIMEOUT: Duration = Duration::from_secs(120);
|
||||||
/// Default limit of recent finality proofs.
|
/// Default limit of recent finality proofs.
|
||||||
///
|
///
|
||||||
/// Finality delay of 4096 blocks is unlikely to happen in practice in
|
/// Finality delay of 4096 blocks is unlikely to happen in practice in
|
||||||
/// Substrate+GRANDPA based chains (good to know).
|
/// Substrate+GRANDPA based chains (good to know).
|
||||||
const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096;
|
pub(crate) const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096;
|
||||||
|
|
||||||
/// Headers sync pipeline for Substrate <-> Substrate relays.
|
/// Headers sync pipeline for Substrate <-> Substrate relays.
|
||||||
pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline {
|
pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline {
|
||||||
@@ -119,7 +119,6 @@ pub async fn run<SourceChain, TargetChain, P>(
|
|||||||
pipeline: P,
|
pipeline: P,
|
||||||
source_client: Client<SourceChain>,
|
source_client: Client<SourceChain>,
|
||||||
target_client: Client<TargetChain>,
|
target_client: Client<TargetChain>,
|
||||||
is_on_demand_task: bool,
|
|
||||||
metrics_params: MetricsParams,
|
metrics_params: MetricsParams,
|
||||||
) -> anyhow::Result<()>
|
) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
@@ -142,10 +141,9 @@ where
|
|||||||
);
|
);
|
||||||
|
|
||||||
finality_relay::run(
|
finality_relay::run(
|
||||||
FinalitySource::new(source_client),
|
FinalitySource::new(source_client, None),
|
||||||
SubstrateFinalityTarget::new(target_client, pipeline),
|
SubstrateFinalityTarget::new(target_client, pipeline),
|
||||||
FinalitySyncParams {
|
FinalitySyncParams {
|
||||||
is_on_demand_task,
|
|
||||||
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
||||||
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
||||||
stall_timeout: STALL_TIMEOUT,
|
stall_timeout: STALL_TIMEOUT,
|
||||||
|
|||||||
@@ -239,7 +239,7 @@ where
|
|||||||
|
|
||||||
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>) {
|
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>) {
|
||||||
if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
|
if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
|
||||||
target_to_source_headers_relay.require_finalized_header(id);
|
target_to_source_headers_relay.require_finalized_header(id).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -226,7 +226,7 @@ where
|
|||||||
|
|
||||||
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>) {
|
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>) {
|
||||||
if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay {
|
if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay {
|
||||||
source_to_target_headers_relay.require_finalized_header(id);
|
source_to_target_headers_relay.require_finalized_header(id).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,39 +16,38 @@
|
|||||||
|
|
||||||
//! On-demand Substrate -> Substrate headers relay.
|
//! On-demand Substrate -> Substrate headers relay.
|
||||||
|
|
||||||
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
|
use crate::finality_pipeline::{
|
||||||
|
SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate, RECENT_FINALITY_PROOFS_LIMIT, STALL_TIMEOUT,
|
||||||
|
};
|
||||||
use crate::finality_target::SubstrateFinalityTarget;
|
use crate::finality_target::SubstrateFinalityTarget;
|
||||||
|
|
||||||
|
use async_std::sync::{Arc, Mutex};
|
||||||
use bp_header_chain::justification::GrandpaJustification;
|
use bp_header_chain::justification::GrandpaJustification;
|
||||||
use finality_relay::{
|
use finality_relay::{
|
||||||
FinalitySyncPipeline, SourceClient as FinalitySourceClient, TargetClient as FinalityTargetClient,
|
FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient,
|
||||||
};
|
TargetClient as FinalityTargetClient,
|
||||||
use futures::{
|
|
||||||
channel::{mpsc, oneshot},
|
|
||||||
select, FutureExt, StreamExt,
|
|
||||||
};
|
};
|
||||||
|
use futures::{select, FutureExt};
|
||||||
use num_traits::{CheckedSub, Zero};
|
use num_traits::{CheckedSub, Zero};
|
||||||
use relay_substrate_client::{
|
use relay_substrate_client::{
|
||||||
finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf,
|
finality_source::{FinalitySource as SubstrateFinalitySource, RequiredHeaderNumberRef},
|
||||||
SyncHeader,
|
BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader,
|
||||||
};
|
};
|
||||||
use relay_utils::{
|
use relay_utils::{
|
||||||
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId,
|
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, MaybeConnectionError,
|
||||||
MaybeConnectionError,
|
|
||||||
};
|
};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
/// On-demand Substrate <-> Substrate headers relay.
|
/// On-demand Substrate <-> Substrate headers relay.
|
||||||
///
|
///
|
||||||
/// This relay may be started by messages whenever some other relay (e.g. messages relay) needs more
|
/// This relay may be requested to sync more headers, whenever some other relay (e.g. messages relay) needs
|
||||||
/// headers to be relayed to continue its regular work. When enough headers are relayed, on-demand
|
/// it to continue its regular work. When enough headers are relayed, on-demand stops syncing headers.
|
||||||
/// relay may be deactivated.
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct OnDemandHeadersRelay<SourceChain: Chain> {
|
pub struct OnDemandHeadersRelay<SourceChain: Chain> {
|
||||||
/// Background task name.
|
/// Relay task name.
|
||||||
background_task_name: String,
|
relay_task_name: String,
|
||||||
/// Required headers to background sender.
|
/// Shared reference to maximal required finalized header number.
|
||||||
required_header_tx: mpsc::Sender<HeaderId<SourceChain::Hash, SourceChain::BlockNumber>>,
|
required_header_number: RequiredHeaderNumberRef<SourceChain>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
||||||
@@ -75,49 +74,49 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
|||||||
SubstrateFinalityTarget<TargetChain, SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>:
|
SubstrateFinalityTarget<TargetChain, SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>:
|
||||||
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
|
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
|
||||||
{
|
{
|
||||||
let (required_header_tx, required_header_rx) = mpsc::channel(1);
|
let required_header_number = Arc::new(Mutex::new(Zero::zero()));
|
||||||
|
let this = OnDemandHeadersRelay {
|
||||||
|
relay_task_name: on_demand_headers_relay_name::<SourceChain, TargetChain>(),
|
||||||
|
required_header_number: required_header_number.clone(),
|
||||||
|
};
|
||||||
async_std::task::spawn(async move {
|
async_std::task::spawn(async move {
|
||||||
background_task(
|
background_task(
|
||||||
source_client,
|
source_client,
|
||||||
target_client,
|
target_client,
|
||||||
pipeline,
|
pipeline,
|
||||||
maximal_headers_difference,
|
maximal_headers_difference,
|
||||||
required_header_rx,
|
required_header_number,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|
||||||
let background_task_name = format!(
|
this
|
||||||
"{}-background",
|
|
||||||
on_demand_headers_relay_name::<SourceChain, TargetChain>()
|
|
||||||
);
|
|
||||||
OnDemandHeadersRelay {
|
|
||||||
background_task_name,
|
|
||||||
required_header_tx,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Someone is asking us to relay given finalized header.
|
/// Someone is asking us to relay given finalized header.
|
||||||
pub fn require_finalized_header(&self, header_id: HeaderIdOf<SourceChain>) {
|
pub async fn require_finalized_header(&self, header_id: HeaderIdOf<SourceChain>) {
|
||||||
if let Err(error) = self.required_header_tx.clone().try_send(header_id) {
|
let mut required_header_number = self.required_header_number.lock().await;
|
||||||
log::error!(
|
if header_id.0 > *required_header_number {
|
||||||
|
log::trace!(
|
||||||
target: "bridge",
|
target: "bridge",
|
||||||
"Failed to send require header id {:?} to {:?}: {:?}",
|
"More {} headers required in {} relay. Going to sync up to the {}",
|
||||||
header_id,
|
SourceChain::NAME,
|
||||||
self.background_task_name,
|
self.relay_task_name,
|
||||||
error,
|
header_id.0,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
*required_header_number = header_id.0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Background task that is responsible for starting and stopping headers relay when required.
|
/// Background task that is responsible for starting headers relay.
|
||||||
async fn background_task<SourceChain, TargetChain, TargetSign>(
|
async fn background_task<SourceChain, TargetChain, TargetSign>(
|
||||||
source_client: Client<SourceChain>,
|
source_client: Client<SourceChain>,
|
||||||
target_client: Client<TargetChain>,
|
target_client: Client<TargetChain>,
|
||||||
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||||
maximal_headers_difference: SourceChain::BlockNumber,
|
maximal_headers_difference: SourceChain::BlockNumber,
|
||||||
mut required_header_rx: mpsc::Receiver<HeaderIdOf<SourceChain>>,
|
required_header_number: RequiredHeaderNumberRef<SourceChain>,
|
||||||
) where
|
) where
|
||||||
SourceChain: Chain + Debug,
|
SourceChain: Chain + Debug,
|
||||||
SourceChain::BlockNumber: BlockNumberBase,
|
SourceChain::BlockNumber: BlockNumberBase,
|
||||||
@@ -138,36 +137,19 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
let mut finality_source = SubstrateFinalitySource::<
|
let mut finality_source = SubstrateFinalitySource::<
|
||||||
_,
|
_,
|
||||||
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||||
>::new(source_client.clone());
|
>::new(source_client.clone(), Some(required_header_number.clone()));
|
||||||
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
||||||
|
|
||||||
let mut active_headers_relay = None;
|
let mut restart_relay = true;
|
||||||
let mut required_header_number = Zero::zero();
|
let finality_relay_task = futures::future::Fuse::terminated();
|
||||||
let mut relay_exited_rx = futures::future::pending().left_future();
|
futures::pin_mut!(finality_relay_task);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// wait for next target block or for new required header
|
|
||||||
select! {
|
select! {
|
||||||
_ = async_std::task::sleep(TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
|
_ = async_std::task::sleep(TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
|
||||||
required_header_id = required_header_rx.next() => {
|
_ = finality_relay_task => {
|
||||||
match required_header_id {
|
// this should never happen in practice given the current code
|
||||||
Some(required_header_id) => {
|
restart_relay = true;
|
||||||
if required_header_id.0 > required_header_number {
|
|
||||||
required_header_number = required_header_id.0;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
// that's the only way to exit background task - to drop `required_header_tx`
|
|
||||||
break
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ = relay_exited_rx => {
|
|
||||||
// there could be a situation when we're receiving exit signals after we
|
|
||||||
// have already stopped relay or when we have already started new relay.
|
|
||||||
// but it isn't critical, because even if we'll accidentally stop new relay
|
|
||||||
// we'll restart it almost immediately
|
|
||||||
stop_on_demand_headers_relay(active_headers_relay.take()).await;
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,33 +181,84 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// start or stop headers relay if required
|
// update required header
|
||||||
let action = select_on_demand_relay_action::<SourceChain>(
|
update_required_header_number_if_too_many_are_missing::<SourceChain>(
|
||||||
best_finalized_source_header_at_source.ok(),
|
best_finalized_source_header_at_source.ok(),
|
||||||
best_finalized_source_header_at_target.ok(),
|
best_finalized_source_header_at_target.ok(),
|
||||||
required_header_number,
|
|
||||||
maximal_headers_difference,
|
maximal_headers_difference,
|
||||||
|
&required_header_number,
|
||||||
&relay_task_name,
|
&relay_task_name,
|
||||||
active_headers_relay.is_some(),
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// start/restart relay
|
||||||
|
if restart_relay {
|
||||||
|
finality_relay_task.set(
|
||||||
|
finality_relay::run(
|
||||||
|
finality_source.clone(),
|
||||||
|
finality_target.clone(),
|
||||||
|
FinalitySyncParams {
|
||||||
|
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
||||||
|
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
||||||
|
stall_timeout: STALL_TIMEOUT,
|
||||||
|
},
|
||||||
|
MetricsParams::disabled(),
|
||||||
|
futures::future::pending(),
|
||||||
|
)
|
||||||
|
.fuse(),
|
||||||
);
|
);
|
||||||
match action {
|
|
||||||
OnDemandRelayAction::Start => {
|
restart_relay = false;
|
||||||
let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
|
}
|
||||||
active_headers_relay = start_on_demand_headers_relay(
|
}
|
||||||
relay_task_name.clone(),
|
}
|
||||||
relay_exited_tx,
|
|
||||||
source_client.clone(),
|
/// If there are too many source headers missing at target, we ask for syncing more headers.
|
||||||
target_client.clone(),
|
async fn update_required_header_number_if_too_many_are_missing<C: Chain>(
|
||||||
pipeline.clone(),
|
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
||||||
|
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
||||||
|
maximal_headers_difference: C::BlockNumber,
|
||||||
|
required_header_number: &RequiredHeaderNumberRef<C>,
|
||||||
|
relay_task_name: &str,
|
||||||
|
) {
|
||||||
|
let mut required_header_number = required_header_number.lock().await;
|
||||||
|
|
||||||
|
// if we have been unable to read header number from the target, then let's assume
|
||||||
|
// that it is the same as required header number. Otherwise we risk submitting
|
||||||
|
// unneeded transactions
|
||||||
|
let best_finalized_source_header_at_target =
|
||||||
|
best_finalized_source_header_at_target.unwrap_or(*required_header_number);
|
||||||
|
|
||||||
|
// if we have been unable to read header number from the source, then let's assume
|
||||||
|
// that it is the same as at the target
|
||||||
|
let best_finalized_source_header_at_source =
|
||||||
|
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
||||||
|
|
||||||
|
// if there are too many source headers missing from the target node, require some
|
||||||
|
// new headers at target
|
||||||
|
//
|
||||||
|
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
||||||
|
// headers when there are undelivered messages/confirmations. But security model of the
|
||||||
|
// `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and
|
||||||
|
// that it'll see authorities-change header before unbonding period will end for previous
|
||||||
|
// authorities set.
|
||||||
|
let current_headers_difference = best_finalized_source_header_at_source
|
||||||
|
.checked_sub(&best_finalized_source_header_at_target)
|
||||||
|
.unwrap_or_else(Zero::zero);
|
||||||
|
if current_headers_difference > maximal_headers_difference {
|
||||||
|
// if relay is already asked to sync headers, don't log anything
|
||||||
|
if *required_header_number <= best_finalized_source_header_at_target {
|
||||||
|
log::trace!(
|
||||||
|
target: "bridge",
|
||||||
|
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
|
||||||
|
C::NAME,
|
||||||
|
relay_task_name,
|
||||||
|
best_finalized_source_header_at_source,
|
||||||
|
best_finalized_source_header_at_target,
|
||||||
|
best_finalized_source_header_at_source,
|
||||||
);
|
);
|
||||||
if active_headers_relay.is_some() {
|
|
||||||
relay_exited_rx = new_relay_exited_rx.right_future();
|
*required_header_number = best_finalized_source_header_at_source;
|
||||||
}
|
|
||||||
}
|
|
||||||
OnDemandRelayAction::Stop => {
|
|
||||||
stop_on_demand_headers_relay(active_headers_relay.take()).await;
|
|
||||||
}
|
|
||||||
OnDemandRelayAction::None => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -236,12 +269,15 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
|
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
|
||||||
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||||
relay_task_name: &str,
|
relay_task_name: &str,
|
||||||
) -> Result<SourceChain::BlockNumber, <SubstrateFinalitySource<SourceChain, P> as RelayClient>::Error>
|
) -> Result<SourceChain::BlockNumber, relay_substrate_client::Error>
|
||||||
where
|
where
|
||||||
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||||
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||||
{
|
{
|
||||||
finality_source.best_finalized_block_number().await.map_err(|error| {
|
finality_source
|
||||||
|
.on_chain_best_finalized_block_number()
|
||||||
|
.await
|
||||||
|
.map_err(|error| {
|
||||||
log::error!(
|
log::error!(
|
||||||
target: "bridge",
|
target: "bridge",
|
||||||
"Failed to read best finalized source header from source in {} relay: {:?}",
|
"Failed to read best finalized source header from source in {} relay: {:?}",
|
||||||
@@ -279,130 +315,11 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// What to do with the on-demand relay task?
|
|
||||||
#[derive(Debug, PartialEq)]
|
|
||||||
enum OnDemandRelayAction {
|
|
||||||
Start,
|
|
||||||
Stop,
|
|
||||||
None,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn select_on_demand_relay_action<C: Chain>(
|
|
||||||
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
|
||||||
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
|
||||||
mut required_source_header_at_target: C::BlockNumber,
|
|
||||||
maximal_headers_difference: C::BlockNumber,
|
|
||||||
relay_task_name: &str,
|
|
||||||
is_active: bool,
|
|
||||||
) -> OnDemandRelayAction {
|
|
||||||
// if we have been unable to read header number from the target, then let's assume
|
|
||||||
// that it is the same as required header number. Otherwise we risk submitting
|
|
||||||
// unneeded transactions
|
|
||||||
let best_finalized_source_header_at_target =
|
|
||||||
best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target);
|
|
||||||
|
|
||||||
// if we have been unable to read header number from the source, then let's assume
|
|
||||||
// that it is the same as at the target
|
|
||||||
let best_finalized_source_header_at_source =
|
|
||||||
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
|
||||||
|
|
||||||
// if there are too many source headers missing from the target node, require some
|
|
||||||
// new headers at target
|
|
||||||
//
|
|
||||||
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
|
||||||
// headers when there are undelivered messages/confirmations. But security model of the
|
|
||||||
// `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and
|
|
||||||
// that it'll see authorities-change header before unbonding period will end for previous
|
|
||||||
// authorities set.
|
|
||||||
let current_headers_difference = best_finalized_source_header_at_source
|
|
||||||
.checked_sub(&best_finalized_source_header_at_target)
|
|
||||||
.unwrap_or_else(Zero::zero);
|
|
||||||
if current_headers_difference > maximal_headers_difference {
|
|
||||||
required_source_header_at_target = best_finalized_source_header_at_source;
|
|
||||||
|
|
||||||
// don't log if relay is already running
|
|
||||||
if !is_active {
|
|
||||||
log::trace!(
|
|
||||||
target: "bridge",
|
|
||||||
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
|
|
||||||
C::NAME,
|
|
||||||
relay_task_name,
|
|
||||||
best_finalized_source_header_at_source,
|
|
||||||
best_finalized_source_header_at_target,
|
|
||||||
best_finalized_source_header_at_source,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now let's select what to do with relay
|
|
||||||
let needs_to_be_active = required_source_header_at_target > best_finalized_source_header_at_target;
|
|
||||||
match (needs_to_be_active, is_active) {
|
|
||||||
(true, false) => OnDemandRelayAction::Start,
|
|
||||||
(false, true) => OnDemandRelayAction::Stop,
|
|
||||||
_ => OnDemandRelayAction::None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// On-demand headers relay task name.
|
/// On-demand headers relay task name.
|
||||||
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
|
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
|
||||||
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start on-demand headers relay task.
|
|
||||||
fn start_on_demand_headers_relay<SourceChain: Chain, TargetChain: Chain, TargetSign>(
|
|
||||||
task_name: String,
|
|
||||||
relay_exited_tx: oneshot::Sender<()>,
|
|
||||||
source_client: Client<SourceChain>,
|
|
||||||
target_client: Client<TargetChain>,
|
|
||||||
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
|
||||||
) -> Option<async_std::task::JoinHandle<()>>
|
|
||||||
where
|
|
||||||
SourceChain::BlockNumber: BlockNumberBase,
|
|
||||||
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>: SubstrateFinalitySyncPipeline<
|
|
||||||
Hash = HashOf<SourceChain>,
|
|
||||||
Number = BlockNumberOf<SourceChain>,
|
|
||||||
Header = SyncHeader<SourceChain::Header>,
|
|
||||||
FinalityProof = GrandpaJustification<SourceChain::Header>,
|
|
||||||
TargetChain = TargetChain,
|
|
||||||
>,
|
|
||||||
TargetSign: 'static,
|
|
||||||
{
|
|
||||||
let headers_relay_future =
|
|
||||||
crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled());
|
|
||||||
let closure_task_name = task_name.clone();
|
|
||||||
async_std::task::Builder::new()
|
|
||||||
.name(task_name.clone())
|
|
||||||
.spawn(async move {
|
|
||||||
log::info!(target: "bridge", "Starting {} headers relay", closure_task_name);
|
|
||||||
let result = headers_relay_future.await;
|
|
||||||
log::trace!(target: "bridge", "{} headers relay has exited. Result: {:?}", closure_task_name, result);
|
|
||||||
let _ = relay_exited_tx.send(());
|
|
||||||
})
|
|
||||||
.map_err(|error| {
|
|
||||||
log::error!(
|
|
||||||
target: "bridge",
|
|
||||||
"Failed to start {} relay: {:?}",
|
|
||||||
task_name,
|
|
||||||
error,
|
|
||||||
);
|
|
||||||
})
|
|
||||||
.ok()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stop on-demand headers relay task.
|
|
||||||
async fn stop_on_demand_headers_relay(task: Option<async_std::task::JoinHandle<()>>) {
|
|
||||||
if let Some(task) = task {
|
|
||||||
let task_name = task
|
|
||||||
.task()
|
|
||||||
.name()
|
|
||||||
.expect("on-demand tasks are always started with name; qed")
|
|
||||||
.to_string();
|
|
||||||
log::trace!(target: "bridge", "Cancelling {} headers relay", task_name);
|
|
||||||
task.cancel().await;
|
|
||||||
log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -412,42 +329,17 @@ mod tests {
|
|||||||
const AT_SOURCE: Option<bp_millau::BlockNumber> = Some(10);
|
const AT_SOURCE: Option<bp_millau::BlockNumber> = Some(10);
|
||||||
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
|
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
|
||||||
|
|
||||||
#[test]
|
#[async_std::test]
|
||||||
fn starts_relay_when_headers_are_required() {
|
async fn updates_required_header_when_too_many_headers_missing() {
|
||||||
assert_eq!(
|
let required_header_number = Arc::new(Mutex::new(0));
|
||||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", false),
|
update_required_header_number_if_too_many_are_missing::<TestChain>(
|
||||||
OnDemandRelayAction::Start,
|
AT_SOURCE,
|
||||||
);
|
AT_TARGET,
|
||||||
|
5,
|
||||||
assert_eq!(
|
&required_header_number,
|
||||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", true),
|
"test",
|
||||||
OnDemandRelayAction::None,
|
)
|
||||||
);
|
.await;
|
||||||
}
|
assert_eq!(*required_header_number.lock().await, AT_SOURCE.unwrap());
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn starts_relay_when_too_many_headers_missing() {
|
|
||||||
assert_eq!(
|
|
||||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", false),
|
|
||||||
OnDemandRelayAction::Start,
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", true),
|
|
||||||
OnDemandRelayAction::None,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn stops_relay_if_required_header_is_synced() {
|
|
||||||
assert_eq!(
|
|
||||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true),
|
|
||||||
OnDemandRelayAction::Stop,
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false),
|
|
||||||
OnDemandRelayAction::None,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ edition = "2018"
|
|||||||
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
|
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-std = "1.6.5"
|
async-std = { version = "1.6.5", features = ["attributes"] }
|
||||||
async-trait = "0.1.40"
|
async-trait = "0.1.40"
|
||||||
codec = { package = "parity-scale-codec", version = "2.0.0" }
|
codec = { package = "parity-scale-codec", version = "2.0.0" }
|
||||||
jsonrpsee-proc-macros = "=0.2.0-alpha.6"
|
jsonrpsee-proc-macros = "=0.2.0-alpha.6"
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ use crate::client::Client;
|
|||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::sync_header::SyncHeader;
|
use crate::sync_header::SyncHeader;
|
||||||
|
|
||||||
|
use async_std::sync::{Arc, Mutex};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bp_header_chain::justification::GrandpaJustification;
|
use bp_header_chain::justification::GrandpaJustification;
|
||||||
use codec::Decode;
|
use codec::Decode;
|
||||||
@@ -30,26 +31,41 @@ use relay_utils::relay_loop::Client as RelayClient;
|
|||||||
use sp_runtime::traits::Header as HeaderT;
|
use sp_runtime::traits::Header as HeaderT;
|
||||||
use std::{marker::PhantomData, pin::Pin};
|
use std::{marker::PhantomData, pin::Pin};
|
||||||
|
|
||||||
|
/// Shared updatable reference to the maximal header number that we want to sync from the source.
|
||||||
|
pub type RequiredHeaderNumberRef<C> = Arc<Mutex<<C as bp_runtime::Chain>::BlockNumber>>;
|
||||||
|
|
||||||
/// Substrate node as finality source.
|
/// Substrate node as finality source.
|
||||||
pub struct FinalitySource<C: Chain, P> {
|
pub struct FinalitySource<C: Chain, P> {
|
||||||
client: Client<C>,
|
client: Client<C>,
|
||||||
|
maximal_header_number: Option<RequiredHeaderNumberRef<C>>,
|
||||||
_phantom: PhantomData<P>,
|
_phantom: PhantomData<P>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C: Chain, P> FinalitySource<C, P> {
|
impl<C: Chain, P> FinalitySource<C, P> {
|
||||||
/// Create new headers source using given client.
|
/// Create new headers source using given client.
|
||||||
pub fn new(client: Client<C>) -> Self {
|
pub fn new(client: Client<C>, maximal_header_number: Option<RequiredHeaderNumberRef<C>>) -> Self {
|
||||||
FinalitySource {
|
FinalitySource {
|
||||||
client,
|
client,
|
||||||
|
maximal_header_number,
|
||||||
_phantom: Default::default(),
|
_phantom: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns best finalized block number.
|
||||||
|
pub async fn on_chain_best_finalized_block_number(&self) -> Result<C::BlockNumber, Error> {
|
||||||
|
// we **CAN** continue to relay finality proofs if source node is out of sync, because
|
||||||
|
// target node may be missing proofs that are already available at the source
|
||||||
|
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
|
||||||
|
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
|
||||||
|
Ok(*finalized_header.number())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C: Chain, P> Clone for FinalitySource<C, P> {
|
impl<C: Chain, P> Clone for FinalitySource<C, P> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
FinalitySource {
|
FinalitySource {
|
||||||
client: self.client.clone(),
|
client: self.client.clone(),
|
||||||
|
maximal_header_number: self.maximal_header_number.clone(),
|
||||||
_phantom: Default::default(),
|
_phantom: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -80,11 +96,16 @@ where
|
|||||||
type FinalityProofsStream = Pin<Box<dyn Stream<Item = GrandpaJustification<C::Header>> + Send>>;
|
type FinalityProofsStream = Pin<Box<dyn Stream<Item = GrandpaJustification<C::Header>> + Send>>;
|
||||||
|
|
||||||
async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
|
async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
|
||||||
// we **CAN** continue to relay finality proofs if source node is out of sync, because
|
let mut finalized_header_number = self.on_chain_best_finalized_block_number().await?;
|
||||||
// target node may be missing proofs that are already available at the source
|
// never return block number larger than requested. This way we'll never sync headers
|
||||||
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
|
// past `maximal_header_number`
|
||||||
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
|
if let Some(ref maximal_header_number) = self.maximal_header_number {
|
||||||
Ok(*finalized_header.number())
|
let maximal_header_number = *maximal_header_number.lock().await;
|
||||||
|
if finalized_header_number > maximal_header_number {
|
||||||
|
finalized_header_number = maximal_header_number;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(finalized_header_number)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn header_and_finality_proof(
|
async fn header_and_finality_proof(
|
||||||
|
|||||||
@@ -39,8 +39,6 @@ use std::{
|
|||||||
/// Finality proof synchronization loop parameters.
|
/// Finality proof synchronization loop parameters.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct FinalitySyncParams {
|
pub struct FinalitySyncParams {
|
||||||
/// If `true`, then the separate async task for running finality loop is NOT spawned.
|
|
||||||
pub is_on_demand_task: bool,
|
|
||||||
/// Interval at which we check updates on both clients. Normally should be larger than
|
/// Interval at which we check updates on both clients. Normally should be larger than
|
||||||
/// `min(source_block_time, target_block_time)`.
|
/// `min(source_block_time, target_block_time)`.
|
||||||
///
|
///
|
||||||
@@ -107,7 +105,6 @@ pub async fn run<P: FinalitySyncPipeline>(
|
|||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
let exit_signal = exit_signal.shared();
|
let exit_signal = exit_signal.shared();
|
||||||
relay_utils::relay_loop(source_client, target_client)
|
relay_utils::relay_loop(source_client, target_client)
|
||||||
.spawn_loop_task(!sync_params.is_on_demand_task)
|
|
||||||
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
|
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
|
||||||
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
|
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
|
||||||
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
|
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
|
||||||
|
|||||||
@@ -197,7 +197,6 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
|
|||||||
data: clients_data.clone(),
|
data: clients_data.clone(),
|
||||||
};
|
};
|
||||||
let sync_params = FinalitySyncParams {
|
let sync_params = FinalitySyncParams {
|
||||||
is_on_demand_task: false,
|
|
||||||
tick: Duration::from_secs(0),
|
tick: Duration::from_secs(0),
|
||||||
recent_finality_proofs_limit: 1024,
|
recent_finality_proofs_limit: 1024,
|
||||||
stall_timeout: Duration::from_secs(1),
|
stall_timeout: Duration::from_secs(1),
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ pub trait Client: 'static + Clone + Send + Sync {
|
|||||||
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
|
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
|
||||||
Loop {
|
Loop {
|
||||||
reconnect_delay: RECONNECT_DELAY,
|
reconnect_delay: RECONNECT_DELAY,
|
||||||
spawn_loop_task: true,
|
|
||||||
source_client,
|
source_client,
|
||||||
target_client,
|
target_client,
|
||||||
loop_metric: None,
|
loop_metric: None,
|
||||||
@@ -50,7 +49,6 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
|
|||||||
LoopMetrics {
|
LoopMetrics {
|
||||||
relay_loop: Loop {
|
relay_loop: Loop {
|
||||||
reconnect_delay: RECONNECT_DELAY,
|
reconnect_delay: RECONNECT_DELAY,
|
||||||
spawn_loop_task: true,
|
|
||||||
source_client: (),
|
source_client: (),
|
||||||
target_client: (),
|
target_client: (),
|
||||||
loop_metric: None,
|
loop_metric: None,
|
||||||
@@ -65,7 +63,6 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
|
|||||||
/// Generic relay loop.
|
/// Generic relay loop.
|
||||||
pub struct Loop<SC, TC, LM> {
|
pub struct Loop<SC, TC, LM> {
|
||||||
reconnect_delay: Duration,
|
reconnect_delay: Duration,
|
||||||
spawn_loop_task: bool,
|
|
||||||
source_client: SC,
|
source_client: SC,
|
||||||
target_client: TC,
|
target_client: TC,
|
||||||
loop_metric: Option<LM>,
|
loop_metric: Option<LM>,
|
||||||
@@ -87,23 +84,11 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set spawn-dedicated-loop-task flag.
|
|
||||||
///
|
|
||||||
/// If `true` (default), separate async task is spawned to run relay loop. This is the default
|
|
||||||
/// behavior for all loops. If `false`, then loop is executed as a part of the current
|
|
||||||
/// task. The `false` is used for on-demand tasks, which are cancelled from time to time
|
|
||||||
/// and there's already a dedicated on-demand task for running such loops.
|
|
||||||
pub fn spawn_loop_task(mut self, spawn_loop_task: bool) -> Self {
|
|
||||||
self.spawn_loop_task = spawn_loop_task;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Start building loop metrics using given prefix.
|
/// Start building loop metrics using given prefix.
|
||||||
pub fn with_metrics(self, prefix: Option<String>, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
|
pub fn with_metrics(self, prefix: Option<String>, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
|
||||||
LoopMetrics {
|
LoopMetrics {
|
||||||
relay_loop: Loop {
|
relay_loop: Loop {
|
||||||
reconnect_delay: self.reconnect_delay,
|
reconnect_delay: self.reconnect_delay,
|
||||||
spawn_loop_task: self.spawn_loop_task,
|
|
||||||
source_client: self.source_client,
|
source_client: self.source_client,
|
||||||
target_client: self.target_client,
|
target_client: self.target_client,
|
||||||
loop_metric: None,
|
loop_metric: None,
|
||||||
@@ -128,7 +113,6 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
|
|||||||
TC: 'static + Client,
|
TC: 'static + Client,
|
||||||
LM: 'static + Send + Clone,
|
LM: 'static + Send + Clone,
|
||||||
{
|
{
|
||||||
let spawn_loop_task = self.spawn_loop_task;
|
|
||||||
let run_loop_task = async move {
|
let run_loop_task = async move {
|
||||||
crate::initialize::initialize_loop(loop_name);
|
crate::initialize::initialize_loop(loop_name);
|
||||||
|
|
||||||
@@ -156,11 +140,7 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
if spawn_loop_task {
|
|
||||||
async_std::task::spawn(run_loop_task).await
|
async_std::task::spawn(run_loop_task).await
|
||||||
} else {
|
|
||||||
run_loop_task.await
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -236,7 +216,6 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
|
|||||||
|
|
||||||
Ok(Loop {
|
Ok(Loop {
|
||||||
reconnect_delay: self.relay_loop.reconnect_delay,
|
reconnect_delay: self.relay_loop.reconnect_delay,
|
||||||
spawn_loop_task: self.relay_loop.spawn_loop_task,
|
|
||||||
source_client: self.relay_loop.source_client,
|
source_client: self.relay_loop.source_client,
|
||||||
target_client: self.relay_loop.target_client,
|
target_client: self.relay_loop.target_client,
|
||||||
loop_metric: self.loop_metric,
|
loop_metric: self.loop_metric,
|
||||||
|
|||||||
Reference in New Issue
Block a user