From 0e34a11309823a272b7d913600d12ea496675de2 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 26 Apr 2021 12:52:30 +0300 Subject: [PATCH] Relay at least one header for every source chain session (#923) * relay at least one header for every session * Update relays/bin-substrate/src/on_demand_headers.rs Co-authored-by: Hernando Castano Co-authored-by: Hernando Castano --- .../src/cli/relay_headers_and_messages.rs | 5 + .../bin-substrate/src/on_demand_headers.rs | 229 +++++++++++++++--- 2 files changed, 207 insertions(+), 27 deletions(-) diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs b/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs index 98ff1268fa..f2238ba576 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs @@ -96,6 +96,9 @@ macro_rules! select_bridge { type LeftToRightMessages = crate::chains::millau_messages_to_rialto::MillauMessagesToRialto; type RightToLeftMessages = crate::chains::rialto_messages_to_millau::RialtoMessagesToMillau; + const MAX_MISSING_LEFT_HEADERS_AT_RIGHT: bp_millau::BlockNumber = bp_millau::SESSION_LENGTH; + const MAX_MISSING_RIGHT_HEADERS_AT_LEFT: bp_rialto::BlockNumber = bp_rialto::SESSION_LENGTH; + use crate::chains::millau_messages_to_rialto::run as left_to_right_messages; use crate::chains::rialto_messages_to_millau::run as right_to_left_messages; @@ -131,11 +134,13 @@ impl RelayHeadersAndMessages { left_client.clone(), right_client.clone(), LeftToRightFinality::new(right_client.clone(), right_sign.clone()), + MAX_MISSING_LEFT_HEADERS_AT_RIGHT, ); let right_to_left_on_demand_headers = OnDemandHeadersRelay::new( right_client.clone(), left_client.clone(), RightToLeftFinality::new(left_client.clone(), left_sign.clone()), + MAX_MISSING_RIGHT_HEADERS_AT_LEFT, ); let left_to_right_messages = left_to_right_messages(MessagesRelayParams { diff --git a/bridges/relays/bin-substrate/src/on_demand_headers.rs b/bridges/relays/bin-substrate/src/on_demand_headers.rs index 4c86b6a170..d0aff845de 100644 --- a/bridges/relays/bin-substrate/src/on_demand_headers.rs +++ b/bridges/relays/bin-substrate/src/on_demand_headers.rs @@ -20,13 +20,18 @@ use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityT use crate::finality_target::SubstrateFinalityTarget; use bp_header_chain::justification::GrandpaJustification; -use finality_relay::TargetClient as FinalityTargetClient; +use finality_relay::{ + FinalitySyncPipeline, SourceClient as FinalitySourceClient, TargetClient as FinalityTargetClient, +}; use futures::{ channel::{mpsc, oneshot}, select, FutureExt, StreamExt, }; -use num_traits::Zero; -use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader}; +use num_traits::{CheckedSub, Zero}; +use relay_substrate_client::{ + finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, + SyncHeader, +}; use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId}; use std::fmt::Debug; @@ -49,6 +54,7 @@ impl OnDemandHeadersRelay { source_client: Client, target_client: Client, pipeline: SubstrateFinalityToSubstrate, + maximal_headers_difference: SourceChain::BlockNumber, ) -> Self where SourceChain: Chain + Debug, @@ -68,7 +74,14 @@ impl OnDemandHeadersRelay { { let (required_header_tx, required_header_rx) = mpsc::channel(1); async_std::task::spawn(async move { - background_task(source_client, target_client, pipeline, required_header_rx).await; + background_task( + source_client, + target_client, + pipeline, + maximal_headers_difference, + required_header_rx, + ) + .await; }); let background_task_name = format!( @@ -100,6 +113,7 @@ async fn background_task( source_client: Client, target_client: Client, pipeline: SubstrateFinalityToSubstrate, + maximal_headers_difference: SourceChain::BlockNumber, mut required_header_rx: mpsc::Receiver>, ) where SourceChain: Chain + Debug, @@ -118,6 +132,10 @@ async fn background_task( FinalityTargetClient>, { let relay_task_name = on_demand_headers_relay_name::(); + let finality_source = SubstrateFinalitySource::< + _, + SubstrateFinalityToSubstrate, + >::new(source_client.clone()); let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone()); let mut active_headers_relay = None; @@ -150,30 +168,25 @@ async fn background_task( }, } - // read best finalized source block from target - let available_header_number = match finality_target.best_finalized_source_block_number().await { - Ok(available_header_number) => available_header_number, - Err(error) => { - log::error!( - target: "bridge", - "Failed to read best finalized {} header from {} in {} relay: {:?}", - SourceChain::NAME, - TargetChain::NAME, - relay_task_name, - error, - ); + // read best finalized source header number from source + let best_finalized_source_header_at_source = + best_finalized_source_header_at_source(&finality_source, &relay_task_name).await; - // we don't know what's happening with target client, so better to stop on-demand relay than - // submit unneeded transactions - // => assume that required header is known to the target node - required_header_number - } - }; + // read best finalized source header number from target + let best_finalized_source_header_at_target = + best_finalized_source_header_at_target::(&finality_target, &relay_task_name).await; // start or stop headers relay if required - let activate = required_header_number > available_header_number; - match (activate, active_headers_relay.is_some()) { - (true, false) => { + let action = select_on_demand_relay_action::( + best_finalized_source_header_at_source, + best_finalized_source_header_at_target, + required_header_number, + maximal_headers_difference, + &relay_task_name, + active_headers_relay.is_some(), + ); + match action { + OnDemandRelayAction::Start => { let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel(); active_headers_relay = start_on_demand_headers_relay( relay_task_name.clone(), @@ -186,14 +199,127 @@ async fn background_task( relay_exited_rx = new_relay_exited_rx.right_future(); } } - (false, true) => { + OnDemandRelayAction::Stop => { stop_on_demand_headers_relay(active_headers_relay.take()).await; } - _ => (), + OnDemandRelayAction::None => (), } } } +/// Read best finalized source block number from source client. +/// +/// Returns `None` if we have failed to read the number. +async fn best_finalized_source_header_at_source( + finality_source: &SubstrateFinalitySource, + relay_task_name: &str, +) -> Option +where + SubstrateFinalitySource: FinalitySourceClient

, + P: FinalitySyncPipeline, +{ + finality_source + .best_finalized_block_number() + .await + .map(Some) + .unwrap_or_else(|error| { + log::error!( + target: "bridge", + "Failed to read best finalized source header from source in {} relay: {:?}", + relay_task_name, + error, + ); + + None + }) +} + +/// Read best finalized source block number from target client. +/// +/// Returns `None` if we have failed to read the number. +async fn best_finalized_source_header_at_target( + finality_target: &SubstrateFinalityTarget, + relay_task_name: &str, +) -> Option +where + SubstrateFinalityTarget: FinalityTargetClient

, + P: FinalitySyncPipeline, +{ + finality_target + .best_finalized_source_block_number() + .await + .map(Some) + .unwrap_or_else(|error| { + log::error!( + target: "bridge", + "Failed to read best finalized source header from target in {} relay: {:?}", + relay_task_name, + error, + ); + + None + }) +} + +/// What to do with the on-demand relay task? +#[derive(Debug, PartialEq)] +enum OnDemandRelayAction { + Start, + Stop, + None, +} + +fn select_on_demand_relay_action( + best_finalized_source_header_at_source: Option, + best_finalized_source_header_at_target: Option, + mut required_source_header_at_target: C::BlockNumber, + maximal_headers_difference: C::BlockNumber, + relay_task_name: &str, + is_active: bool, +) -> OnDemandRelayAction { + // if we have been unable to read header number from the target, then let's assume + // that it is the same as required header number. Otherwise we risk submitting + // unneeded transactions + let best_finalized_source_header_at_target = + best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target); + + // if we have been unable to read header number from the source, then let's assume + // that it is the same as at the target + let best_finalized_source_header_at_source = + best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target); + + // if there are too many source headers missing from the target node, require some + // new headers at target + // + // why do we need that? When complex headers+messages relay is used, it'll normally only relay + // headers when there are undelivered messages/confirmations. But security model of the + // `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and + // that it'll see authorities-change header before unbonding period will end for previous + // authorities set. + let current_headers_difference = best_finalized_source_header_at_source + .checked_sub(&best_finalized_source_header_at_target) + .unwrap_or_else(Zero::zero); + if current_headers_difference > maximal_headers_difference { + log::trace!( + target: "bridge", + "Too many {} headers missing at target in {} relay. Going to sync up to the {}", + C::NAME, + relay_task_name, + best_finalized_source_header_at_source, + ); + + required_source_header_at_target = best_finalized_source_header_at_source; + } + + // now let's select what to do with relay + let needs_to_be_active = required_source_header_at_target > best_finalized_source_header_at_target; + match (needs_to_be_active, is_active) { + (true, false) => OnDemandRelayAction::Start, + (false, true) => OnDemandRelayAction::Stop, + _ => OnDemandRelayAction::None, + } +} + /// On-demand headers relay task name. fn on_demand_headers_relay_name() -> String { format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME) @@ -253,3 +379,52 @@ async fn stop_on_demand_headers_relay(task: Option = Some(10); + const AT_TARGET: Option = Some(1); + + #[test] + fn starts_relay_when_headers_are_required() { + assert_eq!( + select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 5, 100, "test", false), + OnDemandRelayAction::Start, + ); + + assert_eq!( + select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 5, 100, "test", true), + OnDemandRelayAction::None, + ); + } + + #[test] + fn starts_relay_when_too_many_headers_missing() { + assert_eq!( + select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 0, 5, "test", false), + OnDemandRelayAction::Start, + ); + + assert_eq!( + select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 0, 5, "test", true), + OnDemandRelayAction::None, + ); + } + + #[test] + fn stops_relay_if_required_header_is_synced() { + assert_eq!( + select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true), + OnDemandRelayAction::Stop, + ); + + assert_eq!( + select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false), + OnDemandRelayAction::None, + ); + } +}