From c69c682a4c6ef80a18e7a9e23bebec082145f61e Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 16 Jun 2021 22:03:03 +0300 Subject: [PATCH] Add --only-mandatory-headers mode (#1004) --- .../bin-substrate/src/cli/relay_headers.rs | 12 +- .../bin-substrate/src/finality_pipeline.rs | 2 + .../bin-substrate/src/on_demand_headers.rs | 180 ++++++++++++++---- .../client-substrate/src/finality_source.rs | 5 + bridges/relays/finality/src/finality_loop.rs | 9 +- .../finality/src/finality_loop_tests.rs | 110 +++++++++-- 6 files changed, 260 insertions(+), 58 deletions(-) diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers.rs b/bridges/relays/bin-substrate/src/cli/relay_headers.rs index 6806588288..ec521c2918 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers.rs @@ -24,6 +24,9 @@ pub struct RelayHeaders { /// A bridge instance to relay headers for. #[structopt(possible_values = &RelayHeadersBridge::variants(), case_insensitive = true)] bridge: RelayHeadersBridge, + /// If passed, only mandatory headers (headers that are changing the GRANDPA authorities set) are relayed. + #[structopt(long)] + only_mandatory_headers: bool, #[structopt(flatten)] source: SourceConnectionParams, #[structopt(flatten)] @@ -100,7 +103,14 @@ impl RelayHeaders { let finality = Finality::new(target_client.clone(), target_sign); finality.start_relay_guards(); - crate::finality_pipeline::run(finality, source_client, target_client, metrics_params).await + crate::finality_pipeline::run( + finality, + source_client, + target_client, + self.only_mandatory_headers, + metrics_params, + ) + .await }) } } diff --git a/bridges/relays/bin-substrate/src/finality_pipeline.rs b/bridges/relays/bin-substrate/src/finality_pipeline.rs index fe0004bbbd..19fa0917df 100644 --- a/bridges/relays/bin-substrate/src/finality_pipeline.rs +++ b/bridges/relays/bin-substrate/src/finality_pipeline.rs @@ -119,6 +119,7 @@ pub async fn run( pipeline: P, source_client: Client, target_client: Client, + only_mandatory_headers: bool, metrics_params: MetricsParams, ) -> anyhow::Result<()> where @@ -147,6 +148,7 @@ where tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL), recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT, stall_timeout: STALL_TIMEOUT, + only_mandatory_headers, }, metrics_params, futures::future::pending(), diff --git a/bridges/relays/bin-substrate/src/on_demand_headers.rs b/bridges/relays/bin-substrate/src/on_demand_headers.rs index 5730fc3869..58ef268a29 100644 --- a/bridges/relays/bin-substrate/src/on_demand_headers.rs +++ b/bridges/relays/bin-substrate/src/on_demand_headers.rs @@ -24,11 +24,11 @@ use crate::finality_target::SubstrateFinalityTarget; use async_std::sync::{Arc, Mutex}; use bp_header_chain::justification::GrandpaJustification; use finality_relay::{ - FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient, + FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient, SourceHeader, TargetClient as FinalityTargetClient, }; use futures::{select, FutureExt}; -use num_traits::{CheckedSub, Zero}; +use num_traits::{CheckedSub, One, Zero}; use relay_substrate_client::{ finality_source::{FinalitySource as SubstrateFinalitySource, RequiredHeaderNumberRef}, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader, @@ -139,6 +139,7 @@ async fn background_task( SubstrateFinalityToSubstrate, >::new(source_client.clone(), Some(required_header_number.clone())); let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone()); + let mut latest_non_mandatory_at_source = Zero::zero(); let mut restart_relay = true; let finality_relay_task = futures::future::Fuse::terminated(); @@ -181,15 +182,48 @@ async fn background_task( continue; } - // update required header - update_required_header_number_if_too_many_are_missing::( + // submit mandatory header if some headers are missing + let best_finalized_source_header_at_target_fmt = format!("{:?}", best_finalized_source_header_at_target); + let mandatory_scan_range = mandatory_headers_scan_range::( best_finalized_source_header_at_source.ok(), best_finalized_source_header_at_target.ok(), maximal_headers_difference, &required_header_number, - &relay_task_name, ) .await; + if let Some(mandatory_scan_range) = mandatory_scan_range { + let relay_mandatory_header_result = relay_mandatory_header_from_range( + &finality_source, + &required_header_number, + best_finalized_source_header_at_target_fmt, + ( + std::cmp::max(mandatory_scan_range.0, latest_non_mandatory_at_source), + mandatory_scan_range.1, + ), + &relay_task_name, + ) + .await; + match relay_mandatory_header_result { + Ok(true) => (), + Ok(false) => { + // there are no (or we don't need to relay them) mandatory headers in the range + // => to avoid scanning the same headers over and over again, remember that + latest_non_mandatory_at_source = mandatory_scan_range.1; + } + Err(e) => { + if e.is_connection_error() { + relay_utils::relay_loop::reconnect_failed_client( + FailedClient::Source, + relay_utils::relay_loop::RECONNECT_DELAY, + &mut finality_source, + &mut finality_target, + ) + .await; + continue; + } + } + } + } // start/restart relay if restart_relay { @@ -201,6 +235,7 @@ async fn background_task( tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL), recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT, stall_timeout: STALL_TIMEOUT, + only_mandatory_headers: false, }, MetricsParams::disabled(), futures::future::pending(), @@ -213,29 +248,29 @@ async fn background_task( } } -/// If there are too many source headers missing at target, we ask for syncing more headers. -async fn update_required_header_number_if_too_many_are_missing( +/// Returns `Some()` with inclusive range of headers which must be scanned for manadatory headers +/// and the first of such headers must be submitted to the target node. +async fn mandatory_headers_scan_range( best_finalized_source_header_at_source: Option, best_finalized_source_header_at_target: Option, maximal_headers_difference: C::BlockNumber, required_header_number: &RequiredHeaderNumberRef, - relay_task_name: &str, -) { - let mut required_header_number = required_header_number.lock().await; +) -> Option<(C::BlockNumber, C::BlockNumber)> { + let required_header_number = *required_header_number.lock().await; // if we have been unable to read header number from the target, then let's assume // that it is the same as required header number. Otherwise we risk submitting // unneeded transactions let best_finalized_source_header_at_target = - best_finalized_source_header_at_target.unwrap_or(*required_header_number); + best_finalized_source_header_at_target.unwrap_or(required_header_number); // if we have been unable to read header number from the source, then let's assume // that it is the same as at the target let best_finalized_source_header_at_source = best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target); - // if there are too many source headers missing from the target node, require some - // new headers at target + // if there are too many source headers missing from the target node, sync mandatory + // headers to target // // why do we need that? When complex headers+messages relay is used, it'll normally only relay // headers when there are undelivered messages/confirmations. But security model of the @@ -245,22 +280,63 @@ async fn update_required_header_number_if_too_many_are_missing( let current_headers_difference = best_finalized_source_header_at_source .checked_sub(&best_finalized_source_header_at_target) .unwrap_or_else(Zero::zero); - if current_headers_difference > maximal_headers_difference { - // if relay is already asked to sync headers, don't log anything - if *required_header_number <= best_finalized_source_header_at_target { - log::trace!( - target: "bridge", - "Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}", - C::NAME, - relay_task_name, - best_finalized_source_header_at_source, - best_finalized_source_header_at_target, - best_finalized_source_header_at_source, - ); - - *required_header_number = best_finalized_source_header_at_source; - } + if current_headers_difference <= maximal_headers_difference { + return None; } + + // if relay is already asked to sync headers, don't do anything yet + if required_header_number > best_finalized_source_header_at_target { + return None; + } + + Some(( + best_finalized_source_header_at_target + One::one(), + best_finalized_source_header_at_source, + )) +} + +/// Try to find mandatory header in the inclusive headers range and, if one is found, ask to relay it. +/// +/// Returns `true` if header was found and (asked to be) relayed and `false` otherwise. +async fn relay_mandatory_header_from_range( + finality_source: &SubstrateFinalitySource, + required_header_number: &RequiredHeaderNumberRef, + best_finalized_source_header_at_target: String, + range: (SourceChain::BlockNumber, SourceChain::BlockNumber), + relay_task_name: &str, +) -> Result +where + SubstrateFinalitySource: FinalitySourceClient

, + P: FinalitySyncPipeline, +{ + // search for mandatory header first + let mandatory_source_header_number = find_mandatory_header_in_range(finality_source, range).await?; + + // if there are no mandatory headers - we have nothing to do + let mandatory_source_header_number = match mandatory_source_header_number { + Some(mandatory_source_header_number) => mandatory_source_header_number, + None => return Ok(false), + }; + + // `find_mandatory_header` call may take a while => check if `required_header_number` is still + // less than our `mandatory_source_header_number` before logging anything + let mut required_header_number = required_header_number.lock().await; + if *required_header_number >= mandatory_source_header_number { + return Ok(false); + } + + log::trace!( + target: "bridge", + "Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the mandatory {}", + SourceChain::NAME, + relay_task_name, + best_finalized_source_header_at_target, + range.1, + mandatory_source_header_number, + ); + + *required_header_number = mandatory_source_header_number; + Ok(true) } /// Read best finalized source block number from source client. @@ -315,6 +391,30 @@ where }) } +/// Read first mandatory header in given inclusive range. +/// +/// Returns `Ok(None)` if there were no mandatory headers in the range. +async fn find_mandatory_header_in_range( + finality_source: &SubstrateFinalitySource, + range: (SourceChain::BlockNumber, SourceChain::BlockNumber), +) -> Result, relay_substrate_client::Error> +where + SubstrateFinalitySource: FinalitySourceClient

, + P: FinalitySyncPipeline, +{ + let mut current = range.0; + while current <= range.1 { + let header: SyncHeader = finality_source.client().header_by_number(current).await?.into(); + if header.is_mandatory() { + return Ok(Some(current)); + } + + current += One::one(); + } + + Ok(None) +} + /// On-demand headers relay task name. fn on_demand_headers_relay_name() -> String { format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME) @@ -330,16 +430,18 @@ mod tests { const AT_TARGET: Option = Some(1); #[async_std::test] - async fn updates_required_header_when_too_many_headers_missing() { - let required_header_number = Arc::new(Mutex::new(0)); - update_required_header_number_if_too_many_are_missing::( - AT_SOURCE, - AT_TARGET, - 5, - &required_header_number, - "test", - ) - .await; - assert_eq!(*required_header_number.lock().await, AT_SOURCE.unwrap()); + async fn mandatory_headers_scan_range_selects_range_if_too_many_headers_are_missing() { + assert_eq!( + mandatory_headers_scan_range::(AT_SOURCE, AT_TARGET, 5, &Arc::new(Mutex::new(0))).await, + Some((AT_TARGET.unwrap() + 1, AT_SOURCE.unwrap())), + ); + } + + #[async_std::test] + async fn mandatory_headers_scan_range_selects_nothing_if_enough_headers_are_relayed() { + assert_eq!( + mandatory_headers_scan_range::(AT_SOURCE, AT_TARGET, 10, &Arc::new(Mutex::new(0))).await, + None, + ); } } diff --git a/bridges/relays/client-substrate/src/finality_source.rs b/bridges/relays/client-substrate/src/finality_source.rs index 140e1f17a6..72a11ae990 100644 --- a/bridges/relays/client-substrate/src/finality_source.rs +++ b/bridges/relays/client-substrate/src/finality_source.rs @@ -51,6 +51,11 @@ impl FinalitySource { } } + /// Returns reference to the underlying RPC client. + pub fn client(&self) -> &Client { + &self.client + } + /// Returns best finalized block number. pub async fn on_chain_best_finalized_block_number(&self) -> Result { // we **CAN** continue to relay finality proofs if source node is out of sync, because diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index cce3283990..3ea729d123 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -58,6 +58,8 @@ pub struct FinalitySyncParams { pub recent_finality_proofs_limit: usize, /// Timeout before we treat our transactions as lost and restart the whole sync process. pub stall_timeout: Duration, + /// If true, only mandatory headers are relayed. + pub only_mandatory_headers: bool, } /// Source client used in finality synchronization loop. @@ -364,7 +366,7 @@ where } } -async fn select_header_to_submit( +pub(crate) async fn select_header_to_submit( source_client: &SC, target_client: &TC, finality_proofs_stream: &mut RestartableFinalityProofsStream, @@ -397,6 +399,11 @@ where .await?; let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof { SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))), + _ if sync_params.only_mandatory_headers => { + // we are not reading finality proofs from the stream, so eventually it'll break + // but we don't care about transient proofs at all, so it is acceptable + return Ok(None); + } SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => { (unjustified_headers, Some((header, finality_proof))) } diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index 72b3cdde3d..e7e0cdb39f 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -20,7 +20,8 @@ use crate::finality_loop::{ prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof, - FinalityProofs, FinalitySyncParams, SourceClient, TargetClient, + select_header_to_submit, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, SourceClient, + TargetClient, }; use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; @@ -165,8 +166,11 @@ impl TargetClient for TestTargetClient { } } -fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData { - let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); +fn prepare_test_clients( + exit_sender: futures::channel::mpsc::UnboundedSender<()>, + state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, + source_headers: HashMap)>, +) -> (TestSourceClient, TestTargetClient) { let internal_state_function: Arc = Arc::new(move |data| { if state_function(data) { exit_sender.unbounded_send(()).unwrap(); @@ -174,7 +178,30 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync }); let clients_data = Arc::new(Mutex::new(ClientsData { source_best_block_number: 10, - source_headers: vec![ + source_headers, + source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], + + target_best_block_number: 5, + target_headers: vec![], + })); + ( + TestSourceClient { + on_method_call: internal_state_function.clone(), + data: clients_data.clone(), + }, + TestTargetClient { + on_method_call: internal_state_function, + data: clients_data, + }, + ) +} + +fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData { + let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); + let (source_client, target_client) = prepare_test_clients( + exit_sender, + state_function, + vec![ (6, (TestSourceHeader(false, 6), None)), (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), (8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))), @@ -183,25 +210,15 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync ] .into_iter() .collect(), - source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], - - target_best_block_number: 5, - target_headers: vec![], - })); - let source_client = TestSourceClient { - on_method_call: internal_state_function.clone(), - data: clients_data.clone(), - }; - let target_client = TestTargetClient { - on_method_call: internal_state_function, - data: clients_data.clone(), - }; + ); let sync_params = FinalitySyncParams { tick: Duration::from_secs(0), recent_finality_proofs_limit: 1024, stall_timeout: Duration::from_secs(1), + only_mandatory_headers: false, }; + let clients_data = source_client.data.clone(); let _ = async_std::task::block_on(run( source_client, target_client, @@ -259,6 +276,65 @@ fn finality_sync_loop_works() { ); } +fn run_only_mandatory_headers_mode_test( + only_mandatory_headers: bool, + has_mandatory_headers: bool, +) -> Option<(TestSourceHeader, TestFinalityProof)> { + let (exit_sender, _) = futures::channel::mpsc::unbounded(); + let (source_client, target_client) = prepare_test_clients( + exit_sender, + |_| false, + vec![ + (6, (TestSourceHeader(false, 6), Some(TestFinalityProof(6)))), + (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), + ( + 8, + (TestSourceHeader(has_mandatory_headers, 8), Some(TestFinalityProof(8))), + ), + (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))), + (10, (TestSourceHeader(false, 10), Some(TestFinalityProof(10)))), + ] + .into_iter() + .collect(), + ); + async_std::task::block_on(select_header_to_submit( + &source_client, + &target_client, + &mut RestartableFinalityProofsStream::from(futures::stream::empty().boxed()), + &mut vec![], + 10, + 5, + &FinalitySyncParams { + tick: Duration::from_secs(0), + recent_finality_proofs_limit: 0, + stall_timeout: Duration::from_secs(0), + only_mandatory_headers, + }, + )) + .unwrap() +} + +#[test] +fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required() { + assert_eq!(run_only_mandatory_headers_mode_test(true, false), None); + assert_eq!( + run_only_mandatory_headers_mode_test(false, false), + Some((TestSourceHeader(false, 10), TestFinalityProof(10))), + ); +} + +#[test] +fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() { + assert_eq!( + run_only_mandatory_headers_mode_test(true, true), + Some((TestSourceHeader(true, 8), TestFinalityProof(8))), + ); + assert_eq!( + run_only_mandatory_headers_mode_test(false, true), + Some((TestSourceHeader(true, 8), TestFinalityProof(8))), + ); +} + #[test] fn select_better_recent_finality_proof_works() { // if there are no unjustified headers, nothing is changed