use transaction tracker in parachains relay (#1575)

* use transaction tracker in parachains relay

* actually return tx tracker from target client implementation
This commit is contained in:
Svyatoslav Nikolsky
2022-09-22 15:21:51 +03:00
committed by Bastian Köcher
parent 6ab6a876a1
commit 86be60ad40
2 changed files with 139 additions and 73 deletions
@@ -34,7 +34,7 @@ use parachains_relay::{
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
HeaderIdOf, HeaderOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme,
UnsignedTransaction,
TransactionTracker, UnsignedTransaction,
};
use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
use sp_core::{Bytes, Pair};
@@ -86,6 +86,8 @@ where
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error> {
let best_header = self.client.best_header().await?;
let best_id = best_header.id();
@@ -172,7 +174,7 @@ where
at_relay_block: HeaderIdOf<P::SourceRelayChain>,
updated_parachains: Vec<(ParaId, ParaHash)>,
proof: ParaHeadsProof,
) -> Result<(), Self::Error> {
) -> Result<Self::TransactionTracker, Self::Error> {
let genesis_hash = *self.client.genesis_hash();
let transaction_params = self.transaction_params.clone();
let (spec_version, transaction_version) = self.client.simple_runtime_version().await?;
@@ -182,7 +184,7 @@ where
proof,
);
self.client
.submit_signed_extrinsic(
.submit_and_watch_signed_extrinsic(
self.transaction_params.signer.public().into(),
SignParam::<P::TransactionSignScheme> {
spec_version,
@@ -196,6 +198,5 @@ where
},
)
.await
.map(drop)
}
}
+133 -68
View File
@@ -22,13 +22,21 @@ use bp_polkadot_core::{
parachains::{ParaHash, ParaHeadsProof, ParaId},
BlockNumber as RelayBlockNumber,
};
use futures::{future::FutureExt, select};
use futures::{
future::{FutureExt, Shared},
poll, select,
};
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
TrackedTransactionStatus, TransactionTracker,
};
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
time::{Duration, Instant},
pin::Pin,
task::Poll,
time::Duration,
};
/// Parachain heads synchronization params.
@@ -115,6 +123,9 @@ pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker;
/// Get best block id.
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
@@ -141,7 +152,7 @@ pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
at_source_block: HeaderIdOf<P::SourceChain>,
updated_parachains: Vec<(ParaId, ParaHash)>,
proof: ParaHeadsProof,
) -> Result<(), Self::Error>;
) -> Result<Self::TransactionTracker, Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
@@ -196,7 +207,7 @@ where
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
let mut tx_tracker: Option<TransactionTracker<P>> = None;
let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;
futures::pin_mut!(exit_signal);
@@ -246,9 +257,29 @@ where
&sync_params.parachains,
)
.await?;
tx_tracker = tx_tracker.take().and_then(|tx_tracker| tx_tracker.update(&heads_at_target));
if tx_tracker.is_some() {
// check if our transaction has been mined
if let Some(tracker) = submitted_heads_tracker.take() {
match tracker.update(&heads_at_target).await {
SubmittedHeadsStatus::Waiting(tracker) => {
// no news about our transaction and we shall keep waiting
submitted_heads_tracker = Some(tracker);
continue
},
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized) => {
// all heads have been updated, we don't need this tracker anymore
},
SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) => {
log::warn!(
target: "bridge",
"Parachains synchronization from {} to {} has stalled. Going to restart",
P::SourceChain::NAME,
P::TargetChain::NAME,
);
return Err(FailedClient::Both)
},
}
}
// we have no active transaction and may need to update heads, but do we have something for
@@ -317,7 +348,7 @@ where
"Incorrect parachains SourceClient implementation"
);
target_client
let transaction_tracker = target_client
.submit_parachain_heads_proof(
best_finalized_relay_block,
updated_ids.iter().cloned().zip(head_hashes).collect(),
@@ -334,11 +365,10 @@ where
);
FailedClient::Target
})?;
tx_tracker = Some(TransactionTracker::<P>::new(
submitted_heads_tracker = Some(SubmittedHeadsTracker::<P>::new(
updated_ids,
best_finalized_relay_block.0,
sync_params.stall_timeout,
transaction_tracker,
));
}
}
@@ -494,19 +524,27 @@ async fn read_heads_at_target<P: ParachainsPipeline>(
Ok(para_best_head_hashes)
}
/// Parachain heads transaction tracker.
struct TransactionTracker<P: ParachainsPipeline> {
/// Submitted heads status.
enum SubmittedHeadsStatus<P: ParachainsPipeline> {
/// Heads are not yet updated.
Waiting(SubmittedHeadsTracker<P>),
/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
Final(TrackedTransactionStatus),
}
/// Submitted parachain heads transaction.
struct SubmittedHeadsTracker<P: ParachainsPipeline> {
/// Ids of parachains which heads were updated in the tracked transaction.
awaiting_update: BTreeSet<ParaId>,
/// Number of relay chain block that has been used to craft parachain heads proof.
relay_block_number: BlockNumberOf<P::SourceChain>,
/// Transaction submit time.
submitted_at: Instant,
/// Transaction death time.
death_time: Instant,
/// Future that waits for submitted transaction finality or loss.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
transaction_tracker: Shared<Pin<Box<dyn Future<Output = TrackedTransactionStatus> + Send>>>,
}
impl<P: ParachainsPipeline> TransactionTracker<P>
impl<P: ParachainsPipeline> SubmittedHeadsTracker<P>
where
P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
@@ -514,22 +552,20 @@ where
pub fn new(
awaiting_update: impl IntoIterator<Item = ParaId>,
relay_block_number: BlockNumberOf<P::SourceChain>,
stall_timeout: Duration,
transaction_tracker: impl TransactionTracker + 'static,
) -> Self {
let now = Instant::now();
TransactionTracker {
SubmittedHeadsTracker {
awaiting_update: awaiting_update.into_iter().collect(),
relay_block_number,
submitted_at: now,
death_time: now + stall_timeout,
transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(),
}
}
/// Returns `None` if all parachain heads have been updated or we consider our transaction dead.
pub fn update(
/// Returns `None` if all submitted parachain heads have been updated.
pub async fn update(
mut self,
heads_at_target: &BTreeMap<ParaId, Option<BestParaHeadHash>>,
) -> Option<Self> {
) -> SubmittedHeadsStatus<P> {
// remove all pending heads that were synced
for (para, best_para_head) in heads_at_target {
if best_para_head
@@ -554,23 +590,17 @@ where
// if we have synced all required heads, we are done
if self.awaiting_update.is_empty() {
return None
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized)
}
// if our transaction is dead now, we may start over again
let now = Instant::now();
if now >= self.death_time {
log::warn!(
target: "bridge",
"Parachain heads update transaction {} has been lost: no updates for {}s",
P::TargetChain::NAME,
(now - self.submitted_at).as_secs(),
);
return None
// if underlying transaction tracker has reported that the transaction is lost, we may
// then restart our sync
let transaction_tracker = self.transaction_tracker.clone();
if let Poll::Ready(TrackedTransactionStatus::Lost) = poll!(transaction_tracker) {
return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost)
}
Some(self)
SubmittedHeadsStatus::Waiting(self)
}
}
@@ -613,6 +643,16 @@ mod tests {
data: Arc<Mutex<TestClientData>>,
}
#[derive(Clone, Debug)]
struct TestTransactionTracker(TrackedTransactionStatus);
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
async fn wait(self) -> TrackedTransactionStatus {
self.0
}
}
#[derive(Clone, Debug)]
struct TestClientData {
source_sync_status: Result<bool, TestError>,
@@ -711,6 +751,8 @@ mod tests {
#[async_trait]
impl TargetClient<TestParachainsPipeline> for TestClient {
type TransactionTracker = TestTransactionTracker;
async fn best_block(&self) -> Result<HeaderIdOf<TestChain>, TestError> {
self.data.lock().await.target_best_block.clone()
}
@@ -736,13 +778,14 @@ mod tests {
_at_source_block: HeaderIdOf<TestChain>,
_updated_parachains: Vec<(ParaId, ParaHash)>,
_proof: ParaHeadsProof,
) -> Result<(), Self::Error> {
self.data.lock().await.target_submit_result.clone()?;
) -> Result<TestTransactionTracker, Self::Error> {
let mut data = self.data.lock().await;
data.target_submit_result.clone()?;
if let Some(mut exit_signal_sender) = self.data.lock().await.exit_signal_sender.take() {
if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
exit_signal_sender.send(()).await.unwrap();
}
Ok(())
Ok(TestTransactionTracker(TrackedTransactionStatus::Finalized))
}
}
@@ -891,27 +934,35 @@ mod tests {
const PARA_1_ID: u32 = PARA_ID + 1;
const SOURCE_BLOCK_NUMBER: u32 = 100;
fn test_tx_tracker() -> TransactionTracker<TestParachainsPipeline> {
TransactionTracker::new(
fn test_tx_tracker() -> SubmittedHeadsTracker<TestParachainsPipeline> {
SubmittedHeadsTracker::new(
vec![ParaId(PARA_ID), ParaId(PARA_1_ID)],
SOURCE_BLOCK_NUMBER,
Duration::from_secs(1),
TestTransactionTracker(TrackedTransactionStatus::Finalized),
)
}
#[test]
fn tx_tracker_update_when_nothing_is_updated() {
impl From<SubmittedHeadsStatus<TestParachainsPipeline>> for Option<BTreeSet<ParaId>> {
fn from(status: SubmittedHeadsStatus<TestParachainsPipeline>) -> Option<BTreeSet<ParaId>> {
match status {
SubmittedHeadsStatus::Waiting(tracker) => Some(tracker.awaiting_update),
_ => None,
}
}
}
#[async_std::test]
async fn tx_tracker_update_when_nothing_is_updated() {
assert_eq!(
test_tx_tracker()
.update(&vec![].into_iter().collect())
.map(|t| t.awaiting_update),
Some(test_tx_tracker().awaiting_update),
test_tx_tracker().update(&vec![].into_iter().collect()).await.into(),
);
}
#[test]
fn tx_tracker_update_when_one_of_heads_is_updated_to_previous_value() {
#[async_std::test]
async fn tx_tracker_update_when_one_of_heads_is_updated_to_previous_value() {
assert_eq!(
Some(test_tx_tracker().awaiting_update),
test_tx_tracker()
.update(
&vec![(
@@ -924,14 +975,15 @@ mod tests {
.into_iter()
.collect()
)
.map(|t| t.awaiting_update),
Some(test_tx_tracker().awaiting_update),
.await
.into(),
);
}
#[test]
fn tx_tracker_update_when_one_of_heads_is_updated() {
#[async_std::test]
async fn tx_tracker_update_when_one_of_heads_is_updated() {
assert_eq!(
Some(vec![ParaId(PARA_1_ID)].into_iter().collect::<BTreeSet<_>>()),
test_tx_tracker()
.update(
&vec![(
@@ -944,14 +996,15 @@ mod tests {
.into_iter()
.collect()
)
.map(|t| t.awaiting_update),
Some(vec![ParaId(PARA_1_ID)].into_iter().collect()),
.await
.into(),
);
}
#[test]
fn tx_tracker_update_when_all_heads_are_updated() {
#[async_std::test]
async fn tx_tracker_update_when_all_heads_are_updated() {
assert_eq!(
Option::<BTreeSet<_>>::None,
test_tx_tracker()
.update(
&vec![
@@ -973,21 +1026,33 @@ mod tests {
.into_iter()
.collect()
)
.map(|t| t.awaiting_update),
None,
.await
.into(),
);
}
#[test]
fn tx_tracker_update_when_tx_is_stalled() {
#[async_std::test]
async fn tx_tracker_update_when_tx_is_stalled() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.death_time = Instant::now();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared();
assert_eq!(
tx_tracker.update(&vec![].into_iter().collect()).map(|t| t.awaiting_update),
None,
Option::<BTreeSet<_>>::None,
tx_tracker.update(&vec![].into_iter().collect()).await.into(),
);
}
#[async_std::test]
async fn tx_tracker_update_when_tx_is_finalized() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Finalized).boxed().shared();
assert!(matches!(
tx_tracker.update(&vec![].into_iter().collect()).await,
SubmittedHeadsStatus::Waiting(_),
));
}
#[test]
fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
assert_eq!(