Extract minimal Rialto-Sub and Millau-Sub clients (#365)

* extracting sub clients

* fmt + lost docs

* revert enum BridgeInstance

* apply suggestions from review

* explicite debug impl

* remove unused imports from Millau

* fix typo

* fix instance + API name

* Update relays/ethereum/src/ethereum_sync_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* separate crates for millau and rialto client

* cargo fmt

* fix

* fmt

* remove no_std support

* fix compilation again

* Update relays/substrate-client/Cargo.toml

* Update relay clients to Substrate 2.0

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
Co-authored-by: Hernando Castano <castano.ha@gmail.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-09-28 23:49:15 +03:00
committed by Bastian Köcher
parent 7f7d62d813
commit 52c1913fff
26 changed files with 1042 additions and 751 deletions
@@ -14,29 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate -> Ethereum synchronization.
//! Rialto-Substrate -> Ethereum PoA synchronization.
use crate::ethereum_client::EthereumHighLevelRpc;
use crate::instances::BridgeInstance;
use crate::rpc::SubstrateRpc;
use crate::rpc_errors::RpcError;
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
use crate::substrate_types::{
GrandpaJustification, Hash, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline,
SubstrateSyncHeader as Header,
};
use async_trait::async_trait;
use codec::Encode;
use headers_relay::{
sync::HeadersSyncParams,
sync_loop::{SourceClient, TargetClient},
sync_types::{SourceHeader, SubmittedHeaders},
sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders},
};
use relay_ethereum_client::{
types::Address, Client as EthereumClient, ConnectionParams as EthereumConnectionParams,
SigningParams as EthereumSigningParams,
};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SyncHeader as RialtoSyncHeader};
use relay_substrate_client::{Client as SubstrateClient, ConnectionParams as SubstrateConnectionParams};
use relay_utils::metrics::MetricsParams;
use sp_runtime::Justification;
use std::fmt::Debug;
use std::{collections::HashSet, time::Duration};
@@ -71,18 +68,39 @@ pub struct SubstrateSyncParams {
pub sync_params: HeadersSyncParams,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
/// Instance of the bridge pallet being synchronized.
pub instance: Box<dyn BridgeInstance>,
}
/// Substrate synchronization pipeline.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct SubstrateHeadersSyncPipeline;
impl HeadersSyncPipeline for SubstrateHeadersSyncPipeline {
const SOURCE_NAME: &'static str = "Substrate";
const TARGET_NAME: &'static str = "Ethereum";
type Hash = rialto_runtime::Hash;
type Number = rialto_runtime::BlockNumber;
type Header = RialtoSyncHeader;
type Extra = ();
type Completion = Justification;
fn estimate_size(source: &QueuedHeader<Self>) -> usize {
source.header().encode().len()
}
}
/// Queued substrate header ID.
pub type QueuedRialtoHeader = QueuedHeader<SubstrateHeadersSyncPipeline>;
/// Substrate client as headers source.
struct SubstrateHeadersSource {
/// Substrate node client.
client: SubstrateRpcClient,
client: SubstrateClient<Rialto>,
}
impl SubstrateHeadersSource {
fn new(client: SubstrateRpcClient) -> Self {
fn new(client: SubstrateClient<Rialto>) -> Self {
Self { client }
}
}
@@ -91,22 +109,30 @@ impl SubstrateHeadersSource {
impl SourceClient<SubstrateHeadersSyncPipeline> for SubstrateHeadersSource {
type Error = RpcError;
async fn best_block_number(&self) -> Result<Number, Self::Error> {
async fn best_block_number(&self) -> Result<rialto_runtime::BlockNumber, Self::Error> {
Ok(self.client.best_header().await?.number)
}
async fn header_by_hash(&self, hash: Hash) -> Result<Header, Self::Error> {
self.client.header_by_hash(hash).await.map(Into::into)
async fn header_by_hash(&self, hash: rialto_runtime::Hash) -> Result<RialtoSyncHeader, Self::Error> {
self.client
.header_by_hash(hash)
.await
.map(Into::into)
.map_err(Into::into)
}
async fn header_by_number(&self, number: Number) -> Result<Header, Self::Error> {
self.client.header_by_number(number).await.map(Into::into)
async fn header_by_number(&self, number: rialto_runtime::BlockNumber) -> Result<RialtoSyncHeader, Self::Error> {
self.client
.header_by_number(number)
.await
.map(Into::into)
.map_err(Into::into)
}
async fn header_completion(
&self,
id: SubstrateHeaderId,
) -> Result<(SubstrateHeaderId, Option<GrandpaJustification>), Self::Error> {
id: RialtoHeaderId,
) -> Result<(RialtoHeaderId, Option<Justification>), Self::Error> {
let hash = id.1;
let signed_block = self.client.get_block(Some(hash)).await?;
let grandpa_justification = signed_block.justification;
@@ -116,9 +142,9 @@ impl SourceClient<SubstrateHeadersSyncPipeline> for SubstrateHeadersSource {
async fn header_extra(
&self,
id: SubstrateHeaderId,
_header: QueuedSubstrateHeader,
) -> Result<(SubstrateHeaderId, ()), Self::Error> {
id: RialtoHeaderId,
_header: QueuedRialtoHeader,
) -> Result<(RialtoHeaderId, ()), Self::Error> {
Ok((id, ()))
}
}
@@ -147,38 +173,35 @@ impl EthereumHeadersTarget {
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
type Error = RpcError;
async fn best_header_id(&self) -> Result<SubstrateHeaderId, Self::Error> {
async fn best_header_id(&self) -> Result<RialtoHeaderId, Self::Error> {
self.client.best_substrate_block(self.contract).await
}
async fn is_known_header(&self, id: SubstrateHeaderId) -> Result<(SubstrateHeaderId, bool), Self::Error> {
async fn is_known_header(&self, id: RialtoHeaderId) -> Result<(RialtoHeaderId, bool), Self::Error> {
self.client.substrate_header_known(self.contract, id).await
}
async fn submit_headers(
&self,
headers: Vec<QueuedSubstrateHeader>,
) -> SubmittedHeaders<SubstrateHeaderId, Self::Error> {
async fn submit_headers(&self, headers: Vec<QueuedRialtoHeader>) -> SubmittedHeaders<RialtoHeaderId, Self::Error> {
self.client
.submit_substrate_headers(self.sign_params.clone(), self.contract, headers)
.await
}
async fn incomplete_headers_ids(&self) -> Result<HashSet<SubstrateHeaderId>, Self::Error> {
async fn incomplete_headers_ids(&self) -> Result<HashSet<RialtoHeaderId>, Self::Error> {
self.client.incomplete_substrate_headers(self.contract).await
}
async fn complete_header(
&self,
id: SubstrateHeaderId,
completion: GrandpaJustification,
) -> Result<SubstrateHeaderId, Self::Error> {
id: RialtoHeaderId,
completion: Justification,
) -> Result<RialtoHeaderId, Self::Error> {
self.client
.complete_substrate_header(self.sign_params.clone(), self.contract, id, completion)
.await
}
async fn requires_extra(&self, header: QueuedSubstrateHeader) -> Result<(SubstrateHeaderId, bool), Self::Error> {
async fn requires_extra(&self, header: QueuedRialtoHeader) -> Result<(RialtoHeaderId, bool), Self::Error> {
Ok((header.header().id(), false))
}
}
@@ -192,11 +215,10 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
eth_contract_address,
sync_params,
metrics_params,
instance,
} = params;
let eth_client = EthereumClient::new(eth_params);
let sub_client = async_std::task::block_on(async { SubstrateRpcClient::new(sub_params, instance).await })?;
let sub_client = async_std::task::block_on(async { SubstrateClient::<Rialto>::new(sub_params).await })?;
let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign);
let source = SubstrateHeadersSource::new(sub_client);