Fix Westend -> Millau sync (#1064)

* read justifications from stream using channel + task that fills that channel

* Arc<Mutex> -> Mutex
This commit is contained in:
Svyatoslav Nikolsky
2021-07-21 15:13:11 +03:00
committed by Bastian Köcher
parent 09d30894d1
commit 69d41127bc
2 changed files with 82 additions and 14 deletions
+39 -12
View File
@@ -23,8 +23,9 @@ use crate::{ConnectionParams, Error, Result};
use async_std::sync::{Arc, Mutex};
use codec::Decode;
use frame_system::AccountInfo;
use futures::{SinkExt, StreamExt};
use jsonrpsee_ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, DeserializeOwned};
use jsonrpsee_ws_client::{Subscription, WsClient as RpcClient, WsClientBuilder as RpcClientBuilder};
use jsonrpsee_ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder};
use num_traits::{Bounded, Zero};
use pallet_balances::AccountData;
use pallet_transaction_payment::InclusionFee;
@@ -38,7 +39,7 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// Opaque justifications subscription type.
pub struct JustificationsSubscription(tokio::runtime::Handle, Arc<Mutex<Subscription<Bytes>>>);
pub struct JustificationsSubscription(Mutex<futures::channel::mpsc::Receiver<Option<Bytes>>>);
/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
@@ -365,7 +366,7 @@ impl<C: Chain> Client<C> {
/// Return new justifications stream.
pub async fn subscribe_justifications(&self) -> Result<JustificationsSubscription> {
let subscription = self
let mut subscription = self
.jsonrpsee_execute(move |client| async move {
Ok(client
.subscribe(
@@ -376,10 +377,38 @@ impl<C: Chain> Client<C> {
.await?)
})
.await?;
Ok(JustificationsSubscription(
self.tokio.handle().clone(),
Arc::new(Mutex::new(subscription)),
))
let (mut sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(async move {
loop {
match subscription.next().await {
Ok(Some(justification)) => {
if sender.send(Some(justification)).await.is_err() {
break;
}
}
Ok(None) => {
log::trace!(
target: "bridge",
"{} justifications subscription stream has returned None. Stream needs to be restarted.",
C::NAME,
);
let _ = sender.send(None).await;
break;
}
Err(e) => {
log::trace!(
target: "bridge",
"{} justifications subscription stream has returned '{:?}'. Stream needs to be restarted.",
C::NAME,
e,
);
let _ = sender.send(None).await;
break;
}
}
}
});
Ok(JustificationsSubscription(Mutex::new(receiver)))
}
/// Execute jsonrpsee future in tokio context.
@@ -399,10 +428,8 @@ impl<C: Chain> Client<C> {
impl JustificationsSubscription {
/// Return next justification from the subscription.
pub async fn next(&self) -> Result<Option<Bytes>> {
let subscription = self.1.clone();
self.0
.spawn(async move { subscription.lock().await.next().await })
.await?
.map_err(Error::RpcError)
let mut receiver = self.0.lock().await;
let justification = receiver.next().await;
Ok(justification.unwrap_or(None))
}
}