mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 20:21:03 +00:00
Squashed 'bridges/' changes from b2099c5..23dda62 (#3369)
23dda62 Rococo <> Wococo messages relay (#1030) bcde21d Update the wasm builder to substrate master (#1029) a8318ce Make target signer optional when sending message. (#1018) f8602e1 Fix insufficient balance when send message. (#1020) d95c0a7 greedy relayer don't need message dispatch to be prepaid if dispatch is supposed to be paid at the target chain (#1016) ad5876f Update types. (#1027) 116cbbc CI: fix starting the pipeline (#1022) 7e0fadd Add temporary `canary` job (#1019) 6787091 Update types to contain dispatch_fee_payment (#1017) 03f79ad Allow Root to assume SourceAccount. (#1011) 372d019 Return dispatch_fee_payment from message details RPC (#1014) 604eb1c Relay basic single-bit message dispatch results back to the source chain (#935) bf52fff Use plain source_queue view when selecting nonces for delivery (#1010) fc5cf7d pay dispatch fee at target chain (#911) 1e35477 Bump Substrate to `286d7ce` (#1006) 7ad07b3 Add --only-mandatory-headers mode (#1004) 5351dc9 Messages relayer operating mode (#995) 9bc29a7 Rococo <> Wococo relayer balance guard (#998) bc17341 rename messages_dispatch_weight -> message_details (#996) 95be244 Bump Rococo and Wococo spec versions (#999) c35567b Move ChainWithBalances::NativeBalance -> Chain::Balance (#990) 1bfece1 Fix some nits (#988) 334ea0f Increase pause before starting relays again (#989) 7fb8248 Fix clippy in test code (#993) d60ae50 fix clippy issues (#991) 75ca813 Make sure GRANDPA shares state with RPC. (#987) da2a38a Bump Substrate (#986) 5a9862f Update submit finality proof weight formula (#981) 69df513 Flag for rejecting all outbound messages (#982) 14d0506 Add script to setup bench machine. (#984) e74e8ab Move CI from GitHub Actions to GitLab (#814) c5ca5dd Custom justification verification (#979) 643f10d Always run on-demand headers relay in complex relay (#975) a35b0ef Add JSON type definitions for Rococo<>Wococo bridge (#977) 0eb83f2 Update cargo.deny (#980) e1d1f4c Bump Rococo/Wococo spec_version (#976) deac90d increase pause before starting relays (#974) 68d6d79 Revert to use InspectCmd, bump substrate `6bef4f4` (#966) 66e1508 Avoid hashing headers twice in verify_justification (#973) a31844f Bump `environmental` dependency (#972) 2a4c29a in auto-relays keep trying to connect to nodes until connection is established (#971) 0e767b3 removed stray file (#969) b9545dc Serve multiple lanes with single complex relay instance (#964) 73419f4 Correct type error (#968) bac256f Start finality relay spec-version guards for Rococo <> Wococo finality relays (#965) bfd7037 pass source and target chain ids to account_ownership_proof (#963) 8436073 Upstream changes from Polkadot repo (#961) e58d851 Increase account endowment amount (#960) git-subtree-dir: bridges git-subtree-split: 23dda6248236b27f20d76cbedc30e189cc6f736c
This commit is contained in:
committed by
GitHub
parent
022e8bc11c
commit
feefc34567
@@ -16,39 +16,38 @@
|
||||
|
||||
//! On-demand Substrate -> Substrate headers relay.
|
||||
|
||||
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
|
||||
use crate::finality_pipeline::{
|
||||
SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate, RECENT_FINALITY_PROOFS_LIMIT, STALL_TIMEOUT,
|
||||
};
|
||||
use crate::finality_target::SubstrateFinalityTarget;
|
||||
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use bp_header_chain::justification::GrandpaJustification;
|
||||
use finality_relay::{
|
||||
FinalitySyncPipeline, SourceClient as FinalitySourceClient, TargetClient as FinalityTargetClient,
|
||||
FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient, SourceHeader,
|
||||
TargetClient as FinalityTargetClient,
|
||||
};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
select, FutureExt, StreamExt,
|
||||
};
|
||||
use num_traits::{CheckedSub, Zero};
|
||||
use futures::{select, FutureExt};
|
||||
use num_traits::{CheckedSub, One, Zero};
|
||||
use relay_substrate_client::{
|
||||
finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf,
|
||||
SyncHeader,
|
||||
finality_source::{FinalitySource as SubstrateFinalitySource, RequiredHeaderNumberRef},
|
||||
BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader,
|
||||
};
|
||||
use relay_utils::{
|
||||
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId,
|
||||
MaybeConnectionError,
|
||||
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, MaybeConnectionError,
|
||||
};
|
||||
use std::fmt::Debug;
|
||||
|
||||
/// On-demand Substrate <-> Substrate headers relay.
|
||||
///
|
||||
/// This relay may be started by messages whenever some other relay (e.g. messages relay) needs more
|
||||
/// headers to be relayed to continue its regular work. When enough headers are relayed, on-demand
|
||||
/// relay may be deactivated.
|
||||
/// This relay may be requested to sync more headers, whenever some other relay (e.g. messages relay) needs
|
||||
/// it to continue its regular work. When enough headers are relayed, on-demand stops syncing headers.
|
||||
#[derive(Clone)]
|
||||
pub struct OnDemandHeadersRelay<SourceChain: Chain> {
|
||||
/// Background task name.
|
||||
background_task_name: String,
|
||||
/// Required headers to background sender.
|
||||
required_header_tx: mpsc::Sender<HeaderId<SourceChain::Hash, SourceChain::BlockNumber>>,
|
||||
/// Relay task name.
|
||||
relay_task_name: String,
|
||||
/// Shared reference to maximal required finalized header number.
|
||||
required_header_number: RequiredHeaderNumberRef<SourceChain>,
|
||||
}
|
||||
|
||||
impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
||||
@@ -75,49 +74,49 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
||||
SubstrateFinalityTarget<TargetChain, SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>:
|
||||
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
|
||||
{
|
||||
let (required_header_tx, required_header_rx) = mpsc::channel(1);
|
||||
let required_header_number = Arc::new(Mutex::new(Zero::zero()));
|
||||
let this = OnDemandHeadersRelay {
|
||||
relay_task_name: on_demand_headers_relay_name::<SourceChain, TargetChain>(),
|
||||
required_header_number: required_header_number.clone(),
|
||||
};
|
||||
async_std::task::spawn(async move {
|
||||
background_task(
|
||||
source_client,
|
||||
target_client,
|
||||
pipeline,
|
||||
maximal_headers_difference,
|
||||
required_header_rx,
|
||||
required_header_number,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
let background_task_name = format!(
|
||||
"{}-background",
|
||||
on_demand_headers_relay_name::<SourceChain, TargetChain>()
|
||||
);
|
||||
OnDemandHeadersRelay {
|
||||
background_task_name,
|
||||
required_header_tx,
|
||||
}
|
||||
this
|
||||
}
|
||||
|
||||
/// Someone is asking us to relay given finalized header.
|
||||
pub fn require_finalized_header(&self, header_id: HeaderIdOf<SourceChain>) {
|
||||
if let Err(error) = self.required_header_tx.clone().try_send(header_id) {
|
||||
log::error!(
|
||||
pub async fn require_finalized_header(&self, header_id: HeaderIdOf<SourceChain>) {
|
||||
let mut required_header_number = self.required_header_number.lock().await;
|
||||
if header_id.0 > *required_header_number {
|
||||
log::trace!(
|
||||
target: "bridge",
|
||||
"Failed to send require header id {:?} to {:?}: {:?}",
|
||||
header_id,
|
||||
self.background_task_name,
|
||||
error,
|
||||
"More {} headers required in {} relay. Going to sync up to the {}",
|
||||
SourceChain::NAME,
|
||||
self.relay_task_name,
|
||||
header_id.0,
|
||||
);
|
||||
|
||||
*required_header_number = header_id.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Background task that is responsible for starting and stopping headers relay when required.
|
||||
/// Background task that is responsible for starting headers relay.
|
||||
async fn background_task<SourceChain, TargetChain, TargetSign>(
|
||||
source_client: Client<SourceChain>,
|
||||
target_client: Client<TargetChain>,
|
||||
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||
maximal_headers_difference: SourceChain::BlockNumber,
|
||||
mut required_header_rx: mpsc::Receiver<HeaderIdOf<SourceChain>>,
|
||||
required_header_number: RequiredHeaderNumberRef<SourceChain>,
|
||||
) where
|
||||
SourceChain: Chain + Debug,
|
||||
SourceChain::BlockNumber: BlockNumberBase,
|
||||
@@ -138,36 +137,20 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
||||
let mut finality_source = SubstrateFinalitySource::<
|
||||
_,
|
||||
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||
>::new(source_client.clone());
|
||||
>::new(source_client.clone(), Some(required_header_number.clone()));
|
||||
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
||||
let mut latest_non_mandatory_at_source = Zero::zero();
|
||||
|
||||
let mut active_headers_relay = None;
|
||||
let mut required_header_number = Zero::zero();
|
||||
let mut relay_exited_rx = futures::future::pending().left_future();
|
||||
let mut restart_relay = true;
|
||||
let finality_relay_task = futures::future::Fuse::terminated();
|
||||
futures::pin_mut!(finality_relay_task);
|
||||
|
||||
loop {
|
||||
// wait for next target block or for new required header
|
||||
select! {
|
||||
_ = async_std::task::sleep(TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
|
||||
required_header_id = required_header_rx.next() => {
|
||||
match required_header_id {
|
||||
Some(required_header_id) => {
|
||||
if required_header_id.0 > required_header_number {
|
||||
required_header_number = required_header_id.0;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// that's the only way to exit background task - to drop `required_header_tx`
|
||||
break
|
||||
},
|
||||
}
|
||||
},
|
||||
_ = relay_exited_rx => {
|
||||
// there could be a situation when we're receiving exit signals after we
|
||||
// have already stopped relay or when we have already started new relay.
|
||||
// but it isn't critical, because even if we'll accidentally stop new relay
|
||||
// we'll restart it almost immediately
|
||||
stop_on_demand_headers_relay(active_headers_relay.take()).await;
|
||||
_ = finality_relay_task => {
|
||||
// this should never happen in practice given the current code
|
||||
restart_relay = true;
|
||||
},
|
||||
}
|
||||
|
||||
@@ -199,58 +182,187 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
||||
continue;
|
||||
}
|
||||
|
||||
// start or stop headers relay if required
|
||||
let action = select_on_demand_relay_action::<SourceChain>(
|
||||
// submit mandatory header if some headers are missing
|
||||
let best_finalized_source_header_at_target_fmt = format!("{:?}", best_finalized_source_header_at_target);
|
||||
let mandatory_scan_range = mandatory_headers_scan_range::<SourceChain>(
|
||||
best_finalized_source_header_at_source.ok(),
|
||||
best_finalized_source_header_at_target.ok(),
|
||||
required_header_number,
|
||||
maximal_headers_difference,
|
||||
&relay_task_name,
|
||||
active_headers_relay.is_some(),
|
||||
);
|
||||
match action {
|
||||
OnDemandRelayAction::Start => {
|
||||
let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
|
||||
active_headers_relay = start_on_demand_headers_relay(
|
||||
relay_task_name.clone(),
|
||||
relay_exited_tx,
|
||||
source_client.clone(),
|
||||
target_client.clone(),
|
||||
pipeline.clone(),
|
||||
);
|
||||
if active_headers_relay.is_some() {
|
||||
relay_exited_rx = new_relay_exited_rx.right_future();
|
||||
&required_header_number,
|
||||
)
|
||||
.await;
|
||||
if let Some(mandatory_scan_range) = mandatory_scan_range {
|
||||
let relay_mandatory_header_result = relay_mandatory_header_from_range(
|
||||
&finality_source,
|
||||
&required_header_number,
|
||||
best_finalized_source_header_at_target_fmt,
|
||||
(
|
||||
std::cmp::max(mandatory_scan_range.0, latest_non_mandatory_at_source),
|
||||
mandatory_scan_range.1,
|
||||
),
|
||||
&relay_task_name,
|
||||
)
|
||||
.await;
|
||||
match relay_mandatory_header_result {
|
||||
Ok(true) => (),
|
||||
Ok(false) => {
|
||||
// there are no (or we don't need to relay them) mandatory headers in the range
|
||||
// => to avoid scanning the same headers over and over again, remember that
|
||||
latest_non_mandatory_at_source = mandatory_scan_range.1;
|
||||
}
|
||||
Err(e) => {
|
||||
if e.is_connection_error() {
|
||||
relay_utils::relay_loop::reconnect_failed_client(
|
||||
FailedClient::Source,
|
||||
relay_utils::relay_loop::RECONNECT_DELAY,
|
||||
&mut finality_source,
|
||||
&mut finality_target,
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
OnDemandRelayAction::Stop => {
|
||||
stop_on_demand_headers_relay(active_headers_relay.take()).await;
|
||||
}
|
||||
OnDemandRelayAction::None => (),
|
||||
}
|
||||
|
||||
// start/restart relay
|
||||
if restart_relay {
|
||||
finality_relay_task.set(
|
||||
finality_relay::run(
|
||||
finality_source.clone(),
|
||||
finality_target.clone(),
|
||||
FinalitySyncParams {
|
||||
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
||||
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
||||
stall_timeout: STALL_TIMEOUT,
|
||||
only_mandatory_headers: false,
|
||||
},
|
||||
MetricsParams::disabled(),
|
||||
futures::future::pending(),
|
||||
)
|
||||
.fuse(),
|
||||
);
|
||||
|
||||
restart_relay = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `Some()` with inclusive range of headers which must be scanned for manadatory headers
|
||||
/// and the first of such headers must be submitted to the target node.
|
||||
async fn mandatory_headers_scan_range<C: Chain>(
|
||||
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
||||
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
||||
maximal_headers_difference: C::BlockNumber,
|
||||
required_header_number: &RequiredHeaderNumberRef<C>,
|
||||
) -> Option<(C::BlockNumber, C::BlockNumber)> {
|
||||
let required_header_number = *required_header_number.lock().await;
|
||||
|
||||
// if we have been unable to read header number from the target, then let's assume
|
||||
// that it is the same as required header number. Otherwise we risk submitting
|
||||
// unneeded transactions
|
||||
let best_finalized_source_header_at_target =
|
||||
best_finalized_source_header_at_target.unwrap_or(required_header_number);
|
||||
|
||||
// if we have been unable to read header number from the source, then let's assume
|
||||
// that it is the same as at the target
|
||||
let best_finalized_source_header_at_source =
|
||||
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
||||
|
||||
// if there are too many source headers missing from the target node, sync mandatory
|
||||
// headers to target
|
||||
//
|
||||
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
||||
// headers when there are undelivered messages/confirmations. But security model of the
|
||||
// `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and
|
||||
// that it'll see authorities-change header before unbonding period will end for previous
|
||||
// authorities set.
|
||||
let current_headers_difference = best_finalized_source_header_at_source
|
||||
.checked_sub(&best_finalized_source_header_at_target)
|
||||
.unwrap_or_else(Zero::zero);
|
||||
if current_headers_difference <= maximal_headers_difference {
|
||||
return None;
|
||||
}
|
||||
|
||||
// if relay is already asked to sync headers, don't do anything yet
|
||||
if required_header_number > best_finalized_source_header_at_target {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((
|
||||
best_finalized_source_header_at_target + One::one(),
|
||||
best_finalized_source_header_at_source,
|
||||
))
|
||||
}
|
||||
|
||||
/// Try to find mandatory header in the inclusive headers range and, if one is found, ask to relay it.
|
||||
///
|
||||
/// Returns `true` if header was found and (asked to be) relayed and `false` otherwise.
|
||||
async fn relay_mandatory_header_from_range<SourceChain: Chain, P>(
|
||||
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||
required_header_number: &RequiredHeaderNumberRef<SourceChain>,
|
||||
best_finalized_source_header_at_target: String,
|
||||
range: (SourceChain::BlockNumber, SourceChain::BlockNumber),
|
||||
relay_task_name: &str,
|
||||
) -> Result<bool, relay_substrate_client::Error>
|
||||
where
|
||||
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||
{
|
||||
// search for mandatory header first
|
||||
let mandatory_source_header_number = find_mandatory_header_in_range(finality_source, range).await?;
|
||||
|
||||
// if there are no mandatory headers - we have nothing to do
|
||||
let mandatory_source_header_number = match mandatory_source_header_number {
|
||||
Some(mandatory_source_header_number) => mandatory_source_header_number,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
||||
// `find_mandatory_header` call may take a while => check if `required_header_number` is still
|
||||
// less than our `mandatory_source_header_number` before logging anything
|
||||
let mut required_header_number = required_header_number.lock().await;
|
||||
if *required_header_number >= mandatory_source_header_number {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
log::trace!(
|
||||
target: "bridge",
|
||||
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the mandatory {}",
|
||||
SourceChain::NAME,
|
||||
relay_task_name,
|
||||
best_finalized_source_header_at_target,
|
||||
range.1,
|
||||
mandatory_source_header_number,
|
||||
);
|
||||
|
||||
*required_header_number = mandatory_source_header_number;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Read best finalized source block number from source client.
|
||||
///
|
||||
/// Returns `None` if we have failed to read the number.
|
||||
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
|
||||
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||
relay_task_name: &str,
|
||||
) -> Result<SourceChain::BlockNumber, <SubstrateFinalitySource<SourceChain, P> as RelayClient>::Error>
|
||||
) -> Result<SourceChain::BlockNumber, relay_substrate_client::Error>
|
||||
where
|
||||
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||
{
|
||||
finality_source.best_finalized_block_number().await.map_err(|error| {
|
||||
log::error!(
|
||||
target: "bridge",
|
||||
"Failed to read best finalized source header from source in {} relay: {:?}",
|
||||
relay_task_name,
|
||||
error,
|
||||
);
|
||||
finality_source
|
||||
.on_chain_best_finalized_block_number()
|
||||
.await
|
||||
.map_err(|error| {
|
||||
log::error!(
|
||||
target: "bridge",
|
||||
"Failed to read best finalized source header from source in {} relay: {:?}",
|
||||
relay_task_name,
|
||||
error,
|
||||
);
|
||||
|
||||
error
|
||||
})
|
||||
error
|
||||
})
|
||||
}
|
||||
|
||||
/// Read best finalized source block number from target client.
|
||||
@@ -279,68 +391,28 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// What to do with the on-demand relay task?
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum OnDemandRelayAction {
|
||||
Start,
|
||||
Stop,
|
||||
None,
|
||||
}
|
||||
|
||||
fn select_on_demand_relay_action<C: Chain>(
|
||||
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
||||
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
||||
mut required_source_header_at_target: C::BlockNumber,
|
||||
maximal_headers_difference: C::BlockNumber,
|
||||
relay_task_name: &str,
|
||||
is_active: bool,
|
||||
) -> OnDemandRelayAction {
|
||||
// if we have been unable to read header number from the target, then let's assume
|
||||
// that it is the same as required header number. Otherwise we risk submitting
|
||||
// unneeded transactions
|
||||
let best_finalized_source_header_at_target =
|
||||
best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target);
|
||||
|
||||
// if we have been unable to read header number from the source, then let's assume
|
||||
// that it is the same as at the target
|
||||
let best_finalized_source_header_at_source =
|
||||
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
||||
|
||||
// if there are too many source headers missing from the target node, require some
|
||||
// new headers at target
|
||||
//
|
||||
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
||||
// headers when there are undelivered messages/confirmations. But security model of the
|
||||
// `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and
|
||||
// that it'll see authorities-change header before unbonding period will end for previous
|
||||
// authorities set.
|
||||
let current_headers_difference = best_finalized_source_header_at_source
|
||||
.checked_sub(&best_finalized_source_header_at_target)
|
||||
.unwrap_or_else(Zero::zero);
|
||||
if current_headers_difference > maximal_headers_difference {
|
||||
required_source_header_at_target = best_finalized_source_header_at_source;
|
||||
|
||||
// don't log if relay is already running
|
||||
if !is_active {
|
||||
log::trace!(
|
||||
target: "bridge",
|
||||
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
|
||||
C::NAME,
|
||||
relay_task_name,
|
||||
best_finalized_source_header_at_source,
|
||||
best_finalized_source_header_at_target,
|
||||
best_finalized_source_header_at_source,
|
||||
);
|
||||
/// Read first mandatory header in given inclusive range.
|
||||
///
|
||||
/// Returns `Ok(None)` if there were no mandatory headers in the range.
|
||||
async fn find_mandatory_header_in_range<SourceChain: Chain, P>(
|
||||
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||
range: (SourceChain::BlockNumber, SourceChain::BlockNumber),
|
||||
) -> Result<Option<SourceChain::BlockNumber>, relay_substrate_client::Error>
|
||||
where
|
||||
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||
{
|
||||
let mut current = range.0;
|
||||
while current <= range.1 {
|
||||
let header: SyncHeader<SourceChain::Header> = finality_source.client().header_by_number(current).await?.into();
|
||||
if header.is_mandatory() {
|
||||
return Ok(Some(current));
|
||||
}
|
||||
|
||||
current += One::one();
|
||||
}
|
||||
|
||||
// now let's select what to do with relay
|
||||
let needs_to_be_active = required_source_header_at_target > best_finalized_source_header_at_target;
|
||||
match (needs_to_be_active, is_active) {
|
||||
(true, false) => OnDemandRelayAction::Start,
|
||||
(false, true) => OnDemandRelayAction::Stop,
|
||||
_ => OnDemandRelayAction::None,
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// On-demand headers relay task name.
|
||||
@@ -348,61 +420,6 @@ fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> Str
|
||||
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
||||
}
|
||||
|
||||
/// Start on-demand headers relay task.
|
||||
fn start_on_demand_headers_relay<SourceChain: Chain, TargetChain: Chain, TargetSign>(
|
||||
task_name: String,
|
||||
relay_exited_tx: oneshot::Sender<()>,
|
||||
source_client: Client<SourceChain>,
|
||||
target_client: Client<TargetChain>,
|
||||
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||
) -> Option<async_std::task::JoinHandle<()>>
|
||||
where
|
||||
SourceChain::BlockNumber: BlockNumberBase,
|
||||
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>: SubstrateFinalitySyncPipeline<
|
||||
Hash = HashOf<SourceChain>,
|
||||
Number = BlockNumberOf<SourceChain>,
|
||||
Header = SyncHeader<SourceChain::Header>,
|
||||
FinalityProof = GrandpaJustification<SourceChain::Header>,
|
||||
TargetChain = TargetChain,
|
||||
>,
|
||||
TargetSign: 'static,
|
||||
{
|
||||
let headers_relay_future =
|
||||
crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled());
|
||||
let closure_task_name = task_name.clone();
|
||||
async_std::task::Builder::new()
|
||||
.name(task_name.clone())
|
||||
.spawn(async move {
|
||||
log::info!(target: "bridge", "Starting {} headers relay", closure_task_name);
|
||||
let result = headers_relay_future.await;
|
||||
log::trace!(target: "bridge", "{} headers relay has exited. Result: {:?}", closure_task_name, result);
|
||||
let _ = relay_exited_tx.send(());
|
||||
})
|
||||
.map_err(|error| {
|
||||
log::error!(
|
||||
target: "bridge",
|
||||
"Failed to start {} relay: {:?}",
|
||||
task_name,
|
||||
error,
|
||||
);
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
/// Stop on-demand headers relay task.
|
||||
async fn stop_on_demand_headers_relay(task: Option<async_std::task::JoinHandle<()>>) {
|
||||
if let Some(task) = task {
|
||||
let task_name = task
|
||||
.task()
|
||||
.name()
|
||||
.expect("on-demand tasks are always started with name; qed")
|
||||
.to_string();
|
||||
log::trace!(target: "bridge", "Cancelling {} headers relay", task_name);
|
||||
task.cancel().await;
|
||||
log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -412,42 +429,19 @@ mod tests {
|
||||
const AT_SOURCE: Option<bp_millau::BlockNumber> = Some(10);
|
||||
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
|
||||
|
||||
#[test]
|
||||
fn starts_relay_when_headers_are_required() {
|
||||
#[async_std::test]
|
||||
async fn mandatory_headers_scan_range_selects_range_if_too_many_headers_are_missing() {
|
||||
assert_eq!(
|
||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", false),
|
||||
OnDemandRelayAction::Start,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", true),
|
||||
OnDemandRelayAction::None,
|
||||
mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 5, &Arc::new(Mutex::new(0))).await,
|
||||
Some((AT_TARGET.unwrap() + 1, AT_SOURCE.unwrap())),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn starts_relay_when_too_many_headers_missing() {
|
||||
#[async_std::test]
|
||||
async fn mandatory_headers_scan_range_selects_nothing_if_enough_headers_are_relayed() {
|
||||
assert_eq!(
|
||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", false),
|
||||
OnDemandRelayAction::Start,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", true),
|
||||
OnDemandRelayAction::None,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stops_relay_if_required_header_is_synced() {
|
||||
assert_eq!(
|
||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true),
|
||||
OnDemandRelayAction::Stop,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false),
|
||||
OnDemandRelayAction::None,
|
||||
mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 10, &Arc::new(Mutex::new(0))).await,
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user