Files
pezkuwi-sdk/pezbridges/relays/teyrchains/src/teyrchains_loop.rs
T
pezkuwichain 479010094e fix(ci): resolve all quick-checks failures
- Remove missing cli crate from workspace members
- Fix TOML array syntax errors in pvf and benchmarking-cli Cargo.toml
- Fix Rust import ordering with cargo fmt
- Fix feature propagation with zepter (try-runtime, runtime-benchmarks, std)
2026-01-04 17:22:12 +03:00

1267 lines
37 KiB
Rust

// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{teyrchains_loop_metrics::TeyrchainsLoopMetrics, TeyrchainsPipeline};
use async_trait::async_trait;
use futures::{
future::{FutureExt, Shared},
poll, select_biased,
};
use pezbp_pezkuwi_core::{
teyrchains::{ParaHash, ParaHeadsProof, ParaId},
BlockNumber as RelayBlockNumber,
};
use relay_bizinikiwi_client::{BlockNumberOf, Chain, HeaderIdOf, TeyrchainBase};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
TrackedTransactionStatus, TransactionTracker,
};
use std::{future::Future, pin::Pin, task::Poll};
/// Availability of a teyrchain header, as reported by a client for some chain.
#[derive(Clone, Copy, Debug)]
pub enum AvailableHeader<T> {
    /// The client is currently unable to report the actual teyrchain head.
    ///
    /// This is a "mild" error that may show up when e.g. the on-demand teyrchains relay is
    /// used. It must be interpreted as "do not update the teyrchain head value at the target
    /// chain right now".
    Unavailable,
    /// The relay chain has no header for the teyrchain.
    ///
    /// Normally this means that the teyrchain is not registered there.
    Missing,
    /// The teyrchain head is available at the source chain.
    Available(T),
}

impl<T> AvailableHeader<T> {
    /// Borrow the header if this is [`AvailableHeader::Available`], otherwise return `None`.
    pub fn as_available(&self) -> Option<&T> {
        if let AvailableHeader::Available(header) = self {
            Some(header)
        } else {
            None
        }
    }
}

