Migrate to jsonrpsee v2 (#787)

* POC jsonrpsee v2

* POC update ws client

* connect to eth nodes using ws

* fix for subscriptions

* reverted unncecessary changes

* reference jsonrpsee from crates.io

* fixed eth port in deployments

* fmt

* order deps

* remove unnecessary comment

* clone is no longer required for subscriptions

* treat RpcError::Internal as connection error

* resubscribe on terminate

* Update deployments/bridges/poa-rialto/entrypoints/poa-exchange-tx-generator-entrypoint.sh

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2021-03-05 00:00:26 +03:00
committed by Bastian Köcher
parent 6cfd87783e
commit a2b8bb191b
18 changed files with 178 additions and 131 deletions
@@ -47,8 +47,10 @@ use sp_runtime::{traits::Header as HeaderT, Justification};
use std::{collections::VecDeque, marker::PhantomData, task::Poll};
/// Substrate-to-Substrate headers synchronization maintain procedure.
pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> {
pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain>
{
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
justifications: Arc<Mutex<Justifications<P>>>,
_marker: PhantomData<SourceChain>,
@@ -56,20 +58,23 @@ pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline,
/// Future and already received justifications from the source chain.
struct Justifications<P: SubstrateHeadersSyncPipeline> {
/// Justifications stream.
stream: JustificationsSubscription,
/// Justifications stream. None if it hasn't been initialized yet, or it has been dropped
/// by the rpc library.
stream: Option<JustificationsSubscription>,
/// Justifications that we have read from the stream but have not sent to the
/// target node, because their targets were still not synced.
queue: VecDeque<(HeaderIdOf<P>, Justification)>,
}
impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain>
impl<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain>
SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
{
/// Create new maintain procedure.
pub fn new(pipeline: P, target_client: Client<TargetChain>, justifications: JustificationsSubscription) -> Self {
pub async fn new(pipeline: P, source_client: Client<SourceChain>, target_client: Client<TargetChain>) -> Self {
let justifications = subscribe_justifications(&source_client).await;
SubstrateHeadersToSubstrateMaintain {
pipeline,
source_client,
target_client,
justifications: Arc::new(Mutex::new(Justifications {
stream: justifications,
@@ -81,12 +86,13 @@ impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain>
}
#[async_trait]
impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> Clone
impl<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain> Clone
for SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
{
fn clone(&self) -> Self {
SubstrateHeadersToSubstrateMaintain {
pipeline: self.pipeline.clone(),
source_client: self.source_client.clone(),
target_client: self.target_client.clone(),
justifications: self.justifications.clone(),
_marker: Default::default(),
@@ -141,18 +147,23 @@ where
// Select justification to submit to the target node. We're submitting at most one justification
// on every maintain call. So maintain rate directly affects finalization rate.
let justification_to_submit = poll_fn(|context| {
let (resubscribe, justification_to_submit) = poll_fn(|context| {
// read justifications from the stream and push to the queue
justifications.read_from_stream::<SourceChain::Header>(context);
let resubscribe = !justifications.read_from_stream::<SourceChain::Header>(context);
// remove all obsolete justifications from the queue
remove_obsolete::<P>(&mut justifications.queue, best_finalized);
// select justification to submit
Poll::Ready(select_justification(&mut justifications.queue, sync))
Poll::Ready((resubscribe, select_justification(&mut justifications.queue, sync)))
})
.await;
// if justifications subscription has been dropped, resubscribe
if resubscribe {
justifications.stream = subscribe_justifications(&self.source_client).await;
}
// finally - submit selected justification
if let Some((target, justification)) = justification_to_submit {
let submit_result = self
@@ -187,20 +198,42 @@ where
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
/// Read justifications from the subscription stream without blocking.
fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>)
///
/// Returns `true` if justifications stream is still readable and `false` if it has been
/// dropped by the RPC crate && we need to resubscribe.
#[must_use]
fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>) -> bool
where
SourceHeader: HeaderT,
SourceHeader::Number: Into<P::Number>,
SourceHeader::Hash: Into<P::Hash>,
{
let stream = match self.stream.as_mut() {
Some(stream) => stream,
None => return false,
};
loop {
let maybe_next_justification = self.stream.next();
let maybe_next_justification = stream.next();
futures::pin_mut!(maybe_next_justification);
let maybe_next_justification = maybe_next_justification.poll_unpin(context);
let justification = match maybe_next_justification {
Poll::Ready(justification) => justification,
Poll::Pending => return,
Poll::Pending => return true,
};
let justification = match justification {
Some(justification) => justification,
None => {
log::warn!(
target: "bridge",
"{} justifications stream has been dropped. Will be trying to resubscribe",
P::SOURCE_NAME,
);
return false;
}
};
// decode justification target
@@ -302,6 +335,31 @@ where
Ok(best_header_id)
}
/// Subscribe to justifications stream at source node.
async fn subscribe_justifications<C: Chain>(client: &Client<C>) -> Option<JustificationsSubscription> {
match client.subscribe_justifications().await {
Ok(source_justifications) => {
log::debug!(
target: "bridge",
"Successfully (re)subscribed to {} justifications",
C::NAME,
);
Some(source_justifications)
}
Err(error) => {
log::warn!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}",
C::NAME,
error,
);
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -139,25 +139,12 @@ pub async fn run<SourceChain, TargetChain, P>(
BlockNumberOf<SourceChain>: BlockNumberBase,
TargetChain: Clone + Chain,
{
let source_justifications = match source_client.clone().subscribe_justifications().await {
Ok(source_justifications) => source_justifications,
Err(error) => {
log::warn!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}",
SourceChain::NAME,
error,
);
return;
}
};
let sync_maintain = SubstrateHeadersToSubstrateMaintain::<_, SourceChain, _>::new(
pipeline.clone(),
source_client.clone(),
target_client.clone(),
source_justifications,
);
)
.await;
log::info!(
target: "bridge",