mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 21:01:02 +00:00
Add --only-mandatory-headers mode (#1004)
This commit is contained in:
committed by
Bastian Köcher
parent
09df16612b
commit
c69c682a4c
@@ -24,6 +24,9 @@ pub struct RelayHeaders {
|
|||||||
/// A bridge instance to relay headers for.
|
/// A bridge instance to relay headers for.
|
||||||
#[structopt(possible_values = &RelayHeadersBridge::variants(), case_insensitive = true)]
|
#[structopt(possible_values = &RelayHeadersBridge::variants(), case_insensitive = true)]
|
||||||
bridge: RelayHeadersBridge,
|
bridge: RelayHeadersBridge,
|
||||||
|
/// If passed, only mandatory headers (headers that are changing the GRANDPA authorities set) are relayed.
|
||||||
|
#[structopt(long)]
|
||||||
|
only_mandatory_headers: bool,
|
||||||
#[structopt(flatten)]
|
#[structopt(flatten)]
|
||||||
source: SourceConnectionParams,
|
source: SourceConnectionParams,
|
||||||
#[structopt(flatten)]
|
#[structopt(flatten)]
|
||||||
@@ -100,7 +103,14 @@ impl RelayHeaders {
|
|||||||
let finality = Finality::new(target_client.clone(), target_sign);
|
let finality = Finality::new(target_client.clone(), target_sign);
|
||||||
finality.start_relay_guards();
|
finality.start_relay_guards();
|
||||||
|
|
||||||
crate::finality_pipeline::run(finality, source_client, target_client, metrics_params).await
|
crate::finality_pipeline::run(
|
||||||
|
finality,
|
||||||
|
source_client,
|
||||||
|
target_client,
|
||||||
|
self.only_mandatory_headers,
|
||||||
|
metrics_params,
|
||||||
|
)
|
||||||
|
.await
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -119,6 +119,7 @@ pub async fn run<SourceChain, TargetChain, P>(
|
|||||||
pipeline: P,
|
pipeline: P,
|
||||||
source_client: Client<SourceChain>,
|
source_client: Client<SourceChain>,
|
||||||
target_client: Client<TargetChain>,
|
target_client: Client<TargetChain>,
|
||||||
|
only_mandatory_headers: bool,
|
||||||
metrics_params: MetricsParams,
|
metrics_params: MetricsParams,
|
||||||
) -> anyhow::Result<()>
|
) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
@@ -147,6 +148,7 @@ where
|
|||||||
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
||||||
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
||||||
stall_timeout: STALL_TIMEOUT,
|
stall_timeout: STALL_TIMEOUT,
|
||||||
|
only_mandatory_headers,
|
||||||
},
|
},
|
||||||
metrics_params,
|
metrics_params,
|
||||||
futures::future::pending(),
|
futures::future::pending(),
|
||||||
|
|||||||
@@ -24,11 +24,11 @@ use crate::finality_target::SubstrateFinalityTarget;
|
|||||||
use async_std::sync::{Arc, Mutex};
|
use async_std::sync::{Arc, Mutex};
|
||||||
use bp_header_chain::justification::GrandpaJustification;
|
use bp_header_chain::justification::GrandpaJustification;
|
||||||
use finality_relay::{
|
use finality_relay::{
|
||||||
FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient,
|
FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient, SourceHeader,
|
||||||
TargetClient as FinalityTargetClient,
|
TargetClient as FinalityTargetClient,
|
||||||
};
|
};
|
||||||
use futures::{select, FutureExt};
|
use futures::{select, FutureExt};
|
||||||
use num_traits::{CheckedSub, Zero};
|
use num_traits::{CheckedSub, One, Zero};
|
||||||
use relay_substrate_client::{
|
use relay_substrate_client::{
|
||||||
finality_source::{FinalitySource as SubstrateFinalitySource, RequiredHeaderNumberRef},
|
finality_source::{FinalitySource as SubstrateFinalitySource, RequiredHeaderNumberRef},
|
||||||
BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader,
|
BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader,
|
||||||
@@ -139,6 +139,7 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
|
||||||
>::new(source_client.clone(), Some(required_header_number.clone()));
|
>::new(source_client.clone(), Some(required_header_number.clone()));
|
||||||
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
|
||||||
|
let mut latest_non_mandatory_at_source = Zero::zero();
|
||||||
|
|
||||||
let mut restart_relay = true;
|
let mut restart_relay = true;
|
||||||
let finality_relay_task = futures::future::Fuse::terminated();
|
let finality_relay_task = futures::future::Fuse::terminated();
|
||||||
@@ -181,15 +182,48 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update required header
|
// submit mandatory header if some headers are missing
|
||||||
update_required_header_number_if_too_many_are_missing::<SourceChain>(
|
let best_finalized_source_header_at_target_fmt = format!("{:?}", best_finalized_source_header_at_target);
|
||||||
|
let mandatory_scan_range = mandatory_headers_scan_range::<SourceChain>(
|
||||||
best_finalized_source_header_at_source.ok(),
|
best_finalized_source_header_at_source.ok(),
|
||||||
best_finalized_source_header_at_target.ok(),
|
best_finalized_source_header_at_target.ok(),
|
||||||
maximal_headers_difference,
|
maximal_headers_difference,
|
||||||
&required_header_number,
|
&required_header_number,
|
||||||
&relay_task_name,
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
if let Some(mandatory_scan_range) = mandatory_scan_range {
|
||||||
|
let relay_mandatory_header_result = relay_mandatory_header_from_range(
|
||||||
|
&finality_source,
|
||||||
|
&required_header_number,
|
||||||
|
best_finalized_source_header_at_target_fmt,
|
||||||
|
(
|
||||||
|
std::cmp::max(mandatory_scan_range.0, latest_non_mandatory_at_source),
|
||||||
|
mandatory_scan_range.1,
|
||||||
|
),
|
||||||
|
&relay_task_name,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match relay_mandatory_header_result {
|
||||||
|
Ok(true) => (),
|
||||||
|
Ok(false) => {
|
||||||
|
// there are no (or we don't need to relay them) mandatory headers in the range
|
||||||
|
// => to avoid scanning the same headers over and over again, remember that
|
||||||
|
latest_non_mandatory_at_source = mandatory_scan_range.1;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if e.is_connection_error() {
|
||||||
|
relay_utils::relay_loop::reconnect_failed_client(
|
||||||
|
FailedClient::Source,
|
||||||
|
relay_utils::relay_loop::RECONNECT_DELAY,
|
||||||
|
&mut finality_source,
|
||||||
|
&mut finality_target,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// start/restart relay
|
// start/restart relay
|
||||||
if restart_relay {
|
if restart_relay {
|
||||||
@@ -201,6 +235,7 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
|
||||||
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
|
||||||
stall_timeout: STALL_TIMEOUT,
|
stall_timeout: STALL_TIMEOUT,
|
||||||
|
only_mandatory_headers: false,
|
||||||
},
|
},
|
||||||
MetricsParams::disabled(),
|
MetricsParams::disabled(),
|
||||||
futures::future::pending(),
|
futures::future::pending(),
|
||||||
@@ -213,29 +248,29 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If there are too many source headers missing at target, we ask for syncing more headers.
|
/// Returns `Some()` with inclusive range of headers which must be scanned for manadatory headers
|
||||||
async fn update_required_header_number_if_too_many_are_missing<C: Chain>(
|
/// and the first of such headers must be submitted to the target node.
|
||||||
|
async fn mandatory_headers_scan_range<C: Chain>(
|
||||||
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
best_finalized_source_header_at_source: Option<C::BlockNumber>,
|
||||||
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
best_finalized_source_header_at_target: Option<C::BlockNumber>,
|
||||||
maximal_headers_difference: C::BlockNumber,
|
maximal_headers_difference: C::BlockNumber,
|
||||||
required_header_number: &RequiredHeaderNumberRef<C>,
|
required_header_number: &RequiredHeaderNumberRef<C>,
|
||||||
relay_task_name: &str,
|
) -> Option<(C::BlockNumber, C::BlockNumber)> {
|
||||||
) {
|
let required_header_number = *required_header_number.lock().await;
|
||||||
let mut required_header_number = required_header_number.lock().await;
|
|
||||||
|
|
||||||
// if we have been unable to read header number from the target, then let's assume
|
// if we have been unable to read header number from the target, then let's assume
|
||||||
// that it is the same as required header number. Otherwise we risk submitting
|
// that it is the same as required header number. Otherwise we risk submitting
|
||||||
// unneeded transactions
|
// unneeded transactions
|
||||||
let best_finalized_source_header_at_target =
|
let best_finalized_source_header_at_target =
|
||||||
best_finalized_source_header_at_target.unwrap_or(*required_header_number);
|
best_finalized_source_header_at_target.unwrap_or(required_header_number);
|
||||||
|
|
||||||
// if we have been unable to read header number from the source, then let's assume
|
// if we have been unable to read header number from the source, then let's assume
|
||||||
// that it is the same as at the target
|
// that it is the same as at the target
|
||||||
let best_finalized_source_header_at_source =
|
let best_finalized_source_header_at_source =
|
||||||
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
|
||||||
|
|
||||||
// if there are too many source headers missing from the target node, require some
|
// if there are too many source headers missing from the target node, sync mandatory
|
||||||
// new headers at target
|
// headers to target
|
||||||
//
|
//
|
||||||
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
|
||||||
// headers when there are undelivered messages/confirmations. But security model of the
|
// headers when there are undelivered messages/confirmations. But security model of the
|
||||||
@@ -245,22 +280,63 @@ async fn update_required_header_number_if_too_many_are_missing<C: Chain>(
|
|||||||
let current_headers_difference = best_finalized_source_header_at_source
|
let current_headers_difference = best_finalized_source_header_at_source
|
||||||
.checked_sub(&best_finalized_source_header_at_target)
|
.checked_sub(&best_finalized_source_header_at_target)
|
||||||
.unwrap_or_else(Zero::zero);
|
.unwrap_or_else(Zero::zero);
|
||||||
if current_headers_difference > maximal_headers_difference {
|
if current_headers_difference <= maximal_headers_difference {
|
||||||
// if relay is already asked to sync headers, don't log anything
|
return None;
|
||||||
if *required_header_number <= best_finalized_source_header_at_target {
|
|
||||||
log::trace!(
|
|
||||||
target: "bridge",
|
|
||||||
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
|
|
||||||
C::NAME,
|
|
||||||
relay_task_name,
|
|
||||||
best_finalized_source_header_at_source,
|
|
||||||
best_finalized_source_header_at_target,
|
|
||||||
best_finalized_source_header_at_source,
|
|
||||||
);
|
|
||||||
|
|
||||||
*required_header_number = best_finalized_source_header_at_source;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if relay is already asked to sync headers, don't do anything yet
|
||||||
|
if required_header_number > best_finalized_source_header_at_target {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some((
|
||||||
|
best_finalized_source_header_at_target + One::one(),
|
||||||
|
best_finalized_source_header_at_source,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to find mandatory header in the inclusive headers range and, if one is found, ask to relay it.
|
||||||
|
///
|
||||||
|
/// Returns `true` if header was found and (asked to be) relayed and `false` otherwise.
|
||||||
|
async fn relay_mandatory_header_from_range<SourceChain: Chain, P>(
|
||||||
|
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||||
|
required_header_number: &RequiredHeaderNumberRef<SourceChain>,
|
||||||
|
best_finalized_source_header_at_target: String,
|
||||||
|
range: (SourceChain::BlockNumber, SourceChain::BlockNumber),
|
||||||
|
relay_task_name: &str,
|
||||||
|
) -> Result<bool, relay_substrate_client::Error>
|
||||||
|
where
|
||||||
|
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||||
|
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||||
|
{
|
||||||
|
// search for mandatory header first
|
||||||
|
let mandatory_source_header_number = find_mandatory_header_in_range(finality_source, range).await?;
|
||||||
|
|
||||||
|
// if there are no mandatory headers - we have nothing to do
|
||||||
|
let mandatory_source_header_number = match mandatory_source_header_number {
|
||||||
|
Some(mandatory_source_header_number) => mandatory_source_header_number,
|
||||||
|
None => return Ok(false),
|
||||||
|
};
|
||||||
|
|
||||||
|
// `find_mandatory_header` call may take a while => check if `required_header_number` is still
|
||||||
|
// less than our `mandatory_source_header_number` before logging anything
|
||||||
|
let mut required_header_number = required_header_number.lock().await;
|
||||||
|
if *required_header_number >= mandatory_source_header_number {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
log::trace!(
|
||||||
|
target: "bridge",
|
||||||
|
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the mandatory {}",
|
||||||
|
SourceChain::NAME,
|
||||||
|
relay_task_name,
|
||||||
|
best_finalized_source_header_at_target,
|
||||||
|
range.1,
|
||||||
|
mandatory_source_header_number,
|
||||||
|
);
|
||||||
|
|
||||||
|
*required_header_number = mandatory_source_header_number;
|
||||||
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read best finalized source block number from source client.
|
/// Read best finalized source block number from source client.
|
||||||
@@ -315,6 +391,30 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read first mandatory header in given inclusive range.
|
||||||
|
///
|
||||||
|
/// Returns `Ok(None)` if there were no mandatory headers in the range.
|
||||||
|
async fn find_mandatory_header_in_range<SourceChain: Chain, P>(
|
||||||
|
finality_source: &SubstrateFinalitySource<SourceChain, P>,
|
||||||
|
range: (SourceChain::BlockNumber, SourceChain::BlockNumber),
|
||||||
|
) -> Result<Option<SourceChain::BlockNumber>, relay_substrate_client::Error>
|
||||||
|
where
|
||||||
|
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
|
||||||
|
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
|
||||||
|
{
|
||||||
|
let mut current = range.0;
|
||||||
|
while current <= range.1 {
|
||||||
|
let header: SyncHeader<SourceChain::Header> = finality_source.client().header_by_number(current).await?.into();
|
||||||
|
if header.is_mandatory() {
|
||||||
|
return Ok(Some(current));
|
||||||
|
}
|
||||||
|
|
||||||
|
current += One::one();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
/// On-demand headers relay task name.
|
/// On-demand headers relay task name.
|
||||||
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
|
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
|
||||||
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
|
||||||
@@ -330,16 +430,18 @@ mod tests {
|
|||||||
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
|
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
|
||||||
|
|
||||||
#[async_std::test]
|
#[async_std::test]
|
||||||
async fn updates_required_header_when_too_many_headers_missing() {
|
async fn mandatory_headers_scan_range_selects_range_if_too_many_headers_are_missing() {
|
||||||
let required_header_number = Arc::new(Mutex::new(0));
|
assert_eq!(
|
||||||
update_required_header_number_if_too_many_are_missing::<TestChain>(
|
mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 5, &Arc::new(Mutex::new(0))).await,
|
||||||
AT_SOURCE,
|
Some((AT_TARGET.unwrap() + 1, AT_SOURCE.unwrap())),
|
||||||
AT_TARGET,
|
);
|
||||||
5,
|
}
|
||||||
&required_header_number,
|
|
||||||
"test",
|
#[async_std::test]
|
||||||
)
|
async fn mandatory_headers_scan_range_selects_nothing_if_enough_headers_are_relayed() {
|
||||||
.await;
|
assert_eq!(
|
||||||
assert_eq!(*required_header_number.lock().await, AT_SOURCE.unwrap());
|
mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 10, &Arc::new(Mutex::new(0))).await,
|
||||||
|
None,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,6 +51,11 @@ impl<C: Chain, P> FinalitySource<C, P> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns reference to the underlying RPC client.
|
||||||
|
pub fn client(&self) -> &Client<C> {
|
||||||
|
&self.client
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns best finalized block number.
|
/// Returns best finalized block number.
|
||||||
pub async fn on_chain_best_finalized_block_number(&self) -> Result<C::BlockNumber, Error> {
|
pub async fn on_chain_best_finalized_block_number(&self) -> Result<C::BlockNumber, Error> {
|
||||||
// we **CAN** continue to relay finality proofs if source node is out of sync, because
|
// we **CAN** continue to relay finality proofs if source node is out of sync, because
|
||||||
|
|||||||
@@ -58,6 +58,8 @@ pub struct FinalitySyncParams {
|
|||||||
pub recent_finality_proofs_limit: usize,
|
pub recent_finality_proofs_limit: usize,
|
||||||
/// Timeout before we treat our transactions as lost and restart the whole sync process.
|
/// Timeout before we treat our transactions as lost and restart the whole sync process.
|
||||||
pub stall_timeout: Duration,
|
pub stall_timeout: Duration,
|
||||||
|
/// If true, only mandatory headers are relayed.
|
||||||
|
pub only_mandatory_headers: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Source client used in finality synchronization loop.
|
/// Source client used in finality synchronization loop.
|
||||||
@@ -364,7 +366,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn select_header_to_submit<P, SC, TC>(
|
pub(crate) async fn select_header_to_submit<P, SC, TC>(
|
||||||
source_client: &SC,
|
source_client: &SC,
|
||||||
target_client: &TC,
|
target_client: &TC,
|
||||||
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
|
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
|
||||||
@@ -397,6 +399,11 @@ where
|
|||||||
.await?;
|
.await?;
|
||||||
let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
|
let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
|
||||||
SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))),
|
SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))),
|
||||||
|
_ if sync_params.only_mandatory_headers => {
|
||||||
|
// we are not reading finality proofs from the stream, so eventually it'll break
|
||||||
|
// but we don't care about transient proofs at all, so it is acceptable
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => {
|
SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => {
|
||||||
(unjustified_headers, Some((header, finality_proof)))
|
(unjustified_headers, Some((header, finality_proof)))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,8 @@
|
|||||||
|
|
||||||
use crate::finality_loop::{
|
use crate::finality_loop::{
|
||||||
prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof,
|
prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof,
|
||||||
FinalityProofs, FinalitySyncParams, SourceClient, TargetClient,
|
select_header_to_submit, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, SourceClient,
|
||||||
|
TargetClient,
|
||||||
};
|
};
|
||||||
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
|
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
|
||||||
|
|
||||||
@@ -165,8 +166,11 @@ impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData {
|
fn prepare_test_clients(
|
||||||
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
|
exit_sender: futures::channel::mpsc::UnboundedSender<()>,
|
||||||
|
state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
|
||||||
|
source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
|
||||||
|
) -> (TestSourceClient, TestTargetClient) {
|
||||||
let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> = Arc::new(move |data| {
|
let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> = Arc::new(move |data| {
|
||||||
if state_function(data) {
|
if state_function(data) {
|
||||||
exit_sender.unbounded_send(()).unwrap();
|
exit_sender.unbounded_send(()).unwrap();
|
||||||
@@ -174,7 +178,30 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
|
|||||||
});
|
});
|
||||||
let clients_data = Arc::new(Mutex::new(ClientsData {
|
let clients_data = Arc::new(Mutex::new(ClientsData {
|
||||||
source_best_block_number: 10,
|
source_best_block_number: 10,
|
||||||
source_headers: vec![
|
source_headers,
|
||||||
|
source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
|
||||||
|
|
||||||
|
target_best_block_number: 5,
|
||||||
|
target_headers: vec![],
|
||||||
|
}));
|
||||||
|
(
|
||||||
|
TestSourceClient {
|
||||||
|
on_method_call: internal_state_function.clone(),
|
||||||
|
data: clients_data.clone(),
|
||||||
|
},
|
||||||
|
TestTargetClient {
|
||||||
|
on_method_call: internal_state_function,
|
||||||
|
data: clients_data,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData {
|
||||||
|
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
|
||||||
|
let (source_client, target_client) = prepare_test_clients(
|
||||||
|
exit_sender,
|
||||||
|
state_function,
|
||||||
|
vec![
|
||||||
(6, (TestSourceHeader(false, 6), None)),
|
(6, (TestSourceHeader(false, 6), None)),
|
||||||
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))),
|
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))),
|
||||||
(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))),
|
(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))),
|
||||||
@@ -183,25 +210,15 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
|
|||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect(),
|
.collect(),
|
||||||
source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
|
);
|
||||||
|
|
||||||
target_best_block_number: 5,
|
|
||||||
target_headers: vec![],
|
|
||||||
}));
|
|
||||||
let source_client = TestSourceClient {
|
|
||||||
on_method_call: internal_state_function.clone(),
|
|
||||||
data: clients_data.clone(),
|
|
||||||
};
|
|
||||||
let target_client = TestTargetClient {
|
|
||||||
on_method_call: internal_state_function,
|
|
||||||
data: clients_data.clone(),
|
|
||||||
};
|
|
||||||
let sync_params = FinalitySyncParams {
|
let sync_params = FinalitySyncParams {
|
||||||
tick: Duration::from_secs(0),
|
tick: Duration::from_secs(0),
|
||||||
recent_finality_proofs_limit: 1024,
|
recent_finality_proofs_limit: 1024,
|
||||||
stall_timeout: Duration::from_secs(1),
|
stall_timeout: Duration::from_secs(1),
|
||||||
|
only_mandatory_headers: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let clients_data = source_client.data.clone();
|
||||||
let _ = async_std::task::block_on(run(
|
let _ = async_std::task::block_on(run(
|
||||||
source_client,
|
source_client,
|
||||||
target_client,
|
target_client,
|
||||||
@@ -259,6 +276,65 @@ fn finality_sync_loop_works() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn run_only_mandatory_headers_mode_test(
|
||||||
|
only_mandatory_headers: bool,
|
||||||
|
has_mandatory_headers: bool,
|
||||||
|
) -> Option<(TestSourceHeader, TestFinalityProof)> {
|
||||||
|
let (exit_sender, _) = futures::channel::mpsc::unbounded();
|
||||||
|
let (source_client, target_client) = prepare_test_clients(
|
||||||
|
exit_sender,
|
||||||
|
|_| false,
|
||||||
|
vec![
|
||||||
|
(6, (TestSourceHeader(false, 6), Some(TestFinalityProof(6)))),
|
||||||
|
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))),
|
||||||
|
(
|
||||||
|
8,
|
||||||
|
(TestSourceHeader(has_mandatory_headers, 8), Some(TestFinalityProof(8))),
|
||||||
|
),
|
||||||
|
(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))),
|
||||||
|
(10, (TestSourceHeader(false, 10), Some(TestFinalityProof(10)))),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
async_std::task::block_on(select_header_to_submit(
|
||||||
|
&source_client,
|
||||||
|
&target_client,
|
||||||
|
&mut RestartableFinalityProofsStream::from(futures::stream::empty().boxed()),
|
||||||
|
&mut vec![],
|
||||||
|
10,
|
||||||
|
5,
|
||||||
|
&FinalitySyncParams {
|
||||||
|
tick: Duration::from_secs(0),
|
||||||
|
recent_finality_proofs_limit: 0,
|
||||||
|
stall_timeout: Duration::from_secs(0),
|
||||||
|
only_mandatory_headers,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required() {
|
||||||
|
assert_eq!(run_only_mandatory_headers_mode_test(true, false), None);
|
||||||
|
assert_eq!(
|
||||||
|
run_only_mandatory_headers_mode_test(false, false),
|
||||||
|
Some((TestSourceHeader(false, 10), TestFinalityProof(10))),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() {
|
||||||
|
assert_eq!(
|
||||||
|
run_only_mandatory_headers_mode_test(true, true),
|
||||||
|
Some((TestSourceHeader(true, 8), TestFinalityProof(8))),
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
run_only_mandatory_headers_mode_test(false, true),
|
||||||
|
Some((TestSourceHeader(true, 8), TestFinalityProof(8))),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn select_better_recent_finality_proof_works() {
|
fn select_better_recent_finality_proof_works() {
|
||||||
// if there are no unjustified headers, nothing is changed
|
// if there are no unjustified headers, nothing is changed
|
||||||
|
|||||||
Reference in New Issue
Block a user