impl<T> From<Option<T>> for AvailableHeader<T> {
    /// Map `Some(header)` to `Available(header)` and `None` to `Missing`.
    ///
    /// `Unavailable` is never produced by this conversion.
    fn from(maybe_header: Option<T>) -> AvailableHeader<T> {
        maybe_header.map_or(AvailableHeader::Missing, AvailableHeader::Available)
    }
}
/// Source client used in teyrchain heads synchronization loop.
///
/// All methods that take a block id are addressed by the id of a source **relay** chain
/// block (`HeaderIdOf<P::SourceRelayChain>`).
#[async_trait]
pub trait SourceClient<P: TeyrchainsPipeline>: RelayClient {
/// Returns `Ok(true)` if client is in synced state.
///
/// The synchronization loop in this file skips the current iteration when `Ok(false)` is
/// returned and treats `Err(_)` as a failure of the source client.
async fn ensure_synced(&self) -> Result<bool, Self::Error>;
/// Get teyrchain head id at given block.
///
/// The result is an [`AvailableHeader`], so apart from the head id itself, the client may
/// report that the head is `Missing` or (temporarily) `Unavailable`.
async fn teyrchain_head(
&self,
at_block: HeaderIdOf<P::SourceRelayChain>,
) -> Result<AvailableHeader<HeaderIdOf<P::SourceTeyrchain>>, Self::Error>;
/// Get teyrchain head proof at given block.
///
/// Returns the proof together with a `ParaHash`. The synchronization loop forwards that
/// hash to `TargetClient::submit_teyrchain_head_proof` as `para_head_hash`.
async fn prove_teyrchain_head(
&self,
at_block: HeaderIdOf<P::SourceRelayChain>,
) -> Result<(ParaHeadsProof, ParaHash), Self::Error>;
}
/// Target client used in teyrchain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: TeyrchainsPipeline>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
/// Get best block id.
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
/// Get best finalized source relay chain block id, as seen at the given target chain
/// block (`at_block`).
///
/// NOTE(review): the previous version of this comment ended mid-sentence ("If
/// `free_source_relay_headers_interval` is `Some(_)`, the returned"), so any special
/// behavior for the free headers mode is not documented here - confirm against the
/// implementations.
async fn best_finalized_source_relay_chain_block(
&self,
at_block: &HeaderIdOf<P::TargetChain>,
) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
/// Get free source **relay** headers submission interval, if it is configured in the
/// target runtime. We assume that the target chain will accept teyrchain header, proved
/// at such relay header for free.
///
/// The synchronization loop only calls this method when it runs in the "only free
/// headers" mode, and refuses to start when the result is `None` or `Some(0)`.
async fn free_source_relay_headers_interval(
&self,
) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;
/// Get teyrchain head id at given block.
///
/// On success, returns `None` if there is no teyrchain head at the target chain.
/// Otherwise returns a pair of ids: the source relay chain header id first and the
/// teyrchain head id second (the loop treats the former as the relay header that has
/// been used to prove the head).
async fn teyrchain_head(
&self,
at_block: HeaderIdOf<P::TargetChain>,
) -> Result<
Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceTeyrchain>)>,
Self::Error,
>;
/// Submit teyrchain heads proof.
///
/// `at_source_block` is the source relay chain block that the `proof` has been generated
/// at, `para_head_hash` is the hash returned by the source client along with the proof.
/// `is_free_execution_expected` is set by the loop when it only relays free headers.
///
/// Returns the tracker of the submitted transaction.
async fn submit_teyrchain_head_proof(
&self,
at_source_block: HeaderIdOf<P::SourceRelayChain>,
para_head_hash: ParaHash,
proof: ParaHeadsProof,
is_free_execution_expected: bool,
) -> Result<Self::TransactionTracker, Self::Error>;
}
/// Build the default prefix under which Prometheus metrics of the teyrchains sync loop
/// are exposed.
///
/// The prefix is composed of the source relay chain name, the target chain name and the
/// id of the source teyrchain.
pub fn metrics_prefix<P: TeyrchainsPipeline>() -> String {
    let source_relay_name = P::SourceRelayChain::NAME;
    let target_name = P::TargetChain::NAME;
    let para_id = P::SourceTeyrchain::TEYRCHAIN_ID;

    format!("{}_to_{}_Teyrchains_{}", source_relay_name, target_name, para_id)
}
/// Relay a single teyrchain head, proved at the given source relay chain block.
///
/// The head update transaction is submitted (without expecting free execution) and then
/// awaited until it reaches its final status.
///
/// Returns `Ok(())` if the transaction has been finalized. Returns `Err(())` if either the
/// submission has failed, or the transaction is considered lost. Details of the failure are
/// only reported through the log.
pub async fn relay_single_head<P: TeyrchainsPipeline>(
    source_client: impl SourceClient<P>,
    target_client: impl TargetClient<P>,
    at_relay_block: HeaderIdOf<P::SourceRelayChain>,
) -> Result<(), ()>
where
    P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
    // the failure has already been logged by `submit_selected_head`, so the error itself
    // is discarded here
    let submission =
        submit_selected_head::<P, _>(&source_client, &target_client, at_relay_block, false).await;
    let tx_tracker = match submission {
        Ok(tx_tracker) => tx_tracker,
        Err(_) => return Err(()),
    };

    let final_status = tx_tracker.wait().await;
    match final_status {
        TrackedTransactionStatus::Lost => {
            tracing::error!(
                target: "bridge",
                source=%P::SourceTeyrchain::NAME,
                target=%P::TargetChain::NAME,
                ?at_relay_block,
                "Transaction with header at relay header is considered lost"
            );
            Err(())
        },
        TrackedTransactionStatus::Finalized(_) => Ok(()),
    }
}
/// Run teyrchain heads synchronization.
///
/// Registers the loop metrics under the [`metrics_prefix`] of the pipeline, exposes them and
/// then starts the relay loop, which drives `run_until_connection_lost` with the given
/// clients.
///
/// If `only_free_headers` is `true`, only teyrchain heads that are expected to be accepted
/// by the target chain for free are submitted. The `exit_signal` future is shared between
/// the invocations of the inner loop.
pub async fn run<P: TeyrchainsPipeline>(
    source_client: impl SourceClient<P>,
    target_client: impl TargetClient<P>,
    metrics_params: MetricsParams,
    only_free_headers: bool,
    exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
    P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
    tracing::info!(
        target: "bridge",
        source=%P::SourceTeyrchain::NAME,
        target=%P::TargetChain::NAME,
        ?only_free_headers,
        "Starting source -> target finality proof relay: relaying headers"
    );

    // the same prefix is used both for the metrics and as the name of the loop
    let loop_name = metrics_prefix::<P>();
    let loop_metrics = TeyrchainsLoopMetrics::new(Some(&loop_name))?;
    let shared_exit_signal = exit_signal.shared();

    let relay_loop = relay_utils::relay_loop(source_client, target_client)
        .with_metrics(metrics_params)
        .loop_metric(loop_metrics)?
        .expose()
        .await?;

    relay_loop
        .run(loop_name, move |source_client, target_client, metrics| {
            run_until_connection_lost(
                source_client,
                target_client,
                metrics,
                only_free_headers,
                shared_exit_signal.clone(),
            )
        })
        .await
}
/// Run teyrchain heads synchronization until one of the clients fails or exit is requested.
///
/// Every iteration waits for the shorter of the two average block intervals, then:
///
/// 1. makes sure that the source client is synced;
/// 2. reads the best target block and the teyrchain head that is known to the target chain;
/// 3. if there is a previously submitted head update, checks its status and keeps waiting
///    while it is neither applied nor lost;
/// 4. selects the source relay chain block to prove the head at (taking `only_free_headers`
///    into account) and submits the head update transaction if an update is required.
///
/// Returns `Ok(())` when `exit_signal` resolves. Returns `Err(FailedClient::Source)` or
/// `Err(FailedClient::Target)` when the corresponding client has returned an error (or, for
/// the target, when the free headers interval is required but missing/zero). Returns
/// `Err(FailedClient::Both)` when the submitted transaction is considered lost.
async fn run_until_connection_lost<P: TeyrchainsPipeline>(
    source_client: impl SourceClient<P>,
    target_client: impl TargetClient<P>,
    metrics: Option<TeyrchainsLoopMetrics>,
    only_free_headers: bool,
    exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
    P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
    let exit_signal = exit_signal.fuse();
    let min_block_interval = std::cmp::min(
        P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
        P::TargetChain::AVERAGE_BLOCK_INTERVAL,
    );

    // free teyrchain header = header, available (proved) at free relay chain block. Let's
    // read interval of free source relay chain blocks from target client
    let free_source_relay_headers_interval = if only_free_headers {
        let free_source_relay_headers_interval =
            target_client.free_source_relay_headers_interval().await.map_err(|e| {
                tracing::warn!(
                    target: "bridge",
                    error=?e,
                    source=%P::SourceRelayChain::NAME,
                    target=%P::TargetChain::NAME,
                    "Failed to read free headers interval"
                );
                FailedClient::Target
            })?;
        match free_source_relay_headers_interval {
            Some(free_source_relay_headers_interval)
                if free_source_relay_headers_interval != 0 =>
            {
                tracing::trace!(
                    target: "bridge",
                    source=%P::SourceRelayChain::NAME,
                    target=%P::TargetChain::NAME,
                    ?free_source_relay_headers_interval,
                    "Free headers interval"
                );
                free_source_relay_headers_interval
            },
            _ => {
                // either the interval is not configured at the target chain, or it is zero
                tracing::warn!(
                    target: "bridge",
                    source=%P::SourceRelayChain::NAME,
                    target=%P::TargetChain::NAME,
                    ?free_source_relay_headers_interval,
                    "Invalid free headers interval"
                );
                return Err(FailedClient::Target);
            },
        }
    } else {
        // ignore - we don't need it
        0
    };

    let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;

    futures::pin_mut!(exit_signal);

    // Note that the internal loop breaks with `FailedClient` error even if error is
    // non-connection. It is Ok for now, but it may need to be fixed in the future to use
    // exponential backoff for regular errors.
    loop {
        // Either wait for new block, or exit signal.
        // Please note that we are prioritizing the exit signal since if both events happen at
        // once it doesn't make sense to perform one more loop iteration.
        select_biased! {
            _ = exit_signal => return Ok(()),
            _ = async_std::task::sleep(min_block_interval).fuse() => {},
        }

        // if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting
        // too much redundant transactions
        match source_client.ensure_synced().await {
            Ok(true) => (),
            Ok(false) => {
                tracing::warn!(
                    target: "bridge",
                    source=%P::SourceRelayChain::NAME,
                    "Client is syncing. Won't do anything until it is synced"
                );
                continue;
            },
            Err(e) => {
                tracing::warn!(
                    target: "bridge",
                    error=?e,
                    source=%P::SourceRelayChain::NAME,
                    "Client has failed to return its sync status"
                );
                return Err(FailedClient::Source);
            },
        }

        // read the current state of the target chain: its best block and the teyrchain head
        // that it knows at that block. The failing call is made to the target client, so the
        // warning is labelled with the target chain name
        let best_target_block = target_client.best_block().await.map_err(|e| {
            tracing::warn!(
                target: "bridge",
                error=?e,
                target=%P::TargetChain::NAME,
                "Failed to read best block"
            );
            FailedClient::Target
        })?;
        let (relay_of_head_at_target, head_at_target) =
            read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;

        // if we have active transaction, we'll need to wait until it is mined or dropped
        if let Some(tracker) = submitted_heads_tracker.take() {
            match tracker.update(&best_target_block, &head_at_target).await {
                SubmittedHeadStatus::Waiting(tracker) => {
                    // no news about our transaction and we shall keep waiting
                    submitted_heads_tracker = Some(tracker);
                    continue;
                },
                SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
                    // all heads have been updated, we don't need this tracker anymore
                },
                SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => {
                    tracing::warn!(
                        target: "bridge",
                        source=%P::SourceRelayChain::NAME,
                        target=%P::TargetChain::NAME,
                        "Teyrchains synchronization has stalled. Going to restart",
                    );
                    return Err(FailedClient::Both);
                },
            }
        }

        // in all-headers strategy we'll be submitting para head, available at
        // `best_finalized_relay_block_at_target`
        let best_finalized_relay_block_at_target = target_client
            .best_finalized_source_relay_chain_block(&best_target_block)
            .await
            .map_err(|e| {
                tracing::warn!(
                    target: "bridge",
                    error=?e,
                    source=%P::SourceRelayChain::NAME,
                    target=%P::TargetChain::NAME,
                    "Failed to read best finalized block"
                );
                FailedClient::Target
            })?;

        // ..but if we only need to submit free headers, we need to submit para
        // head, available at best free source relay chain header, known to the
        // target chain
        let prove_at_relay_block = if only_free_headers {
            match relay_of_head_at_target {
                Some(relay_of_head_at_target) => {
                    // find last free relay chain header in the range that we are interested in
                    let scan_range_begin = relay_of_head_at_target.number();
                    let scan_range_end = best_finalized_relay_block_at_target.number();
                    if scan_range_end.saturating_sub(scan_range_begin)
                        < free_source_relay_headers_interval
                    {
                        // there are no new **free** relay chain headers in the range
                        tracing::trace!(
                            target: "bridge",
                            source=%P::SourceRelayChain::NAME,
                            target=%P::TargetChain::NAME,
                            ?scan_range_begin,
                            ?scan_range_end,
                            "Waiting for new free headers: scanned"
                        );
                        continue;
                    }

                    // we may submit new teyrchain head for free
                    best_finalized_relay_block_at_target
                },
                None => {
                    // no teyrchain head at target => let's submit first one
                    best_finalized_relay_block_at_target
                },
            }
        } else {
            best_finalized_relay_block_at_target
        };

        // now let's check if we need to update teyrchain head at all
        let head_at_source =
            read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?;
        let is_update_required = is_update_required::<P>(
            head_at_source,
            head_at_target,
            prove_at_relay_block,
            best_target_block,
        );

        if is_update_required {
            let transaction_tracker = submit_selected_head::<P, _>(
                &source_client,
                &target_client,
                prove_at_relay_block,
                only_free_headers,
            )
            .await?;
            // remember what we have submitted, so that following iterations wait for the
            // result instead of submitting the same head again
            submitted_heads_tracker =
                Some(SubmittedHeadsTracker::<P>::new(head_at_source, transaction_tracker));
        }
    }
}
/// Generate the teyrchain head proof at the given source relay chain block and submit it to
/// the target chain.
///
/// `only_free_headers` is forwarded to the target client as the "free execution is expected"
/// flag.
///
/// Returns the tracker of the submitted transaction. Fails with `FailedClient::Source` if
/// the proof can't be generated and with `FailedClient::Target` if the submission fails.
async fn submit_selected_head<P: TeyrchainsPipeline, TC: TargetClient<P>>(
    source_client: &impl SourceClient<P>,
    target_client: &TC,
    prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
    only_free_headers: bool,
) -> Result<TC::TransactionTracker, FailedClient> {
    // step 1: ask the source client for the proof
    let proof_result = source_client.prove_teyrchain_head(prove_at_relay_block).await;
    let (head_proof, head_hash) = match proof_result {
        Ok(proof_and_hash) => proof_and_hash,
        Err(error) => {
            tracing::warn!(
                target: "bridge",
                error=?error,
                source=%P::SourceRelayChain::NAME,
                para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
                "Failed to prove teyrchain heads",
            );
            return Err(FailedClient::Source);
        },
    };

    tracing::info!(
        target: "bridge",
        source=%P::SourceRelayChain::NAME,
        para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
        target=%P::TargetChain::NAME,
        ?prove_at_relay_block,
        ?head_hash,
        "Submitting teyrchain head update transaction. Para hash at source relay"
    );

    // step 2: hand the proof over to the target client
    let submit_result = target_client
        .submit_teyrchain_head_proof(prove_at_relay_block, head_hash, head_proof, only_free_headers)
        .await;
    match submit_result {
        Ok(transaction_tracker) => Ok(transaction_tracker),
        Err(error) => {
            tracing::warn!(
                target: "bridge",
                error=?error,
                source=%P::SourceRelayChain::NAME,
                para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
                target=%P::TargetChain::NAME,
                "Failed to submit teyrchain heads proof"
            );
            Err(FailedClient::Target)
        },
    }
}
/// Decide whether a teyrchain-head-update transaction has to be submitted.
///
/// The decision only depends on `head_at_source` and `head_at_target`:
///
/// - `Unavailable` at source: never update;
/// - `Available` at source: update if the target knows no head, or if the source head number
///   is strictly larger than the number of the head known to the target;
/// - `Missing` at source: update only if the target still knows some head.
///
/// `prove_at_relay_block` and `best_target_block` are only used for logging.
fn is_update_required<P: TeyrchainsPipeline>(
    head_at_source: AvailableHeader<HeaderIdOf<P::SourceTeyrchain>>,
    head_at_target: Option<HeaderIdOf<P::SourceTeyrchain>>,
    prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
    best_target_block: HeaderIdOf<P::TargetChain>,
) -> bool
where
    P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
    tracing::trace!(
        target: "bridge",
        source=%P::SourceRelayChain::NAME,
        para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
        target=%P::TargetChain::NAME,
        ?prove_at_relay_block,
        ?head_at_source,
        ?best_target_block,
        ?head_at_target,
        "Checking if teyrchain needs update"
    );

    let needs_update = match head_at_source {
        // source client has politely asked us not to update current teyrchain head at the
        // target chain
        AvailableHeader::Unavailable => false,
        AvailableHeader::Available(source_head) => match head_at_target {
            // `true` means that the source client knows a head that is better than the head
            // known to the target client. `false` is the normal case when relay has recently
            // updated heads, when teyrchain is not progressing, or when our source client is
            // still syncing
            Some(target_head) => source_head.number() > target_head.number(),
            // teyrchain is not yet known to the target client. This is true when teyrchain
            // or bridge has been just onboarded/started
            None => true,
        },
        // if the target still knows some head, then teyrchain/parathread has been offboarded
        // (removed from the system) and that needs to be propagated to the target client.
        // Otherwise all's good - teyrchain is unknown to both clients
        AvailableHeader::Missing => head_at_target.is_some(),
    };

    if needs_update {
        tracing::trace!(
            target: "bridge",
            source=%P::SourceRelayChain::NAME,
            para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
            target=%P::TargetChain::NAME,
            ?head_at_source,
            ?head_at_target,
            "Teyrchain needs update"
        );
    }

    needs_update
}
/// Read the teyrchain head from the source client at the given source relay chain block.
///
/// If the head is available and `metrics` are provided, the "best teyrchain block at source"
/// metric is updated with the head number. Any error of the source client is logged and
/// converted into `FailedClient::Source`.
async fn read_head_at_source<P: TeyrchainsPipeline>(
    source_client: &impl SourceClient<P>,
    metrics: Option<&TeyrchainsLoopMetrics>,
    at_relay_block: &HeaderIdOf<P::SourceRelayChain>,
) -> Result<AvailableHeader<HeaderIdOf<P::SourceTeyrchain>>, FailedClient> {
    let head = source_client.teyrchain_head(*at_relay_block).await.map_err(|error| {
        tracing::warn!(
            target: "bridge",
            error=?error,
            source=%P::SourceRelayChain::NAME,
            para_id=?P::SourceTeyrchain::TEYRCHAIN_ID,
            "Failed to read head of teyrchain"
        );
        FailedClient::Source
    })?;

    // only an actually available head is reported to the metrics
    if let (Some(metrics), Some(para_head)) = (metrics, head.as_available()) {
        metrics.update_best_teyrchain_block_at_source(
            ParaId(P::SourceTeyrchain::TEYRCHAIN_ID),
            para_head.number(),
        );
    }

    Ok(head)
}
/// Read the teyrchain head from the target client at the given target chain block.
///
/// On success returns a pair of options: the source relay chain header id that the target
/// client has reported along with the head, and the teyrchain head id itself. Both are
/// `None` when the target client knows no head.
///
/// If the head is known and `metrics` are provided, the "best teyrchain block at target"
/// metric is updated with the head number. Any error of the target client is logged and
/// converted into `FailedClient::Target`.
async fn read_head_at_target<P: TeyrchainsPipeline>(
    target_client: &impl TargetClient<P>,
    metrics: Option<&TeyrchainsLoopMetrics>,
    at_block: &HeaderIdOf<P::TargetChain>,
) -> Result<
    (Option<HeaderIdOf<P::SourceRelayChain>>, Option<HeaderIdOf<P::SourceTeyrchain>>),
    FailedClient,
