mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 02:51:01 +00:00
Relay at least one header for every source chain session (#923)
* relay at least one header for every session * Update relays/bin-substrate/src/on_demand_headers.rs Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com> Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
committed by
Bastian Köcher
parent
4916ec3432
commit
0e34a11309
@@ -96,6 +96,9 @@ macro_rules! select_bridge {
|
|||||||
type LeftToRightMessages = crate::chains::millau_messages_to_rialto::MillauMessagesToRialto;
|
type LeftToRightMessages = crate::chains::millau_messages_to_rialto::MillauMessagesToRialto;
|
||||||
type RightToLeftMessages = crate::chains::rialto_messages_to_millau::RialtoMessagesToMillau;
|
type RightToLeftMessages = crate::chains::rialto_messages_to_millau::RialtoMessagesToMillau;
|
||||||
|
|
||||||
|
const MAX_MISSING_LEFT_HEADERS_AT_RIGHT: bp_millau::BlockNumber = bp_millau::SESSION_LENGTH;
|
||||||
|
const MAX_MISSING_RIGHT_HEADERS_AT_LEFT: bp_rialto::BlockNumber = bp_rialto::SESSION_LENGTH;
|
||||||
|
|
||||||
use crate::chains::millau_messages_to_rialto::run as left_to_right_messages;
|
use crate::chains::millau_messages_to_rialto::run as left_to_right_messages;
|
||||||
use crate::chains::rialto_messages_to_millau::run as right_to_left_messages;
|
use crate::chains::rialto_messages_to_millau::run as right_to_left_messages;
|
||||||
|
|
||||||
@@ -131,11 +134,13 @@ impl RelayHeadersAndMessages {
|
|||||||
left_client.clone(),
|
left_client.clone(),
|
||||||
right_client.clone(),
|
right_client.clone(),
|
||||||
LeftToRightFinality::new(right_client.clone(), right_sign.clone()),
|
LeftToRightFinality::new(right_client.clone(), right_sign.clone()),
|
||||||
|
MAX_MISSING_LEFT_HEADERS_AT_RIGHT,
|
||||||
);
|
);
|
||||||
let right_to_left_on_demand_headers = OnDemandHeadersRelay::new(
|
let right_to_left_on_demand_headers = OnDemandHeadersRelay::new(
|
||||||
right_client.clone(),
|
right_client.clone(),
|
||||||
left_client.clone(),
|
left_client.clone(),
|
||||||
RightToLeftFinality::new(left_client.clone(), left_sign.clone()),
|
RightToLeftFinality::new(left_client.clone(), left_sign.clone()),
|
||||||
|
MAX_MISSING_RIGHT_HEADERS_AT_LEFT,
|
||||||
);
|
);
|
||||||
|
|
||||||
let left_to_right_messages = left_to_right_messages(MessagesRelayParams {
|
let left_to_right_messages = left_to_right_messages(MessagesRelayParams {
|
||||||
|
|||||||
@@ -20,13 +20,18 @@ use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityT
|
|||||||
use crate::finality_target::SubstrateFinalityTarget;
|
use crate::finality_target::SubstrateFinalityTarget;
|
||||||
|
|
||||||
use bp_header_chain::justification::GrandpaJustification;
|
use bp_header_chain::justification::GrandpaJustification;
|
||||||
use finality_relay::TargetClient as FinalityTargetClient;
|
use finality_relay::{
|
||||||
|
FinalitySyncPipeline, SourceClient as FinalitySourceClient, TargetClient as FinalityTargetClient,
|
||||||
|
};
|
||||||
use futures::{
|
use futures::{
|
||||||
channel::{mpsc, oneshot},
|
channel::{mpsc, oneshot},
|
||||||
select, FutureExt, StreamExt,
|
select, FutureExt, StreamExt,
|
||||||
};
|
};
|
||||||
use num_traits::Zero;
|
use num_traits::{CheckedSub, Zero};
|
||||||
use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader};
|
use relay_substrate_client::{
|
||||||
|
finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf,
|
||||||
|
SyncHeader,
|
||||||
|
};
|
||||||
use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
|
use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
@@ -49,6 +54,7 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
|||||||
source_client: Client<SourceChain>,
|
source_client: Client<SourceChain>,
|
||||||
target_client: Client<TargetChain>,
|
target_client: Client<TargetChain>,
|
||||||
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||||
|
maximal_headers_difference: SourceChain::BlockNumber,
|
||||||
) -> Self
|
) -> Self
|
||||||
where
|
where
|
||||||
SourceChain: Chain + Debug,
|
SourceChain: Chain + Debug,
|
||||||
@@ -68,7 +74,14 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
|
|||||||
{
|
{
|
||||||
let (required_header_tx, required_header_rx) = mpsc::channel(1);
|
let (required_header_tx, required_header_rx) = mpsc::channel(1);
|
||||||
async_std::task::spawn(async move {
|
async_std::task::spawn(async move {
|
||||||
background_task(source_client, target_client, pipeline, required_header_rx).await;
|
background_task(
|
||||||
|
source_client,
|
||||||
|
target_client,
|
||||||
|
pipeline,
|
||||||
|
maximal_headers_difference,
|
||||||
|
required_header_rx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
});
|
});
|
||||||
|
|
||||||
let background_task_name = format!(
|
let background_task_name = format!(
|
||||||
@@ -100,6 +113,7 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
source_client: Client<SourceChain>,
|
source_client: Client<SourceChain>,
|
||||||
target_client: Client<TargetChain>,
|
target_client: Client<TargetChain>,
|
||||||
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||||
|
maximal_headers_difference: SourceChain::BlockNumber,
|
||||||
mut required_header_rx: mpsc::Receiver<HeaderIdOf<SourceChain>>,
|
mut required_header_rx: mpsc::Receiver<HeaderIdOf<SourceChain>>,
|
||||||
) where
|
) where
|
||||||
SourceChain: Chain + Debug,
|
SourceChain: Chain + Debug,
|
||||||
@@ -118,6 +132,10 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
|
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
|
||||||
{
|
{
|
||||||
let relay_task_name = on_demand_headers_relay_name::<SourceChain, TargetChain>();
|
let relay_task_name = on_demand_headers_relay_name::<SourceChain, TargetChain>();
|
||||||
|
let finality_source = SubstrateFinalitySource::<
|
||||||
|
_,
|
||||||
|
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||||
|
>::new(source_client.clone());
|
||||||
let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
||||||
|
|
||||||
let mut active_headers_relay = None;
|
let mut active_headers_relay = None;
|
||||||
@@ -150,30 +168,25 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// read best finalized source block from target
|
// read best finalized source header number from source
|
||||||
let available_header_number = match finality_target.best_finalized_source_block_number().await {
|
let best_finalized_source_header_at_source =
|
||||||
Ok(available_header_number) => available_header_number,
|
best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
|
||||||
Err(error) => {
|
|
||||||
log::error!(
|
|
||||||
target: "bridge",
|
|
||||||
"Failed to read best finalized {} header from {} in {} relay: {:?}",
|
|
||||||
SourceChain::NAME,
|
|
||||||
TargetChain::NAME,
|
|
||||||
relay_task_name,
|
|
||||||
error,
|
|
||||||
);
|
|
||||||
|
|
||||||
// we don't know what's happening with target client, so better to stop on-demand relay than
|
// read best finalized source header number from target
|
||||||
// submit unneeded transactions
|
let best_finalized_source_header_at_target =
|
||||||
// => assume that required header is known to the target node
|
best_finalized_source_header_at_target::<SourceChain, _, _>(&finality_target, &relay_task_name).await;
|
||||||
required_header_number
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// start or stop headers relay if required
|
// start or stop headers relay if required
|
||||||
let activate = required_header_number > available_header_number;
|
let action = select_on_demand_relay_action::<SourceChain>(
|
||||||
match (activate, active_headers_relay.is_some()) {
|
best_finalized_source_header_at_source,
|
||||||
(true, false) => {
|
best_finalized_source_header_at_target,
|
||||||
|
required_header_number,
|
||||||
|
maximal_headers_difference,
|
||||||
|
&relay_task_name,
|
||||||
|
active_headers_relay.is_some(),
|
||||||
|
);
|
||||||
|
match action {
|
||||||
|
OnDemandRelayAction::Start => {
|
||||||
let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
|
let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
|
||||||
active_headers_relay = start_on_demand_headers_relay(
|
active_headers_relay = start_on_demand_headers_relay(
|
||||||
relay_task_name.clone(),
|
relay_task_name.clone(),
|
||||||
@@ -186,14 +199,127 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
relay_exited_rx = new_relay_exited_rx.right_future();
|
relay_exited_rx = new_relay_exited_rx.right_future();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(false, true) => {
|
OnDemandRelayAction::Stop => {
|
||||||
stop_on_demand_headers_relay(active_headers_relay.take()).await;
|
stop_on_demand_headers_relay(active_headers_relay.take()).await;
|
||||||
}
|
}
|
||||||
_ => (),
|
OnDemandRelayAction::None => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read best finalized source block number from source client.
|
||||||
|
///
|
||||||
|
/// Returns `None` if we have failed to read the number.
|
||||||
|
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
|
||||||
|
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||||
|
relay_task_name: &str,
|
||||||
|
) -> Option<SourceChain::BlockNumber>
|
||||||
|
where
|
||||||
|
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||||
|
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||||
|
{
|
||||||
|
finality_source
|
||||||
|
.best_finalized_block_number()
|
||||||
|
.await
|
||||||
|
.map(Some)
|
||||||
|
.unwrap_or_else(|error| {
|
||||||
|
log::error!(
|
||||||
|
target: "bridge",
|
||||||
|
"Failed to read best finalized source header from source in {} relay: {:?}",
|
||||||
|
relay_task_name,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read best finalized source block number from target client.
|
||||||
|
///
|
||||||
|
/// Returns `None` if we have failed to read the number.
|
||||||
|
async fn best_finalized_source_header_at_target<SourceChain: Chain, TargetChain: Chain, P>(
|
||||||
|
finality_target: &SubstrateFinalityTarget<TargetChain, P>,
|
||||||
|
relay_task_name: &str,
|
||||||
|
) -> Option<SourceChain::BlockNumber>
|
||||||
|
where
|
||||||
|
SubstrateFinalityTarget<TargetChain, P>: FinalityTargetClient<P>,
|
||||||
|
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||||
|
{
|
||||||
|
finality_target
|
||||||
|
.best_finalized_source_block_number()
|
||||||
|
.await
|
||||||
|
.map(Some)
|
||||||
|
.unwrap_or_else(|error| {
|
||||||
|
log::error!(
|
||||||
|
target: "bridge",
|
||||||
|
"Failed to read best finalized source header from target in {} relay: {:?}",
|
||||||
|
relay_task_name,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// What to do with the on-demand relay task?
|
||||||
|
#[derive(Debug, PartialEq)]
|
||||||
|
enum OnDemandRelayAction {
|
||||||
|
Start,
|
||||||
|
Stop,
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_on_demand_relay_action<C: Chain>(
|
||||||
|
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
||||||
|
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
||||||
|
mut required_source_header_at_target: C::BlockNumber,
|
||||||
|
maximal_headers_difference: C::BlockNumber,
|
||||||
|
relay_task_name: &str,
|
||||||
|
is_active: bool,
|
||||||
|
) -> OnDemandRelayAction {
|
||||||
|
// if we have been unable to read header number from the target, then let's assume
|
||||||
|
// that it is the same as required header number. Otherwise we risk submitting
|
||||||
|
// unneeded transactions
|
||||||
|
let best_finalized_source_header_at_target =
|
||||||
|
best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target);
|
||||||
|
|
||||||
|
// if we have been unable to read header number from the source, then let's assume
|
||||||
|
// that it is the same as at the target
|
||||||
|
let best_finalized_source_header_at_source =
|
||||||
|
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
||||||
|
|
||||||
|
// if there are too many source headers missing from the target node, require some
|
||||||
|
// new headers at target
|
||||||
|
//
|
||||||
|
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
||||||
|
// headers when there are undelivered messages/confirmations. But security model of the
|
||||||
|
// `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and
|
||||||
|
// that it'll see authorities-change header before unbonding period will end for previous
|
||||||
|
// authorities set.
|
||||||
|
let current_headers_difference = best_finalized_source_header_at_source
|
||||||
|
.checked_sub(&best_finalized_source_header_at_target)
|
||||||
|
.unwrap_or_else(Zero::zero);
|
||||||
|
if current_headers_difference > maximal_headers_difference {
|
||||||
|
log::trace!(
|
||||||
|
target: "bridge",
|
||||||
|
"Too many {} headers missing at target in {} relay. Going to sync up to the {}",
|
||||||
|
C::NAME,
|
||||||
|
relay_task_name,
|
||||||
|
best_finalized_source_header_at_source,
|
||||||
|
);
|
||||||
|
|
||||||
|
required_source_header_at_target = best_finalized_source_header_at_source;
|
||||||
|
}
|
||||||
|
|
||||||
|
// now let's select what to do with relay
|
||||||
|
let needs_to_be_active = required_source_header_at_target > best_finalized_source_header_at_target;
|
||||||
|
match (needs_to_be_active, is_active) {
|
||||||
|
(true, false) => OnDemandRelayAction::Start,
|
||||||
|
(false, true) => OnDemandRelayAction::Stop,
|
||||||
|
_ => OnDemandRelayAction::None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// On-demand headers relay task name.
|
/// On-demand headers relay task name.
|
||||||
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
|
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
|
||||||
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
||||||
@@ -253,3 +379,52 @@ async fn stop_on_demand_headers_relay(task: Option<async_std::task::JoinHandle<(
|
|||||||
log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
|
log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
type TestChain = relay_millau_client::Millau;
|
||||||
|
|
||||||
|
const AT_SOURCE: Option<bp_millau::BlockNumber> = Some(10);
|
||||||
|
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn starts_relay_when_headers_are_required() {
|
||||||
|
assert_eq!(
|
||||||
|
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", false),
|
||||||
|
OnDemandRelayAction::Start,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", true),
|
||||||
|
OnDemandRelayAction::None,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn starts_relay_when_too_many_headers_missing() {
|
||||||
|
assert_eq!(
|
||||||
|
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", false),
|
||||||
|
OnDemandRelayAction::Start,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", true),
|
||||||
|
OnDemandRelayAction::None,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stops_relay_if_required_header_is_synced() {
|
||||||
|
assert_eq!(
|
||||||
|
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true),
|
||||||
|
OnDemandRelayAction::Stop,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false),
|
||||||
|
OnDemandRelayAction::None,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user