Subscribe to justifications in Millau->Rialto headers sync (#394)

* maintain MillauHeadersToRialto sync by subscribing to Millau justifications

* more tracing in maintain

* Update relays/substrate/src/headers_maintain.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/substrate/src/headers_maintain.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* -Please

* -TODO

* revert raise recursion limit

* updated comment

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-10-26 19:36:29 +03:00
committed by Bastian Köcher
parent e2d9b6393d
commit 1b96e51679
19 changed files with 508 additions and 33 deletions
+2 -1
View File
@@ -292,7 +292,8 @@ impl pallet_sudo::Trait for Runtime {
} }
parameter_types! { parameter_types! {
pub const Period: BlockNumber = 4; /// Authorities are changing every 5 minutes.
pub const Period: BlockNumber = 5 * MINUTES;
pub const Offset: BlockNumber = 0; pub const Offset: BlockNumber = 0;
} }
+1 -1
View File
@@ -36,7 +36,7 @@ pub fn initial_header() -> Header {
Header { Header {
parent_hash: Default::default(), parent_hash: Default::default(),
number: Default::default(), number: Default::default(),
state_root: hex!("e901070e3bb061a6ae9ea8e4ba5417bf4c4642f9e75af9d372861c170ba7a9a3").into(), state_root: hex!("234a17bbd3fbaff8f0a799a6c8f0bdba1979e242fb2ed66d15945acb84947cbd").into(),
extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(), extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(),
digest: Default::default(), digest: Default::default(),
} }
@@ -44,6 +44,15 @@ pub enum Error {
InvalidPrecommitAncestries, InvalidPrecommitAncestries,
} }
/// Decode justification target.
pub fn decode_justification_target<Header: HeaderT>(
raw_justification: &[u8],
) -> Result<(Header::Hash, Header::Number), Error> {
GrandpaJustification::<Header>::decode(&mut &raw_justification[..])
.map(|justification| (justification.commit.target_hash, justification.commit.target_number))
.map_err(|_| Error::JustificationDecode)
}
/// Verify that justification, that is generated by given authority set, finalizes given header. /// Verify that justification, that is generated by given authority set, finalizes given header.
pub fn verify_justification<Header: HeaderT>( pub fn verify_justification<Header: HeaderT>(
finalized_target: (Header::Hash, Header::Number), finalized_target: (Header::Hash, Header::Number),
+2
View File
@@ -41,6 +41,8 @@ use sp_std::{marker::PhantomData, prelude::*};
// Re-export since the node uses these when configuring genesis // Re-export since the node uses these when configuring genesis
pub use storage::{AuthoritySet, ScheduledChange}; pub use storage::{AuthoritySet, ScheduledChange};
pub use justification::decode_justification_target;
mod justification; mod justification;
mod storage; mod storage;
mod storage_proof; mod storage_proof;
+2
View File
@@ -62,6 +62,8 @@ impl Chain for Millau {
/// Name of the `MillauHeaderApi::best_block` runtime method. /// Name of the `MillauHeaderApi::best_block` runtime method.
pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block"; pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block";
/// Name of the `MillauHeaderApi::finalized_block` runtime method.
pub const FINALIZED_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_finalized_block";
/// Name of the `MillauHeaderApi::is_known_block` runtime method. /// Name of the `MillauHeaderApi::is_known_block` runtime method.
pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block"; pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block";
/// Name of the `MillauHeaderApi::incomplete_headers` runtime method. /// Name of the `MillauHeaderApi::incomplete_headers` runtime method.
@@ -254,6 +254,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
consts::ETHEREUM_TICK_INTERVAL, consts::ETHEREUM_TICK_INTERVAL,
target, target,
consts::SUBSTRATE_TICK_INTERVAL, consts::SUBSTRATE_TICK_INTERVAL,
(),
sync_params, sync_params,
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
@@ -177,6 +177,7 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
consts::SUBSTRATE_TICK_INTERVAL, consts::SUBSTRATE_TICK_INTERVAL,
target, target,
consts::ETHEREUM_TICK_INTERVAL, consts::ETHEREUM_TICK_INTERVAL,
(),
sync_params, sync_params,
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
@@ -496,6 +496,11 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
} }
} }
/// Returns true if given header requires completion data.
pub fn requires_completion_data(&self, id: &HeaderIdOf<P>) -> bool {
self.incomplete_headers.contains_key(id)
}
/// Returns id of the header for which we want to fetch completion data. /// Returns id of the header for which we want to fetch completion data.
pub fn incomplete_header(&mut self) -> Option<HeaderIdOf<P>> { pub fn incomplete_header(&mut self) -> Option<HeaderIdOf<P>> {
queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| { queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| {
+35 -7
View File
@@ -16,7 +16,7 @@
//! Entrypoint for running headers synchronization loop. //! Entrypoint for running headers synchronization loop.
use crate::sync::HeadersSyncParams; use crate::sync::{HeadersSync, HeadersSyncParams};
use crate::sync_loop_metrics::SyncLoopMetrics; use crate::sync_loop_metrics::SyncLoopMetrics;
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders}; use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders};
@@ -48,10 +48,12 @@ const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Delay after we have seen update of best source header at target node, /// Delay after we have seen update of best source header at target node,
/// for us to treat sync stalled. ONLY when relay operates in backup mode. /// for us to treat sync stalled. ONLY when relay operates in backup mode.
const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Interval between calling sync maintain procedure.
const MAINTAIN_INTERVAL: Duration = Duration::from_secs(30);
/// Source client trait. /// Source client trait.
#[async_trait] #[async_trait]
pub trait SourceClient<P: HeadersSyncPipeline>: Sized { pub trait SourceClient<P: HeadersSyncPipeline> {
/// Type of error this clients returns. /// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError; type Error: std::fmt::Debug + MaybeConnectionError;
@@ -78,7 +80,7 @@ pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
/// Target client trait. /// Target client trait.
#[async_trait] #[async_trait]
pub trait TargetClient<P: HeadersSyncPipeline>: Sized { pub trait TargetClient<P: HeadersSyncPipeline> {
/// Type of error this clients returns. /// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError; type Error: std::fmt::Debug + MaybeConnectionError;
@@ -102,12 +104,24 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
async fn requires_extra(&self, header: QueuedHeader<P>) -> Result<(HeaderIdOf<P>, bool), Self::Error>; async fn requires_extra(&self, header: QueuedHeader<P>) -> Result<(HeaderIdOf<P>, bool), Self::Error>;
} }
/// Synchronization maintain procedure.
#[async_trait]
pub trait SyncMaintain<P: HeadersSyncPipeline>: Send + Sync {
/// Run custom maintain procedures. This is guaranteed to be called when both source and target
/// clients are unoccupied.
async fn maintain(&self, _sync: &mut HeadersSync<P>) {}
}
impl<P: HeadersSyncPipeline> SyncMaintain<P> for () {}
/// Run headers synchronization. /// Run headers synchronization.
#[allow(clippy::too_many_arguments)]
pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>( pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
source_tick: Duration, source_tick: Duration,
target_client: TC, target_client: TC,
target_tick: Duration, target_tick: Duration,
sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams, sync_params: HeadersSyncParams,
metrics_params: Option<MetricsParams>, metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()>,
@@ -116,7 +130,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut progress_context = (Instant::now(), None, None); let mut progress_context = (Instant::now(), None, None);
local_pool.run_until(async move { local_pool.run_until(async move {
let mut sync = crate::sync::HeadersSync::<P>::new(sync_params); let mut sync = HeadersSync::<P>::new(sync_params);
let mut stall_countdown = None; let mut stall_countdown = None;
let mut last_update_time = Instant::now(); let mut last_update_time = Instant::now();
@@ -154,6 +168,9 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let target_go_offline_future = futures::future::Fuse::terminated(); let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(target_tick).fuse(); let target_tick_stream = interval(target_tick).fuse();
let mut maintain_required = false;
let maintain_stream = interval(MAINTAIN_INTERVAL).fuse();
let exit_signal = exit_signal.fuse(); let exit_signal = exit_signal.fuse();
futures::pin_mut!( futures::pin_mut!(
@@ -172,6 +189,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_complete_header_future, target_complete_header_future,
target_go_offline_future, target_go_offline_future,
target_tick_stream, target_tick_stream,
maintain_stream,
exit_signal exit_signal
); );
@@ -373,6 +391,9 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_incomplete_headers_required = true; target_incomplete_headers_required = true;
}, },
_ = maintain_stream.next() => {
maintain_required = true;
},
_ = exit_signal => { _ = exit_signal => {
return; return;
} }
@@ -387,9 +408,16 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
// print progress // print progress
progress_context = print_sync_progress(progress_context, &sync); progress_context = print_sync_progress(progress_context, &sync);
// run maintain procedures
if maintain_required && source_client_is_online && target_client_is_online {
log::debug!(target: "bridge", "Maintaining headers sync loop");
maintain_required = false;
sync_maintain.maintain(&mut sync).await;
}
// If the target client is accepting requests we update the requests that // If the target client is accepting requests we update the requests that
// we want it to run // we want it to run
if target_client_is_online { if !maintain_required && target_client_is_online {
// NOTE: Is is important to reset this so that we only have one // NOTE: Is is important to reset this so that we only have one
// request being processed by the client at a time. This prevents // request being processed by the client at a time. This prevents
// race conditions like receiving two transactions with the same // race conditions like receiving two transactions with the same
@@ -476,7 +504,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
// If the source client is accepting requests we update the requests that // If the source client is accepting requests we update the requests that
// we want it to run // we want it to run
if source_client_is_online { if !maintain_required && source_client_is_online {
// NOTE: Is is important to reset this so that we only have one // NOTE: Is is important to reset this so that we only have one
// request being processed by the client at a time. This prevents // request being processed by the client at a time. This prevents
// race conditions like receiving two transactions with the same // race conditions like receiving two transactions with the same
@@ -561,7 +589,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
/// Print synchronization progress. /// Print synchronization progress.
fn print_sync_progress<P: HeadersSyncPipeline>( fn print_sync_progress<P: HeadersSyncPipeline>(
progress_context: (Instant, Option<P::Number>, Option<P::Number>), progress_context: (Instant, Option<P::Number>, Option<P::Number>),
eth_sync: &crate::sync::HeadersSync<P>, eth_sync: &HeadersSync<P>,
) -> (Instant, Option<P::Number>, Option<P::Number>) { ) -> (Instant, Option<P::Number>, Option<P::Number>) {
let (prev_time, prev_best_header, prev_target_header) = progress_context; let (prev_time, prev_best_header, prev_target_header) = progress_context;
let now_time = Instant::now(); let now_time = Instant::now();
@@ -479,6 +479,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) {
test_tick(), test_tick(),
target, target,
test_tick(), test_tick(),
(),
crate::sync::tests::default_sync_params(), crate::sync::tests::default_sync_params(),
None, None,
exit_receiver.into_future().map(|(_, _)| ()), exit_receiver.into_future().map(|(_, _)| ()),
@@ -76,7 +76,7 @@ pub trait HeadersSyncPipeline: Clone + Send + Sync {
/// 4) header and extra data are submitted in single transaction. /// 4) header and extra data are submitted in single transaction.
/// ///
/// Example: Ethereum transactions receipts. /// Example: Ethereum transactions receipts.
type Extra: Clone + PartialEq + std::fmt::Debug; type Extra: Clone + Send + Sync + PartialEq + std::fmt::Debug;
/// Type of data required to 'complete' header that we're receiving from the source node: /// Type of data required to 'complete' header that we're receiving from the source node:
/// 1) completion data is required for some headers; /// 1) completion data is required for some headers;
/// 2) target node can't answer if it'll require completion data before header is accepted; /// 2) target node can't answer if it'll require completion data before header is accepted;
@@ -84,7 +84,7 @@ pub trait HeadersSyncPipeline: Clone + Send + Sync {
/// 4) header and completion data are submitted in separate transactions. /// 4) header and completion data are submitted in separate transactions.
/// ///
/// Example: Substrate GRANDPA justifications. /// Example: Substrate GRANDPA justifications.
type Completion: Clone + std::fmt::Debug; type Completion: Clone + Send + Sync + std::fmt::Debug;
/// Function used to estimate size of target-encoded header. /// Function used to estimate size of target-encoded header.
fn estimate_size(source: &QueuedHeader<Self>) -> usize; fn estimate_size(source: &QueuedHeader<Self>) -> usize;
+16 -1
View File
@@ -23,12 +23,15 @@ use crate::{ConnectionParams, Result};
use jsonrpsee::common::DeserializeOwned; use jsonrpsee::common::DeserializeOwned;
use jsonrpsee::raw::RawClient; use jsonrpsee::raw::RawClient;
use jsonrpsee::transport::ws::WsTransportClient; use jsonrpsee::transport::ws::WsTransportClient;
use jsonrpsee::Client as RpcClient; use jsonrpsee::{client::Subscription, Client as RpcClient};
use num_traits::Zero; use num_traits::Zero;
use sp_core::Bytes; use sp_core::Bytes;
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
/// Opaque justifications subscription type.
pub type JustificationsSubscription = Subscription<Bytes>;
/// Opaque GRANDPA authorities set. /// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>; pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
@@ -135,4 +138,16 @@ where
.await .await
.map_err(Into::into) .map_err(Into::into)
} }
/// Return new justifications stream.
pub async fn subscribe_justifications(self) -> Result<JustificationsSubscription> {
Ok(self
.client
.subscribe(
"grandpa_subscribeJustifications",
jsonrpsee::common::Params::None,
"grandpa_unsubscribeJustifications",
)
.await?)
}
} }
+1 -1
View File
@@ -29,7 +29,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub enum Error { pub enum Error {
/// Web socket connection error. /// Web socket connection error.
WsConnectionError(WsNewDnsError), WsConnectionError(WsNewDnsError),
/// An error that can occur when making an HTTP request to /// An error that can occur when making a request to
/// an JSON-RPC server. /// an JSON-RPC server.
Request(RequestError), Request(RequestError),
/// The response from the server could not be SCALE decoded. /// The response from the server could not be SCALE decoded.
+1 -1
View File
@@ -26,7 +26,7 @@ mod rpc;
pub mod headers_source; pub mod headers_source;
pub use crate::chain::{BlockWithJustification, Chain, TransactionSignScheme}; pub use crate::chain::{BlockWithJustification, Chain, TransactionSignScheme};
pub use crate::client::{Client, OpaqueGrandpaAuthoritiesSet}; pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet};
pub use crate::error::{Error, Result}; pub use crate::error::{Error, Result};
pub use bp_runtime::{BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf}; pub use bp_runtime::{BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf};
+1
View File
@@ -20,6 +20,7 @@ bp-millau = { path = "../../primitives/millau" }
bp-rialto = { path = "../../primitives/rialto" } bp-rialto = { path = "../../primitives/rialto" }
headers-relay = { path = "../headers-relay" } headers-relay = { path = "../headers-relay" }
messages-relay = { path = "../messages-relay" } messages-relay = { path = "../messages-relay" }
pallet-substrate-bridge = { path = "../../modules/substrate" }
relay-millau-client = { path = "../millau-client" } relay-millau-client = { path = "../millau-client" }
relay-rialto-client = { path = "../rialto-client" } relay-rialto-client = { path = "../rialto-client" }
relay-substrate-client = { path = "../substrate-client" } relay-substrate-client = { path = "../substrate-client" }
@@ -0,0 +1,382 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate-to-Substrate headers synchronization maintain procedure.
//!
//! Regular headers synchronization only depends on persistent justifications
//! that are generated when authorities set changes. This happens rarely on
//! real-word chains. So some other way to finalize headers is required.
//!
//! Full nodes are listening to GRANDPA messages, so they may have track authorities
//! votes on their own. They're returning both persistent and ephemeral justifications
//! (justifications that are not stored in the database and not broadcasted over network)
//! throught `grandpa_subscribeJustifications` RPC subscription.
//!
//! The idea of this maintain procedure is that when we see justification that 'improves'
//! best finalized header on the target chain, we submit this justification to the target
//! node.
use crate::headers_target::SubstrateHeadersSyncPipeline;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use codec::{Decode, Encode};
use futures::future::{poll_fn, FutureExt, TryFutureExt};
use headers_relay::{
sync::HeadersSync,
sync_loop::SyncMaintain,
sync_types::{HeaderIdOf, HeaderStatus},
};
use relay_substrate_client::{Chain, Client, Error as SubstrateError, JustificationsSubscription};
use relay_utils::HeaderId;
use sp_core::Bytes;
use sp_runtime::{traits::Header as HeaderT, DeserializeOwned, Justification};
use std::{collections::VecDeque, task::Poll};
/// Substrate-to-Substrate headers synchronization maintain procedure.
pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, C: Chain> {
pipeline: P,
target_client: Client<C>,
justifications: Arc<Mutex<Justifications<P>>>,
}
/// Future and already received justifications from the source chain.
struct Justifications<P: SubstrateHeadersSyncPipeline> {
/// Justifications stream.
stream: JustificationsSubscription,
/// Justifications that we have read from the stream but have not sent to the
/// target node, because their targets were still not synced.
queue: VecDeque<(HeaderIdOf<P>, Justification)>,
}
impl<P: SubstrateHeadersSyncPipeline, C: Chain> SubstrateHeadersToSubstrateMaintain<P, C> {
/// Create new maintain procedure.
pub fn new(pipeline: P, target_client: Client<C>, justifications: JustificationsSubscription) -> Self {
SubstrateHeadersToSubstrateMaintain {
pipeline,
target_client,
justifications: Arc::new(Mutex::new(Justifications {
stream: justifications,
queue: VecDeque::new(),
})),
}
}
}
#[async_trait]
impl<P, C> SyncMaintain<P> for SubstrateHeadersToSubstrateMaintain<P, C>
where
C: Chain,
C::Header: DeserializeOwned,
C::Index: DeserializeOwned,
P::Number: Decode + From<C::BlockNumber>,
P::Hash: Decode + From<C::Hash>,
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
async fn maintain(&self, sync: &mut HeadersSync<P>) {
// lock justifications before doing anything else
let mut justifications = match self.justifications.try_lock() {
Some(justifications) => justifications,
None => {
// this should never happen, as we use single-thread executor
log::warn!(target: "bridge", "Failed to acquire {} justifications lock", P::SOURCE_NAME);
return;
}
};
// we need to read best finalized header from the target node to be able to
// choose justification to submit
let best_finalized = match best_finalized_header_id::<P, _>(&self.target_client).await {
Ok(best_finalized) => best_finalized,
Err(error) => {
log::warn!(
target: "bridge",
"Failed to read best finalized {} block from maintain: {:?}",
P::SOURCE_NAME,
error,
);
return;
}
};
log::debug!(
target: "bridge",
"Read best finalized {} block from {}: {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_finalized,
);
// Select justification to submit to the target node. We're submitting at most one justification
// on every maintain call. So maintain rate directly affects finalization rate.
let justification_to_submit = poll_fn(|context| {
// read justifications from the stream and push to the queue
justifications.read_from_stream::<C::Header>(context);
// remove all obsolete justifications from the queue
remove_obsolete::<P>(&mut justifications.queue, best_finalized);
// select justification to submit
Poll::Ready(select_justification(&mut justifications.queue, sync))
})
.await;
// finally - submit selected justification
if let Some((target, justification)) = justification_to_submit {
let submit_result = self
.pipeline
.make_complete_header_transaction(target, justification)
.and_then(|tx| self.target_client.submit_extrinsic(Bytes(tx.encode())))
.await;
match submit_result {
Ok(_) => log::debug!(
target: "bridge",
"Submitted justification received over {} subscription. Target: {:?}",
P::SOURCE_NAME,
target,
),
Err(error) => log::warn!(
target: "bridge",
"Failed to submit justification received over {} subscription for {:?}: {:?}",
P::SOURCE_NAME,
target,
error,
),
}
}
}
}
impl<P> Justifications<P>
where
P::Number: Decode,
P::Hash: Decode,
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
/// Read justifications from the subscription stream without blocking.
fn read_from_stream<'a, Header>(&mut self, context: &mut std::task::Context<'a>)
where
Header: HeaderT,
Header::Number: Into<P::Number>,
Header::Hash: Into<P::Hash>,
{
loop {
let maybe_next_justification = self.stream.next();
futures::pin_mut!(maybe_next_justification);
let maybe_next_justification = maybe_next_justification.poll_unpin(context);
let justification = match maybe_next_justification {
Poll::Ready(justification) => justification,
Poll::Pending => return,
};
// decode justification target
let target = pallet_substrate_bridge::decode_justification_target::<Header>(&justification);
let target = match target {
Ok((target_hash, target_number)) => HeaderId(target_number.into(), target_hash.into()),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to decode justification from {} subscription: {:?}",
P::SOURCE_NAME,
error,
);
continue;
}
};
log::debug!(
target: "bridge",
"Received {} justification over subscription. Target: {:?}",
P::SOURCE_NAME,
target,
);
self.queue.push_back((target, justification.0));
}
}
}
/// Clean queue of all justifications that are justifying already finalized blocks.
fn remove_obsolete<P: SubstrateHeadersSyncPipeline>(
queue: &mut VecDeque<(HeaderIdOf<P>, Justification)>,
best_finalized: HeaderIdOf<P>,
) {
while queue
.front()
.map(|(target, _)| target.0 <= best_finalized.0)
.unwrap_or(false)
{
queue.pop_front();
}
}
/// Select appropriate justification that would improve best finalized block on target node.
///
/// It is assumed that the selected justification will be submitted to the target node. The
/// justification itself and all preceeding justifications are removed from the queue.
fn select_justification<P>(
queue: &mut VecDeque<(HeaderIdOf<P>, Justification)>,
sync: &mut HeadersSync<P>,
) -> Option<(HeaderIdOf<P>, Justification)>
where
P: SubstrateHeadersSyncPipeline<Completion = Justification>,
{
let mut selected_justification = None;
while let Some((target, justification)) = queue.pop_front() {
// if we're waiting for this justification, report it
if sync.headers().requires_completion_data(&target) {
sync.headers_mut().completion_response(&target, Some(justification));
// we won't submit previous justifications as we going to submit justification for
// next header
selected_justification = None;
// we won't submit next justifications as we need to submit previous justifications
// first
break;
}
// if we know that the header is already synced (it is known to the target node), let's
// select it for submission. We still may select better justification on the next iteration.
if sync.headers().status(&target) == HeaderStatus::Synced {
selected_justification = Some((target, justification));
continue;
}
// finally - return justification back to the queue
queue.push_back((target, justification));
break;
}
selected_justification
}
/// Returns best finalized source header on the target chain.
async fn best_finalized_header_id<P, C>(client: &Client<C>) -> Result<HeaderIdOf<P>, SubstrateError>
where
P: SubstrateHeadersSyncPipeline,
P::Number: Decode + From<C::BlockNumber>,
P::Hash: Decode + From<C::Hash>,
C: Chain,
C::Header: DeserializeOwned,
C::Index: DeserializeOwned,
{
let call = P::FINALIZED_BLOCK_METHOD.into();
let data = Bytes(Vec::new());
let encoded_response = client.state_call(call, data, None).await?;
let decoded_response: (C::BlockNumber, C::Hash) =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
let best_header_id = HeaderId(decoded_response.0.into(), decoded_response.1.into());
Ok(best_header_id)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::millau_headers_to_rialto::{sync_params, MillauHeadersToRialto};
fn parent_hash(index: u8) -> bp_millau::Hash {
if index == 1 {
Default::default()
} else {
header(index - 1).hash()
}
}
fn header_hash(index: u8) -> bp_millau::Hash {
header(index).hash()
}
fn header(index: u8) -> bp_millau::Header {
bp_millau::Header::new(
index as _,
Default::default(),
Default::default(),
parent_hash(index),
Default::default(),
)
}
#[test]
fn obsolete_justifications_are_removed() {
let mut queue = vec![
(HeaderId(1, header_hash(1)), vec![1]),
(HeaderId(2, header_hash(2)), vec![2]),
(HeaderId(3, header_hash(3)), vec![3]),
]
.into_iter()
.collect();
remove_obsolete::<MillauHeadersToRialto>(&mut queue, HeaderId(2, header_hash(2)));
assert_eq!(
queue,
vec![(HeaderId(3, header_hash(3)), vec![3])]
.into_iter()
.collect::<VecDeque<_>>(),
);
}
#[test]
fn latest_justification_is_selected() {
let mut queue = vec![
(HeaderId(1, header_hash(1)), vec![1]),
(HeaderId(2, header_hash(2)), vec![2]),
(HeaderId(3, header_hash(3)), vec![3]),
]
.into_iter()
.collect();
let mut sync = HeadersSync::<MillauHeadersToRialto>::new(sync_params());
sync.headers_mut().header_response(header(1).into());
sync.headers_mut().header_response(header(2).into());
sync.headers_mut().header_response(header(3).into());
sync.target_best_header_response(HeaderId(2, header_hash(2)));
assert_eq!(
select_justification(&mut queue, &mut sync),
Some((HeaderId(2, header_hash(2)), vec![2])),
);
}
#[test]
fn required_justification_is_reported() {
let mut queue = vec![
(HeaderId(1, header_hash(1)), vec![1]),
(HeaderId(2, header_hash(2)), vec![2]),
(HeaderId(3, header_hash(3)), vec![3]),
]
.into_iter()
.collect();
let mut sync = HeadersSync::<MillauHeadersToRialto>::new(sync_params());
sync.headers_mut().header_response(header(1).into());
sync.headers_mut().header_response(header(2).into());
sync.headers_mut().header_response(header(3).into());
sync.headers_mut()
.incomplete_headers_response(vec![HeaderId(2, header_hash(2))].into_iter().collect());
sync.target_best_header_response(HeaderId(2, header_hash(2)));
assert_eq!(sync.headers_mut().header_to_complete(), None,);
assert_eq!(select_justification(&mut queue, &mut sync), None,);
assert_eq!(
sync.headers_mut().header_to_complete(),
Some((HeaderId(2, header_hash(2)), &vec![2])),
);
}
}
@@ -36,6 +36,8 @@ use std::collections::HashSet;
pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline { pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline {
/// Name of the `best_block` runtime method. /// Name of the `best_block` runtime method.
const BEST_BLOCK_METHOD: &'static str; const BEST_BLOCK_METHOD: &'static str;
/// Name of the `finalized_block` runtime method.
const FINALIZED_BLOCK_METHOD: &'static str;
/// Name of the `is_known_block` runtime method. /// Name of the `is_known_block` runtime method.
const IS_KNOWN_BLOCK_METHOD: &'static str; const IS_KNOWN_BLOCK_METHOD: &'static str;
/// Name of the `incomplete_headers` runtime method. /// Name of the `incomplete_headers` runtime method.
+2 -1
View File
@@ -28,6 +28,7 @@ pub type MillauClient = relay_substrate_client::Client<relay_millau_client::Mill
pub type RialtoClient = relay_substrate_client::Client<relay_rialto_client::Rialto>; pub type RialtoClient = relay_substrate_client::Client<relay_rialto_client::Rialto>;
mod cli; mod cli;
mod headers_maintain;
mod headers_target; mod headers_target;
mod millau_headers_to_rialto; mod millau_headers_to_rialto;
@@ -63,7 +64,7 @@ async fn run_command(command: cli::Command) -> Result<(), String> {
rialto_sign.rialto_signer_password.as_deref(), rialto_sign.rialto_signer_password.as_deref(),
) )
.map_err(|e| format!("Failed to parse rialto-signer: {:?}", e))?; .map_err(|e| format!("Failed to parse rialto-signer: {:?}", e))?;
millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()); millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await;
} }
} }
@@ -17,12 +17,16 @@
//! Millau-to-Rialto headers sync entrypoint. //! Millau-to-Rialto headers sync entrypoint.
use crate::{ use crate::{
headers_maintain::SubstrateHeadersToSubstrateMaintain,
headers_target::{SubstrateHeadersSyncPipeline, SubstrateHeadersTarget}, headers_target::{SubstrateHeadersSyncPipeline, SubstrateHeadersTarget},
MillauClient, RialtoClient, MillauClient, RialtoClient,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use bp_millau::{BEST_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD, IS_KNOWN_MILLAU_BLOCK_METHOD}; use bp_millau::{
BEST_MILLAU_BLOCK_METHOD, FINALIZED_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD,
IS_KNOWN_MILLAU_BLOCK_METHOD,
};
use codec::Encode; use codec::Encode;
use headers_relay::{ use headers_relay::{
sync::{HeadersSyncParams, TargetTransactionMode}, sync::{HeadersSyncParams, TargetTransactionMode},
@@ -39,7 +43,7 @@ use std::time::Duration;
/// Millau-to-Rialto headers pipeline. /// Millau-to-Rialto headers pipeline.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct MillauHeadersToRialto { pub struct MillauHeadersToRialto {
client: RialtoClient, client: RialtoClient,
sign: RialtoSigningParams, sign: RialtoSigningParams,
} }
@@ -62,6 +66,7 @@ impl HeadersSyncPipeline for MillauHeadersToRialto {
#[async_trait] #[async_trait]
impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto { impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto {
const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCK_METHOD; const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCK_METHOD;
const FINALIZED_BLOCK_METHOD: &'static str = FINALIZED_MILLAU_BLOCK_METHOD;
const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD; const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD;
const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD; const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD;
@@ -100,8 +105,20 @@ type MillauSourceClient = HeadersSource<Millau, MillauHeadersToRialto>;
/// Rialto node as headers target. /// Rialto node as headers target.
type RialtoTargetClient = SubstrateHeadersTarget<Rialto, MillauHeadersToRialto>; type RialtoTargetClient = SubstrateHeadersTarget<Rialto, MillauHeadersToRialto>;
/// Return sync parameters for Millau-to-Rialto headers sync.
pub fn sync_params() -> HeadersSyncParams {
HeadersSyncParams {
max_future_headers_to_download: 32,
max_headers_in_submitted_status: 8,
max_headers_in_single_submit: 1,
max_headers_size_in_single_submit: 1024 * 1024,
prune_depth: 256,
target_tx_mode: TargetTransactionMode::Signed,
}
}
/// Run Millau-to-Rialto headers sync. /// Run Millau-to-Rialto headers sync.
pub fn run( pub async fn run(
millau_client: MillauClient, millau_client: MillauClient,
rialto_client: RialtoClient, rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams, rialto_sign: RialtoSigningParams,
@@ -109,27 +126,34 @@ pub fn run(
) { ) {
let millau_tick = Duration::from_secs(5); let millau_tick = Duration::from_secs(5);
let rialto_tick = Duration::from_secs(5); let rialto_tick = Duration::from_secs(5);
let sync_params = HeadersSyncParams {
max_future_headers_to_download: 32, let millau_justifications = match millau_client.clone().subscribe_justifications().await {
max_headers_in_submitted_status: 8, Ok(millau_justifications) => millau_justifications,
max_headers_in_single_submit: 1, Err(error) => {
max_headers_size_in_single_submit: 1024 * 1024, log::warn!(
prune_depth: 256, target: "bridge",
target_tx_mode: TargetTransactionMode::Signed, "Failed to subscribe to Millau justifications: {:?}",
error,
);
return;
}
}; };
let pipeline = MillauHeadersToRialto {
client: rialto_client.clone(),
sign: rialto_sign,
};
let sync_maintain =
SubstrateHeadersToSubstrateMaintain::new(pipeline.clone(), rialto_client.clone(), millau_justifications);
headers_relay::sync_loop::run( headers_relay::sync_loop::run(
MillauSourceClient::new(millau_client), MillauSourceClient::new(millau_client),
millau_tick, millau_tick,
RialtoTargetClient::new( RialtoTargetClient::new(rialto_client, pipeline),
rialto_client.clone(),
MillauHeadersToRialto {
client: rialto_client,
sign: rialto_sign,
},
),
rialto_tick, rialto_tick,
sync_params, sync_maintain,
sync_params(),
metrics_params, metrics_params,
futures::future::pending(), futures::future::pending(),
); );