> {
    let maybe_head = target_client.teyrchain_head(*at_block).await.map_err(|error| {
        tracing::warn!(
            target: "bridge",
            error=?error,
            source=%P::SourceRelayChain::NAME,
            para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
            target=%P::TargetChain::NAME,
            "Failed to read head of teyrchain"
        );
        FailedClient::Target
    })?;

    Ok(match maybe_head {
        Some((relay_header_id, para_head_id)) => {
            if let Some(metrics) = metrics {
                metrics.update_best_teyrchain_block_at_target(
                    ParaId(P::SourceTeyrchain::TEYRCHAIN_ID),
                    para_head_id.number(),
                );
            }
            (Some(relay_header_id), Some(para_head_id))
        },
        None => (None, None),
    })
}
/// Submitted heads status.
///
/// This is the result of a single `SubmittedHeadsTracker::update` call.
enum SubmittedHeadStatus<P: TeyrchainsPipeline> {
/// Heads are not yet updated.
///
/// The tracker is handed back to the caller, so that it may be checked again later.
Waiting(SubmittedHeadsTracker<P>),
/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
Final(TrackedTransactionStatus<HeaderIdOf<P::TargetChain>>),
}
/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
///
/// The wrapped future is boxed and type-erased; it resolves to the final status of the
/// transaction. `SubmittedHeadsTracker::new` builds it from `TransactionTracker::wait()`.
type SharedTransactionTracker<P> = Shared<
Pin<
Box<
dyn Future<
Output = TrackedTransactionStatus<
HeaderIdOf<<P as TeyrchainsPipeline>::TargetChain>,
>,
> + Send,
>,
>,
>;
/// Submitted teyrchain heads transaction.
///
/// Pairs the head that we have tried to deliver with the future that tracks the status of
/// the delivery transaction.
struct SubmittedHeadsTracker<P: TeyrchainsPipeline> {
/// Teyrchain header id that we have submitted.
///
/// The synchronization loop stores the head, read from the source client, here - so this
/// may also be a `Missing` head.
submitted_head: AvailableHeader<HeaderIdOf<P::SourceTeyrchain>>,
/// Future that waits for submitted transaction finality or loss.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
transaction_tracker: SharedTransactionTracker<P>,
}
impl<P: TeyrchainsPipeline> SubmittedHeadsTracker<P> {
    /// Create a tracker for the given submitted head and the tracker of its transaction.
    ///
    /// The `wait()` future of the transaction tracker is fused, boxed and shared, so that it
    /// may be polled by several `update` calls.
    pub fn new(
        submitted_head: AvailableHeader<HeaderIdOf<P::SourceTeyrchain>>,
        transaction_tracker: impl TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>> + 'static,
    ) -> Self {
        let wait_for_final_status = transaction_tracker.wait().fuse().boxed();
        Self { submitted_head, transaction_tracker: wait_for_final_status.shared() }
    }

