From 69d41127bc3c4eae5a166a5b77f74d7faa46029a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 21 Jul 2021 15:13:11 +0300 Subject: [PATCH] Fix Westend -> Millau sync (#1064) * read justifications from stream using channel + task that fills that channel * Arc -> Mutex --- bridges/relays/client-substrate/src/client.rs | 51 ++++++++++++++----- bridges/relays/finality/src/finality_loop.rs | 45 +++++++++++++++- 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index 554810844a..7449dda100 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -23,8 +23,9 @@ use crate::{ConnectionParams, Error, Result}; use async_std::sync::{Arc, Mutex}; use codec::Decode; use frame_system::AccountInfo; +use futures::{SinkExt, StreamExt}; use jsonrpsee_ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, DeserializeOwned}; -use jsonrpsee_ws_client::{Subscription, WsClient as RpcClient, WsClientBuilder as RpcClientBuilder}; +use jsonrpsee_ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder}; use num_traits::{Bounded, Zero}; use pallet_balances::AccountData; use pallet_transaction_payment::InclusionFee; @@ -38,7 +39,7 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; /// Opaque justifications subscription type. -pub struct JustificationsSubscription(tokio::runtime::Handle, Arc>>); +pub struct JustificationsSubscription(Mutex>>); /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec; @@ -365,7 +366,7 @@ impl Client { /// Return new justifications stream. pub async fn subscribe_justifications(&self) -> Result { - let subscription = self + let mut subscription = self .jsonrpsee_execute(move |client| async move { Ok(client .subscribe( @@ -376,10 +377,38 @@ impl Client { .await?) }) .await?; - Ok(JustificationsSubscription( - self.tokio.handle().clone(), - Arc::new(Mutex::new(subscription)), - )) + let (mut sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); + self.tokio.spawn(async move { + loop { + match subscription.next().await { + Ok(Some(justification)) => { + if sender.send(Some(justification)).await.is_err() { + break; + } + } + Ok(None) => { + log::trace!( + target: "bridge", + "{} justifications subscription stream has returned None. Stream needs to be restarted.", + C::NAME, + ); + let _ = sender.send(None).await; + break; + } + Err(e) => { + log::trace!( + target: "bridge", + "{} justifications subscription stream has returned '{:?}'. Stream needs to be restarted.", + C::NAME, + e, + ); + let _ = sender.send(None).await; + break; + } + } + } + }); + Ok(JustificationsSubscription(Mutex::new(receiver))) } /// Execute jsonrpsee future in tokio context. @@ -399,10 +428,8 @@ impl Client { impl JustificationsSubscription { /// Return next justification from the subscription. pub async fn next(&self) -> Result> { - let subscription = self.1.clone(); - self.0 - .spawn(async move { subscription.lock().await.next().await }) - .await? - .map_err(Error::RpcError) + let mut receiver = self.0.lock().await; + let justification = receiver.next().await; + Ok(justification.unwrap_or(None)) } } diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index 462632fa3f..4805e1536c 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -497,6 +497,9 @@ pub(crate) fn read_finality_proofs_from_stream, recent_finality_proofs: &mut FinalityProofs

, ) { + let mut proofs_count = 0; + let mut first_header_number = None; + let mut last_header_number = None; loop { let next_proof = finality_proofs_stream.stream.next(); let finality_proof = match next_proof.now_or_never() { @@ -508,7 +511,25 @@ pub(crate) fn read_finality_proofs_from_stream break, }; - recent_finality_proofs.push((finality_proof.target_header_number(), finality_proof)); + let target_header_number = finality_proof.target_header_number(); + if first_header_number.is_none() { + first_header_number = Some(target_header_number); + } + last_header_number = Some(target_header_number); + proofs_count += 1; + + recent_finality_proofs.push((target_header_number, finality_proof)); + } + + if proofs_count != 0 { + log::trace!( + target: "bridge", + "Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]", + proofs_count, + P::SOURCE_NAME, + first_header_number, + last_header_number, + ); } } @@ -520,6 +541,12 @@ pub(crate) fn select_better_recent_finality_proof( selected_finality_proof: Option<(P::Header, P::FinalityProof)>, ) -> Option<(P::Header, P::FinalityProof)> { if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() { + log::trace!( + target: "bridge", + "Can not improve selected {} finality proof {:?}. No unjustified headers and recent proofs", + P::SOURCE_NAME, + selected_finality_proof.as_ref().map(|(h, _)| h.number()), + ); return selected_finality_proof; } @@ -543,7 +570,21 @@ pub(crate) fn select_better_recent_finality_proof( .binary_search_by_key(intersection.end(), |(number, _)| *number) .unwrap_or_else(|index| index.saturating_sub(1)); let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index]; - if !intersection.contains(selected_header_number) { + let has_selected_finality_proof = intersection.contains(selected_header_number); + log::trace!( + target: "bridge", + "Trying to improve selected {} finality proof {:?}. Headers range: [{:?}; {:?}]. Proofs range: [{:?}; {:?}].\ + Trying to improve to: {:?}. Result: {}", + P::SOURCE_NAME, + selected_finality_proof.as_ref().map(|(h, _)| h.number()), + unjustified_range_begin, + unjustified_range_end, + buffered_range_begin, + buffered_range_end, + selected_header_number, + if has_selected_finality_proof { "improved" } else { "failed" }, + ); + if !has_selected_finality_proof { return selected_finality_proof; }