    /// Check the status of the submitted head, given the current state of the target chain.
    ///
    /// Returns:
    ///
    /// - `Final(Finalized(at_target_block))` if the target already knows a head with at
    ///   least the submitted number, or if the submitted head is `Missing` and the target
    ///   knows no head;
    /// - `Final(Lost)` if the head has not been updated, but the transaction tracker has
    ///   already resolved (either as lost or as finalized);
    /// - `Waiting(self)` otherwise.
    pub async fn update(
        self,
        at_target_block: &HeaderIdOf<P::TargetChain>,
        head_at_target: &Option<HeaderIdOf<P::SourceTeyrchain>>,
    ) -> SubmittedHeadStatus<P> {
        // check if our head has been updated
        let is_head_updated = match (self.submitted_head, head_at_target) {
            (AvailableHeader::Available(submitted), Some(at_target)) =>
                at_target.number() >= submitted.number(),
            (AvailableHeader::Missing, None) => true,
            _ => false,
        };
        if is_head_updated {
            tracing::trace!(
                target: "bridge",
                para_id=%P::SourceTeyrchain::TEYRCHAIN_ID,
                target=%P::TargetChain::NAME,
                ?head_at_target,
                "Head of teyrchain has been updated"
            );
            let finalized_at = TrackedTransactionStatus::Finalized(*at_target_block);
            return SubmittedHeadStatus::Final(finalized_at);
        }

        // the head is not updated yet, so let's see what the transaction tracker says. A
        // clone of the shared future is polled, because `poll!` consumes its argument
        let tracker_state = poll!(self.transaction_tracker.clone());
        match tracker_state {
            // the transaction is still in flight
            Poll::Pending => SubmittedHeadStatus::Waiting(self),
            // the transaction is reported as lost => we may restart our sync
            Poll::Ready(TrackedTransactionStatus::Lost) =>
                SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
            // so we are here and our transaction is mined+finalized, but the head was not
            // updated => we're considering our loop as stalled
            Poll::Ready(TrackedTransactionStatus::Finalized(_)) =>
                SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_std::sync::{Arc, Mutex};
    use futures::{SinkExt, StreamExt};
    use pezsp_core::H256;
    use relay_bizinikiwi_client::test_chain::{TestChain, TestTeyrchain};
    use relay_utils::{HeaderId, MaybeConnectionError};
    use std::collections::HashMap;

    /// Hash of the test teyrchain head #10.
    const PARA_10_HASH: ParaHash = H256([10u8; 32]);
    /// Hash of the test teyrchain head #20.
    const PARA_20_HASH: ParaHash = H256([20u8; 32]);

    /// Error that is returned by the test clients. It is never a connection error.
    #[derive(Clone, Debug)]
    enum TestError {
        Error,
    }

    impl MaybeConnectionError for TestError {
        fn is_connection_error(&self) -> bool {
            false
        }
    }

    /// Pipeline where `TestChain` is both the source relay chain and the target chain.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestTeyrchainsPipeline;

    impl TeyrchainsPipeline for TestTeyrchainsPipeline {
        type SourceRelayChain = TestChain;
        type SourceTeyrchain = TestTeyrchain;
        type TargetChain = TestChain;
    }

    /// Client that acts as both the source and the target client and answers all requests
    /// using (shared) `TestClientData`.
    #[derive(Clone, Debug)]
    struct TestClient {
        data: Arc<Mutex<TestClientData>>,
    }

    /// Transaction tracker that resolves immediately if the status is `Some(_)` and never
    /// resolves otherwise.
    #[derive(Clone, Debug)]
    struct TestTransactionTracker(Option<TrackedTransactionStatus<HeaderIdOf<TestChain>>>);

    #[async_trait]
    impl TransactionTracker for TestTransactionTracker {
        type HeaderId = HeaderIdOf<TestChain>;

        async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<TestChain>> {
            match self.0 {
                Some(status) => status,
                None => futures::future::pending().await,
            }
        }
    }

    /// Canned responses of the `TestClient` plus records of what has been submitted.
    #[derive(Clone, Debug)]
    struct TestClientData {
        source_sync_status: Result<bool, TestError>,
        /// Teyrchain heads at the source, keyed by the source relay chain block number.
        source_head: HashMap<
            BlockNumberOf<TestChain>,
            Result<AvailableHeader<HeaderIdOf<TestTeyrchain>>, TestError>,
        >,
        source_proof: Result<(), TestError>,

        target_free_source_relay_headers_interval:
            Result<Option<BlockNumberOf<TestChain>>, TestError>,
        target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
        target_best_finalized_source_block: Result<HeaderIdOf<TestChain>, TestError>,
        #[allow(clippy::type_complexity)]
        target_head: Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestTeyrchain>)>, TestError>,
        target_submit_result: Result<(), TestError>,

        /// Source relay chain block of the last successfully submitted proof.
        submitted_proof_at_source_relay_block: Option<HeaderIdOf<TestChain>>,
        /// If set, the exit signal is sent right after the first successful submission.
        exit_signal_sender: Option<Box<futures::channel::mpsc::UnboundedSender<()>>>,
    }

    impl TestClientData {
        /// Data for the simplest successful scenario: the source has teyrchain head #0 at
        /// relay block #0 and the target knows no head at all.
        pub fn minimal() -> Self {
            TestClientData {
                source_sync_status: Ok(true),
                source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))]
                    .into_iter()
                    .collect(),
                source_proof: Ok(()),

                target_free_source_relay_headers_interval: Ok(None),
                target_best_block: Ok(HeaderId(0, Default::default())),
                target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
                target_head: Ok(None),
                target_submit_result: Ok(()),

                submitted_proof_at_source_relay_block: None,
                exit_signal_sender: None,
            }
        }

        /// Same as `minimal`, but the loop is asked to exit after the first submission.
        pub fn with_exit_signal_sender(
            sender: futures::channel::mpsc::UnboundedSender<()>,
        ) -> Self {
            let mut client = Self::minimal();
            client.exit_signal_sender = Some(Box::new(sender));
            client
        }
    }

    impl From<TestClientData> for TestClient {
        fn from(data: TestClientData) -> TestClient {
            TestClient { data: Arc::new(Mutex::new(data)) }
        }
    }

    #[async_trait]
    impl RelayClient for TestClient {
        type Error = TestError;

        async fn reconnect(&mut self) -> Result<(), TestError> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl SourceClient<TestTeyrchainsPipeline> for TestClient {
        async fn ensure_synced(&self) -> Result<bool, TestError> {
            self.data.lock().await.source_sync_status.clone()
        }

        async fn teyrchain_head(
            &self,
            at_block: HeaderIdOf<TestChain>,
        ) -> Result<AvailableHeader<HeaderIdOf<TestTeyrchain>>, TestError> {
            // the panic message is only formatted when the head is actually missing from the
            // test data
            self.data
                .lock()
                .await
                .source_head
                .get(&at_block.0)
                .unwrap_or_else(|| panic!("SourceClient::teyrchain_head({})", at_block.0))
                .clone()
        }

        async fn prove_teyrchain_head(
            &self,
            at_block: HeaderIdOf<TestChain>,
        ) -> Result<(ParaHeadsProof, ParaHash), TestError> {
            let head_result =
                SourceClient::<TestTeyrchainsPipeline>::teyrchain_head(self, at_block).await?;
            let head = head_result.as_available().unwrap();
            let proof = (ParaHeadsProof { storage_proof: Default::default() }, head.hash());
            self.data.lock().await.source_proof.clone().map(|_| proof)
        }
    }

    #[async_trait]
    impl TargetClient<TestTeyrchainsPipeline> for TestClient {
        type TransactionTracker = TestTransactionTracker;

        async fn best_block(&self) -> Result<HeaderIdOf<TestChain>, TestError> {
            self.data.lock().await.target_best_block.clone()
        }

        async fn best_finalized_source_relay_chain_block(
            &self,
            _at_block: &HeaderIdOf<TestChain>,
        ) -> Result<HeaderIdOf<TestChain>, TestError> {
            self.data.lock().await.target_best_finalized_source_block.clone()
        }

        // the interval is measured in blocks of the source **relay** chain, which is
        // `TestChain` in the test pipeline
        async fn free_source_relay_headers_interval(
            &self,
        ) -> Result<Option<BlockNumberOf<TestChain>>, TestError> {
            self.data.lock().await.target_free_source_relay_headers_interval.clone()
        }

        async fn teyrchain_head(
            &self,
            _at_block: HeaderIdOf<TestChain>,
        ) -> Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestTeyrchain>)>, TestError> {
            self.data.lock().await.target_head.clone()
        }

        async fn submit_teyrchain_head_proof(
            &self,
            at_source_block: HeaderIdOf<TestChain>,
            _updated_teyrchain_head: ParaHash,
            _proof: ParaHeadsProof,
            _is_free_execution_expected: bool,
        ) -> Result<TestTransactionTracker, Self::Error> {
            let mut data = self.data.lock().await;
            data.target_submit_result.clone()?;
            data.submitted_proof_at_source_relay_block = Some(at_source_block);

            // ask the loop to exit after the first successful submission, if requested
            if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
                exit_signal_sender.send(()).await.unwrap();
            }
            Ok(TestTransactionTracker(Some(
                TrackedTransactionStatus::Finalized(Default::default()),
            )))
        }
    }

    #[test]
    fn when_source_client_fails_to_return_sync_state() {
        let mut test_source_client = TestClientData::minimal();
        test_source_client.source_sync_status = Err(TestError::Error);

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(test_source_client),
                TestClient::from(TestClientData::minimal()),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Source),
        );
    }

    #[test]
    fn when_target_client_fails_to_return_best_block() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_best_block = Err(TestError::Error);

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(TestClientData::minimal()),
                TestClient::from(test_target_client),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn when_target_client_fails_to_read_heads() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_head = Err(TestError::Error);

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(TestClientData::minimal()),
                TestClient::from(test_target_client),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn when_target_client_fails_to_read_best_finalized_source_block() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_best_finalized_source_block = Err(TestError::Error);

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(TestClientData::minimal()),
                TestClient::from(test_target_client),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn when_source_client_fails_to_read_heads() {
        let mut test_source_client = TestClientData::minimal();
        test_source_client.source_head.insert(0, Err(TestError::Error));

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(test_source_client),
                TestClient::from(TestClientData::minimal()),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Source),
        );
    }

    #[test]
    fn when_source_client_fails_to_prove_heads() {
        let mut test_source_client = TestClientData::minimal();
        test_source_client.source_proof = Err(TestError::Error);

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(test_source_client),
                TestClient::from(TestClientData::minimal()),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Source),
        );
    }

    #[test]
    fn when_target_client_rejects_update_transaction() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_submit_result = Err(TestError::Error);

        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(TestClientData::minimal()),
                TestClient::from(test_target_client),
                None,
                false,
                futures::future::pending(),
            )),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn minimal_working_case() {
        let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(TestClientData::minimal()),
                TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
                None,
                false,
                exit_signal.into_future().map(|(_, _)| ()),
            )),
            Ok(()),
        );
    }

    #[async_std::test]
    async fn free_headers_are_relayed() {
        // prepare following case:
        // 1) best source relay at target: 95
        // 2) best source teyrchain at target: 5 at relay 50
        // 3) free headers interval: 10
        // 4) at source relay chain block 90 source teyrchain block is 9
        // +
        // 5) best finalized source relay chain block is 95
        // 6) at source relay chain block 95 source teyrchain block is 42
        // =>
        // teyrchain block 42 would have been relayed, because 95 - 50 > 10
        let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
        let clients_data = TestClientData {
            source_sync_status: Ok(true),
            source_head: vec![
                (90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))),
                (95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))),
            ]
            .into_iter()
            .collect(),
            source_proof: Ok(()),

            target_free_source_relay_headers_interval: Ok(Some(10)),
            target_best_block: Ok(HeaderId(200, [200u8; 32].into())),
            target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())),
            target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))),
            target_submit_result: Ok(()),

            submitted_proof_at_source_relay_block: None,
            exit_signal_sender: Some(Box::new(exit_signal_sender)),
        };

        let source_client = TestClient::from(clients_data.clone());
        let target_client = TestClient::from(clients_data);
        assert_eq!(
            run_until_connection_lost(
                source_client,
                target_client.clone(),
                None,
                true,
                exit_signal.into_future().map(|(_, _)| ()),
            )
            .await,
            Ok(()),
        );
        assert_eq!(
            target_client
                .data
                .lock()
                .await
                .submitted_proof_at_source_relay_block
                .map(|id| id.0),
            Some(95)
        );

        // now source relay block chain 104 is mined with teyrchain head #84
        // => since 104 - 95 < 10, there are no free headers
        // => nothing is submitted
        let mut clients_data: TestClientData = target_client.data.lock().await.clone();
        clients_data
            .source_head
            .insert(104, Ok(AvailableHeader::Available(HeaderId(84, [84u8; 32].into()))));
        clients_data.target_best_finalized_source_block = Ok(HeaderId(104, [104u8; 32].into()));
        clients_data.target_head =
            Ok(Some((HeaderId(95, [95u8; 32].into()), HeaderId(42, [42u8; 32].into()))));
        clients_data.target_best_block = Ok(HeaderId(255, [255u8; 32].into()));
        clients_data.exit_signal_sender = None;

        let source_client = TestClient::from(clients_data.clone());
        let target_client = TestClient::from(clients_data);
        assert_eq!(
            run_until_connection_lost(
                source_client,
                target_client.clone(),
                None,
                true,
                async_std::task::sleep(std::time::Duration::from_millis(100)),
            )
            .await,
            Ok(()),
        );
        // the recorded submission is still the one from the first run
        assert_eq!(
            target_client
                .data
                .lock()
                .await
                .submitted_proof_at_source_relay_block
                .map(|id| id.0),
            Some(95)
        );
    }

    /// Tracker of the submitted head #20, whose transaction never gets its final status.
    fn test_tx_tracker() -> SubmittedHeadsTracker<TestTeyrchainsPipeline> {
        SubmittedHeadsTracker::new(
            AvailableHeader::Available(HeaderId(20, PARA_20_HASH)),
            TestTransactionTracker(None),
        )
    }

    /// Maps `Waiting(_)` to `Some(())` and any final status to `None`, which makes it
    /// possible to use `assert_eq!` in the tests below.
    impl From<SubmittedHeadStatus<TestTeyrchainsPipeline>> for Option<()> {
        fn from(status: SubmittedHeadStatus<TestTeyrchainsPipeline>) -> Option<()> {
            match status {
                SubmittedHeadStatus::Waiting(_) => Some(()),
                _ => None,
            }
        }
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_none_value() {
        // the target knows no head at all => we shall keep waiting
        assert_eq!(
            Some(()),
            test_tx_tracker().update(&HeaderId(0, Default::default()), &None).await.into(),
        );
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_old_value() {
        assert_eq!(
            Some(()),
            test_tx_tracker()
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
                .await
                .into(),
        );
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_same_value() {
        assert!(matches!(
            test_tx_tracker()
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(20, PARA_20_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
        ));
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_better_value() {
        assert!(matches!(
            test_tx_tracker()
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(30, PARA_20_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
        ));
    }

    #[async_std::test]
    async fn tx_tracker_update_when_tx_is_lost() {
        let mut tx_tracker = test_tx_tracker();
        tx_tracker.transaction_tracker =
            futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared();
        assert!(matches!(
            tx_tracker
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
        ));
    }

    #[async_std::test]
    async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() {
        let mut tx_tracker = test_tx_tracker();
        tx_tracker.transaction_tracker =
            futures::future::ready(TrackedTransactionStatus::Finalized(Default::default()))
                .boxed()
                .shared();
        assert!(matches!(
            tx_tracker
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
        ));
    }

    #[test]
    fn teyrchain_is_not_updated_if_it_is_unavailable() {
        assert!(!is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Unavailable,
            None,
            Default::default(),
            Default::default(),
        ));
        assert!(!is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Unavailable,
            Some(HeaderId(10, PARA_10_HASH)),
            Default::default(),
            Default::default(),
        ));
    }

    #[test]
    fn teyrchain_is_not_updated_if_it_is_unknown_to_both_clients() {
        assert!(!is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Missing,
            None,
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn teyrchain_is_not_updated_if_target_has_better_head() {
        assert!(!is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Available(HeaderId(10, Default::default())),
            Some(HeaderId(20, Default::default())),
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn teyrchain_is_updated_after_offboarding() {
        assert!(is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Missing,
            Some(HeaderId(20, Default::default())),
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn teyrchain_is_updated_after_onboarding() {
        assert!(is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Available(HeaderId(30, Default::default())),
            None,
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn teyrchain_is_updated_if_newer_head_is_known() {
        assert!(is_update_required::<TestTeyrchainsPipeline>(
            AvailableHeader::Available(HeaderId(40, Default::default())),
            Some(HeaderId(30, Default::default())),
            Default::default(),
            Default::default(),
        ),);
    }
}