Flatten back the structure (#837)

* Remove chains.

* Move relay clients.

* Flatten generic.

* Fix fmt.
This commit is contained in:
Tomasz Drwięga
2021-03-23 14:17:33 +01:00
committed by Bastian Köcher
parent 647eb80165
commit acb872fbb0
109 changed files with 87 additions and 86 deletions
@@ -0,0 +1,103 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bp_runtime::Chain as ChainBase;
use frame_support::Parameter;
use jsonrpsee_types::jsonrpc::{DeserializeOwned, Serialize};
use num_traits::{CheckedSub, Zero};
use sp_core::{storage::StorageKey, Pair};
use sp_runtime::{
generic::SignedBlock,
traits::{
AtLeast32Bit, Block as BlockT, Dispatchable, MaybeDisplay, MaybeSerialize, MaybeSerializeDeserialize, Member,
},
Justification,
};
use std::{fmt::Debug, time::Duration};
/// Substrate-based chain from minimal relay-client point of view.
pub trait Chain: ChainBase {
/// Chain name.
const NAME: &'static str;
/// Average block interval.
///
/// How often blocks are produced on that chain. It's suggested to set this value
/// to match the block time of the chain.
const AVERAGE_BLOCK_INTERVAL: Duration;
/// The user account identifier type for the runtime.
type AccountId: Parameter + Member + MaybeSerializeDeserialize + Debug + MaybeDisplay + Ord + Default;
/// Index of a transaction used by the chain.
type Index: Parameter
+ Member
+ MaybeSerialize
+ Debug
+ Default
+ MaybeDisplay
+ DeserializeOwned
+ AtLeast32Bit
+ Copy;
/// Block type.
type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification<Self::Header>;
/// The aggregated `Call` type.
type Call: Dispatchable + Debug;
}
/// Substrate-based chain with `frame_system::Config::AccountData` set to
/// the `pallet_balances::AccountData<NativeBalance>`.
pub trait ChainWithBalances: Chain {
/// Balance of an account in native tokens.
type NativeBalance: Parameter + Member + DeserializeOwned + Clone + Copy + CheckedSub + PartialOrd + Zero;
/// Return runtime storage key for getting `frame_system::AccountInfo` of given account.
fn account_info_storage_key(account_id: &Self::AccountId) -> StorageKey;
}
/// Block with justification.
pub trait BlockWithJustification<Header> {
/// Return block header.
fn header(&self) -> Header;
/// Return block justification, if known.
fn justification(&self) -> Option<&Justification>;
}
/// Substrate-based chain transactions signing scheme.
pub trait TransactionSignScheme {
/// Chain that this scheme is to be used.
type Chain: Chain;
/// Type of key pairs used to sign transactions.
type AccountKeyPair: Pair;
/// Signed transaction.
type SignedTransaction;
/// Create transaction for given runtime call, signed by given account.
fn sign_transaction(
genesis_hash: <Self::Chain as ChainBase>::Hash,
signer: &Self::AccountKeyPair,
signer_nonce: <Self::Chain as Chain>::Index,
call: <Self::Chain as Chain>::Call,
) -> Self::SignedTransaction;
}
impl<Block: BlockT> BlockWithJustification<Block::Header> for SignedBlock<Block> {
fn header(&self) -> Block::Header {
self.block.header().clone()
}
fn justification(&self) -> Option<&Justification> {
self.justification.as_ref()
}
}
@@ -0,0 +1,274 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node client.
use crate::chain::{Chain, ChainWithBalances};
use crate::rpc::{Substrate, SubstrateMessages};
use crate::{ConnectionParams, Error, Result};
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::InstanceId;
use codec::Decode;
use frame_system::AccountInfo;
use jsonrpsee_types::{jsonrpc::DeserializeOwned, traits::SubscriptionClient};
use jsonrpsee_ws_client::{WsClient as RpcClient, WsConfig as RpcConfig, WsSubscription as Subscription};
use num_traits::Zero;
use pallet_balances::AccountData;
use sp_core::Bytes;
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::ops::RangeInclusive;
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// Opaque justifications subscription type.
pub type JustificationsSubscription = Subscription<Bytes>;
/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
/// Substrate client type.
///
/// Cloning `Client` is a cheap operation.
pub struct Client<C: Chain> {
/// Client connection params.
params: ConnectionParams,
/// Substrate RPC client.
client: RpcClient,
/// Genesis block hash.
genesis_hash: C::Hash,
}
impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self {
Client {
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
}
}
}
impl<C: Chain> std::fmt::Debug for Client<C> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("Client")
.field("genesis_hash", &self.genesis_hash)
.finish()
}
}
impl<C: Chain> Client<C> {
/// Returns client that is able to call RPCs on Substrate node over websocket connection.
pub async fn new(params: ConnectionParams) -> Result<Self> {
let client = Self::build_client(params.clone()).await?;
let number: C::BlockNumber = Zero::zero();
let genesis_hash = Substrate::<C>::chain_get_block_hash(&client, number).await?;
Ok(Self {
params,
client,
genesis_hash,
})
}
/// Reopen client connection.
pub async fn reconnect(&mut self) -> Result<()> {
self.client = Self::build_client(self.params.clone()).await?;
Ok(())
}
/// Build client to use in connection.
async fn build_client(params: ConnectionParams) -> Result<RpcClient> {
let uri = format!(
"{}://{}:{}",
if params.secure { "wss" } else { "ws" },
params.host,
params.port,
);
let mut config = RpcConfig::with_url(&uri);
config.max_notifs_per_subscription = MAX_SUBSCRIPTION_CAPACITY;
let client = RpcClient::new(config).await?;
Ok(client)
}
}
impl<C: Chain> Client<C> {
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
let health = Substrate::<C>::system_health(&self.client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
} else {
Err(Error::ClientNotSynced(health))
}
}
/// Return hash of the genesis block.
pub fn genesis_hash(&self) -> &C::Hash {
&self.genesis_hash
}
/// Return hash of the best finalized block.
pub async fn best_finalized_header_hash(&self) -> Result<C::Hash> {
Ok(Substrate::<C>::chain_get_finalized_head(&self.client).await?)
}
/// Returns the best Substrate header.
pub async fn best_header(&self) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
Ok(Substrate::<C>::chain_get_header(&self.client, None).await?)
}
/// Get a Substrate block from its hash.
pub async fn get_block(&self, block_hash: Option<C::Hash>) -> Result<C::SignedBlock> {
Ok(Substrate::<C>::chain_get_block(&self.client, block_hash).await?)
}
/// Get a Substrate header by its hash.
pub async fn header_by_hash(&self, block_hash: C::Hash) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
Ok(Substrate::<C>::chain_get_header(&self.client, block_hash).await?)
}
/// Get a Substrate block hash by its number.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result<C::Hash> {
Ok(Substrate::<C>::chain_get_block_hash(&self.client, number).await?)
}
/// Get a Substrate header by its number.
pub async fn header_by_number(&self, block_number: C::BlockNumber) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
let block_hash = Self::block_hash_by_number(self, block_number).await?;
Ok(Self::header_by_hash(self, block_hash).await?)
}
/// Return runtime version.
pub async fn runtime_version(&self) -> Result<RuntimeVersion> {
Ok(Substrate::<C>::runtime_version(&self.client).await?)
}
/// Return native tokens balance of the account.
pub async fn free_native_balance(&self, account: C::AccountId) -> Result<C::NativeBalance>
where
C: ChainWithBalances,
{
let storage_key = C::account_info_storage_key(&account);
let encoded_account_data = Substrate::<C>::get_storage(&self.client, storage_key)
.await?
.ok_or(Error::AccountDoesNotExist)?;
let decoded_account_data =
AccountInfo::<C::Index, AccountData<C::NativeBalance>>::decode(&mut &encoded_account_data.0[..])
.map_err(Error::ResponseParseFailed)?;
Ok(decoded_account_data.data.free)
}
/// Get the nonce of the given Substrate account.
///
/// Note: It's the caller's responsibility to make sure `account` is a valid ss58 address.
pub async fn next_account_index(&self, account: C::AccountId) -> Result<C::Index> {
Ok(Substrate::<C>::system_account_next_index(&self.client, account).await?)
}
/// Submit an extrinsic for inclusion in a block.
///
/// Note: The given transaction does not need be SCALE encoded beforehand.
pub async fn submit_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
let tx_hash = Substrate::<C>::author_submit_extrinsic(&self.client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash);
Ok(tx_hash)
}
/// Get the GRANDPA authority set at given block.
pub async fn grandpa_authorities_set(&self, block: C::Hash) -> Result<OpaqueGrandpaAuthoritiesSet> {
let call = SUB_API_GRANDPA_AUTHORITIES.to_string();
let data = Bytes(Vec::new());
let encoded_response = Substrate::<C>::state_call(&self.client, call, data, Some(block)).await?;
let authority_list = encoded_response.0;
Ok(authority_list)
}
/// Execute runtime call at given block.
pub async fn state_call(&self, method: String, data: Bytes, at_block: Option<C::Hash>) -> Result<Bytes> {
Substrate::<C>::state_call(&self.client, method, data, at_block)
.await
.map_err(Into::into)
}
/// Returns proof-of-message(s) in given inclusive range.
pub async fn prove_messages(
&self,
instance: InstanceId,
lane: LaneId,
range: RangeInclusive<MessageNonce>,
include_outbound_lane_state: bool,
at_block: C::Hash,
) -> Result<StorageProof> {
let encoded_trie_nodes = SubstrateMessages::<C>::prove_messages(
&self.client,
instance,
lane,
*range.start(),
*range.end(),
include_outbound_lane_state,
Some(at_block),
)
.await
.map_err(Error::RpcError)?;
let decoded_trie_nodes: Vec<Vec<u8>> =
Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?;
Ok(StorageProof::new(decoded_trie_nodes))
}
/// Returns proof-of-message(s) delivery.
pub async fn prove_messages_delivery(
&self,
instance: InstanceId,
lane: LaneId,
at_block: C::Hash,
) -> Result<Vec<Vec<u8>>> {
let encoded_trie_nodes =
SubstrateMessages::<C>::prove_messages_delivery(&self.client, instance, lane, Some(at_block))
.await
.map_err(Error::RpcError)?;
let decoded_trie_nodes: Vec<Vec<u8>> =
Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?;
Ok(decoded_trie_nodes)
}
/// Return new justifications stream.
pub async fn subscribe_justifications(&self) -> Result<JustificationsSubscription> {
Ok(self
.client
.subscribe(
"grandpa_subscribeJustifications",
jsonrpsee_types::jsonrpc::Params::None,
"grandpa_unsubscribeJustifications",
)
.await?)
}
}
@@ -0,0 +1,81 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node RPC errors.
use jsonrpsee_types::error::Error as RpcError;
use relay_utils::MaybeConnectionError;
use sc_rpc_api::system::Health;
/// Result type used by Substrate client.
pub type Result<T> = std::result::Result<T, Error>;
/// Errors that can occur only when interacting with
/// a Substrate node through RPC.
#[derive(Debug)]
pub enum Error {
/// An error that can occur when making a request to
/// an JSON-RPC server.
RpcError(RpcError),
/// The response from the server could not be SCALE decoded.
ResponseParseFailed(codec::Error),
/// The Substrate bridge pallet has not yet been initialized.
UninitializedBridgePallet,
/// Account does not exist on the chain.
AccountDoesNotExist,
/// The client we're connected to is not synced, so we can't rely on its state.
ClientNotSynced(Health),
/// Custom logic error.
Custom(String),
}
impl From<RpcError> for Error {
fn from(error: RpcError) -> Self {
Error::RpcError(error)
}
}
impl MaybeConnectionError for Error {
fn is_connection_error(&self) -> bool {
matches!(
*self,
Error::RpcError(RpcError::TransportError(_))
// right now if connection to the ws server is dropped (after it is already established),
// we're getting this error
| Error::RpcError(RpcError::Internal(_))
| Error::ClientNotSynced(_),
)
}
}
impl From<Error> for String {
fn from(error: Error) -> String {
error.to_string()
}
}
impl ToString for Error {
fn to_string(&self) -> String {
match self {
Self::RpcError(e) => e.to_string(),
Self::ResponseParseFailed(e) => e.to_string(),
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
Self::AccountDoesNotExist => "Account does not exist on the chain".into(),
Self::ClientNotSynced(health) => format!("Substrate client is not synced: {}", health),
Self::Custom(e) => e.clone(),
}
}
}
@@ -0,0 +1,162 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Default generic implementation of finality source for basic Substrate client.
use crate::chain::{BlockWithJustification, Chain};
use crate::client::Client;
use crate::error::Error;
use crate::sync_header::SyncHeader;
use async_trait::async_trait;
use bp_header_chain::justification::decode_justification_target;
use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader};
use futures::stream::{unfold, Stream, StreamExt};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT;
use std::{marker::PhantomData, pin::Pin};
/// Wrapped raw Justification.
#[derive(Debug, Clone)]
pub struct Justification<Number> {
/// Header number decoded from the [`raw_justification`].
target_header_number: Number,
/// Raw, encoded justification bytes.
raw_justification: sp_runtime::Justification,
}
impl<Number> Justification<Number> {
/// Extract raw justification.
pub fn into_inner(self) -> sp_runtime::Justification {
self.raw_justification
}
}
impl<Number: relay_utils::BlockNumberBase> FinalityProof<Number> for Justification<Number> {
fn target_header_number(&self) -> Number {
self.target_header_number
}
}
/// Substrate node as finality source.
pub struct FinalitySource<C: Chain, P> {
client: Client<C>,
_phantom: PhantomData<P>,
}
impl<C: Chain, P> FinalitySource<C, P> {
/// Create new headers source using given client.
pub fn new(client: Client<C>) -> Self {
FinalitySource {
client,
_phantom: Default::default(),
}
}
}
impl<C: Chain, P> Clone for FinalitySource<C, P> {
fn clone(&self) -> Self {
FinalitySource {
client: self.client.clone(),
_phantom: Default::default(),
}
}
}
#[async_trait]
impl<C: Chain, P: FinalitySyncPipeline> RelayClient for FinalitySource<C, P> {
type Error = Error;
async fn reconnect(&mut self) -> Result<(), Error> {
self.client.reconnect().await
}
}
#[async_trait]
impl<C, P> SourceClient<P> for FinalitySource<C, P>
where
C: Chain,
C::BlockNumber: relay_utils::BlockNumberBase,
P: FinalitySyncPipeline<
Hash = C::Hash,
Number = C::BlockNumber,
Header = SyncHeader<C::Header>,
FinalityProof = Justification<C::BlockNumber>,
>,
P::Header: SourceHeader<C::BlockNumber>,
{
type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>> + Send>>;
async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because
// target node may be missing proofs that are already available at the source
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
Ok(*finalized_header.number())
}
async fn header_and_finality_proof(
&self,
number: P::Number,
) -> Result<(P::Header, Option<P::FinalityProof>), Error> {
let header_hash = self.client.block_hash_by_number(number).await?;
let signed_block = self.client.get_block(Some(header_hash)).await?;
Ok((
signed_block.header().into(),
signed_block
.justification()
.cloned()
.map(|raw_justification| Justification {
target_header_number: number,
raw_justification,
}),
))
}
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Error> {
Ok(unfold(
self.client.clone().subscribe_justifications().await?,
move |mut subscription| async move {
loop {
let next_justification = subscription.next().await?;
let decoded_target = decode_justification_target::<C::Header>(&next_justification.0);
let target_header_number = match decoded_target {
Ok((_, number)) => number,
Err(err) => {
log::error!(
target: "bridge",
"Failed to decode justification target from the {} justifications stream: {:?}",
P::SOURCE_NAME,
err,
);
continue;
}
};
return Some((
Justification {
target_header_number,
raw_justification: next_justification.0,
},
subscription,
));
}
},
)
.boxed())
}
}
@@ -0,0 +1,372 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Module provides a set of guard functions that are running in background threads
//! and are aborting process if some condition fails.
use crate::{Chain, ChainWithBalances, Client};
use async_trait::async_trait;
use num_traits::CheckedSub;
use sp_version::RuntimeVersion;
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
/// Guards environment.
#[async_trait]
pub trait Environment<C: ChainWithBalances>: Send + Sync + 'static {
/// Return current runtime version.
async fn runtime_version(&mut self) -> Result<RuntimeVersion, String>;
/// Return free native balance of the account on the chain.
async fn free_native_balance(&mut self, account: C::AccountId) -> Result<C::NativeBalance, String>;
/// Return current time.
fn now(&self) -> Instant {
Instant::now()
}
/// Sleep given amount of time.
async fn sleep(&mut self, duration: Duration) {
async_std::task::sleep(duration).await
}
/// Abort current process. Called when guard condition check fails.
async fn abort(&mut self) {
std::process::abort();
}
}
/// Abort when runtime spec version is different from specified.
pub fn abort_on_spec_version_change<C: ChainWithBalances>(mut env: impl Environment<C>, expected_spec_version: u32) {
async_std::task::spawn(async move {
loop {
let actual_spec_version = env.runtime_version().await;
match actual_spec_version {
Ok(version) if version.spec_version == expected_spec_version => (),
Ok(version) => {
log::error!(
target: "bridge-guard",
"{} runtime spec version has changed from {} to {}. Aborting relay",
C::NAME,
expected_spec_version,
version.spec_version,
);
env.abort().await;
}
Err(error) => log::warn!(
target: "bridge-guard",
"Failed to read {} runtime version: {:?}. Relay may need to be stopped manually",
C::NAME,
error,
),
}
env.sleep(conditions_check_delay::<C>()).await;
}
});
}
/// Abort if, during a 24 hours, free balance of given account is decreased at least by given value.
/// Other components may increase (or decrease) balance of account and it WILL affect logic of the guard.
pub fn abort_when_account_balance_decreased<C: ChainWithBalances>(
mut env: impl Environment<C>,
account_id: C::AccountId,
maximal_decrease: C::NativeBalance,
) {
const DAY: Duration = Duration::from_secs(60 * 60 * 24);
async_std::task::spawn(async move {
let mut balances = VecDeque::new();
loop {
let current_time = env.now();
// remember balances that are beyound 24h border
let time_border = current_time - DAY;
while balances.front().map(|(time, _)| *time < time_border).unwrap_or(false) {
balances.pop_front();
}
// read balance of the account
let current_balance = env.free_native_balance(account_id.clone()).await;
// remember balance and check difference
match current_balance {
Ok(current_balance) => {
// remember balance
balances.push_back((current_time, current_balance));
// check if difference between current and oldest balance is too large
let (oldest_time, oldest_balance) =
balances.front().expect("pushed to queue couple of lines above; qed");
let balances_difference = oldest_balance.checked_sub(&current_balance);
if balances_difference > Some(maximal_decrease) {
log::error!(
target: "bridge-guard",
"Balance of {} account {:?} has decreased from {:?} to {:?} in {} minutes. Aborting relay",
C::NAME,
account_id,
oldest_balance,
current_balance,
current_time.duration_since(*oldest_time).as_secs() / 60,
);
env.abort().await;
}
}
Err(error) => {
log::warn!(
target: "bridge-guard",
"Failed to read {} account {:?} balance: {:?}. Relay may need to be stopped manually",
C::NAME,
account_id,
error,
);
}
};
env.sleep(conditions_check_delay::<C>()).await;
}
});
}
/// Delay between conditions check.
fn conditions_check_delay<C: Chain>() -> Duration {
C::AVERAGE_BLOCK_INTERVAL * (10 + rand::random::<u32>() % 10)
}
#[async_trait]
impl<C: ChainWithBalances> Environment<C> for Client<C> {
async fn runtime_version(&mut self) -> Result<RuntimeVersion, String> {
Client::<C>::runtime_version(self).await.map_err(|e| e.to_string())
}
async fn free_native_balance(&mut self, account: C::AccountId) -> Result<C::NativeBalance, String> {
Client::<C>::free_native_balance(self, account)
.await
.map_err(|e| e.to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
future::FutureExt,
stream::StreamExt,
SinkExt,
};
struct TestChain;
impl bp_runtime::Chain for TestChain {
type BlockNumber = u32;
type Hash = sp_core::H256;
type Hasher = sp_runtime::traits::BlakeTwo256;
type Header = sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>;
}
impl Chain for TestChain {
const NAME: &'static str = "Test";
const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_millis(1);
type AccountId = u32;
type Index = u32;
type SignedBlock =
sp_runtime::generic::SignedBlock<sp_runtime::generic::Block<Self::Header, sp_runtime::OpaqueExtrinsic>>;
type Call = ();
}
impl ChainWithBalances for TestChain {
type NativeBalance = u32;
fn account_info_storage_key(_account_id: &u32) -> sp_core::storage::StorageKey {
unreachable!()
}
}
struct TestEnvironment {
runtime_version_rx: UnboundedReceiver<RuntimeVersion>,
free_native_balance_rx: UnboundedReceiver<u32>,
slept_tx: UnboundedSender<()>,
aborted_tx: UnboundedSender<()>,
}
#[async_trait]
impl Environment<TestChain> for TestEnvironment {
async fn runtime_version(&mut self) -> Result<RuntimeVersion, String> {
Ok(self.runtime_version_rx.next().await.unwrap_or_default())
}
async fn free_native_balance(&mut self, _account: u32) -> Result<u32, String> {
Ok(self.free_native_balance_rx.next().await.unwrap_or_default())
}
async fn sleep(&mut self, _duration: Duration) {
let _ = self.slept_tx.send(()).await;
}
async fn abort(&mut self) {
let _ = self.aborted_tx.send(()).await;
// simulate process abort :)
async_std::task::sleep(Duration::from_secs(60)).await;
}
}
#[test]
fn aborts_when_spec_version_is_changed() {
async_std::task::block_on(async {
let (
(mut runtime_version_tx, runtime_version_rx),
(_free_native_balance_tx, free_native_balance_rx),
(slept_tx, mut slept_rx),
(aborted_tx, mut aborted_rx),
) = (unbounded(), unbounded(), unbounded(), unbounded());
abort_on_spec_version_change(
TestEnvironment {
runtime_version_rx,
free_native_balance_rx,
slept_tx,
aborted_tx,
},
0,
);
// client responds with wrong version
runtime_version_tx
.send(RuntimeVersion {
spec_version: 42,
..Default::default()
})
.await
.unwrap();
// then the `abort` function is called
aborted_rx.next().await;
// and we do not reach the `sleep` function call
assert!(slept_rx.next().now_or_never().is_none());
});
}
#[test]
fn does_not_aborts_when_spec_version_is_unchanged() {
async_std::task::block_on(async {
let (
(mut runtime_version_tx, runtime_version_rx),
(_free_native_balance_tx, free_native_balance_rx),
(slept_tx, mut slept_rx),
(aborted_tx, mut aborted_rx),
) = (unbounded(), unbounded(), unbounded(), unbounded());
abort_on_spec_version_change(
TestEnvironment {
runtime_version_rx,
free_native_balance_rx,
slept_tx,
aborted_tx,
},
42,
);
// client responds with the same version
runtime_version_tx
.send(RuntimeVersion {
spec_version: 42,
..Default::default()
})
.await
.unwrap();
// then the `sleep` function is called
slept_rx.next().await;
// and the `abort` function is not called
assert!(aborted_rx.next().now_or_never().is_none());
});
}
#[test]
fn aborts_when_balance_is_too_low() {
async_std::task::block_on(async {
let (
(_runtime_version_tx, runtime_version_rx),
(mut free_native_balance_tx, free_native_balance_rx),
(slept_tx, mut slept_rx),
(aborted_tx, mut aborted_rx),
) = (unbounded(), unbounded(), unbounded(), unbounded());
abort_when_account_balance_decreased(
TestEnvironment {
runtime_version_rx,
free_native_balance_rx,
slept_tx,
aborted_tx,
},
0,
100,
);
// client responds with initial balance
free_native_balance_tx.send(1000).await.unwrap();
// then the guard sleeps
slept_rx.next().await;
// and then client responds with updated balance, which is too low
free_native_balance_tx.send(899).await.unwrap();
// then the `abort` function is called
aborted_rx.next().await;
// and we do not reach next `sleep` function call
assert!(slept_rx.next().now_or_never().is_none());
});
}
#[test]
fn does_not_aborts_when_balance_is_enough() {
async_std::task::block_on(async {
let (
(_runtime_version_tx, runtime_version_rx),
(mut free_native_balance_tx, free_native_balance_rx),
(slept_tx, mut slept_rx),
(aborted_tx, mut aborted_rx),
) = (unbounded(), unbounded(), unbounded(), unbounded());
abort_when_account_balance_decreased(
TestEnvironment {
runtime_version_rx,
free_native_balance_rx,
slept_tx,
aborted_tx,
},
0,
100,
);
// client responds with initial balance
free_native_balance_tx.send(1000).await.unwrap();
// then the guard sleeps
slept_rx.next().await;
// and then client responds with updated balance, which is enough
free_native_balance_tx.send(950).await.unwrap();
// then the `sleep` function is called
slept_rx.next().await;
// and `abort` is not called
assert!(aborted_rx.next().now_or_never().is_none());
});
}
}
@@ -0,0 +1,108 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Default generic implementation of headers source for basic Substrate client.
use crate::chain::{BlockWithJustification, Chain};
use crate::client::Client;
use crate::error::Error;
use async_trait::async_trait;
use headers_relay::{
sync_loop::SourceClient,
sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SourceHeader},
};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::{traits::Header as HeaderT, Justification};
use std::marker::PhantomData;
/// Substrate node as headers source.
pub struct HeadersSource<C: Chain, P> {
client: Client<C>,
_phantom: PhantomData<P>,
}
impl<C: Chain, P> HeadersSource<C, P> {
/// Create new headers source using given client.
pub fn new(client: Client<C>) -> Self {
HeadersSource {
client,
_phantom: Default::default(),
}
}
}
impl<C: Chain, P> Clone for HeadersSource<C, P> {
fn clone(&self) -> Self {
HeadersSource {
client: self.client.clone(),
_phantom: Default::default(),
}
}
}
#[async_trait]
impl<C: Chain, P: HeadersSyncPipeline> RelayClient for HeadersSource<C, P> {
type Error = Error;
async fn reconnect(&mut self) -> Result<(), Error> {
self.client.reconnect().await
}
}
#[async_trait]
impl<C, P> SourceClient<P> for HeadersSource<C, P>
where
C: Chain,
C::BlockNumber: relay_utils::BlockNumberBase,
C::Header: Into<P::Header>,
P: HeadersSyncPipeline<Extra = (), Completion = Justification, Hash = C::Hash, Number = C::BlockNumber>,
P::Header: SourceHeader<C::Hash, C::BlockNumber>,
{
async fn best_block_number(&self) -> Result<P::Number, Error> {
// we **CAN** continue to relay headers if source node is out of sync, because
// target node may be missing headers that are already available at the source
Ok(*self.client.best_header().await?.number())
}
async fn header_by_hash(&self, hash: P::Hash) -> Result<P::Header, Error> {
self.client
.header_by_hash(hash)
.await
.map(Into::into)
.map_err(Into::into)
}
async fn header_by_number(&self, number: P::Number) -> Result<P::Header, Error> {
self.client
.header_by_number(number)
.await
.map(Into::into)
.map_err(Into::into)
}
async fn header_completion(&self, id: HeaderIdOf<P>) -> Result<(HeaderIdOf<P>, Option<P::Completion>), Error> {
let hash = id.1;
let signed_block = self.client.get_block(Some(hash)).await?;
let grandpa_justification = signed_block.justification().cloned();
Ok((id, grandpa_justification))
}
async fn header_extra(&self, id: HeaderIdOf<P>, _header: QueuedHeader<P>) -> Result<(HeaderIdOf<P>, ()), Error> {
Ok((id, ()))
}
}
@@ -0,0 +1,59 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Tools to interact with (Open) Ethereum node using RPC methods.
#![warn(missing_docs)]
mod chain;
mod client;
mod error;
mod rpc;
mod sync_header;
pub mod finality_source;
pub mod guard;
pub mod headers_source;
pub use crate::chain::{BlockWithJustification, Chain, ChainWithBalances, TransactionSignScheme};
pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet};
pub use crate::error::{Error, Result};
pub use crate::sync_header::SyncHeader;
pub use bp_runtime::{BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf};
/// Header id used by the chain.
pub type HeaderIdOf<C> = relay_utils::HeaderId<HashOf<C>, BlockNumberOf<C>>;
/// Substrate-over-websocket connection params.
#[derive(Debug, Clone)]
pub struct ConnectionParams {
/// Websocket server hostname.
pub host: String,
/// Websocket server TCP port.
pub port: u16,
/// Use secure websocket connection.
pub secure: bool,
}
impl Default for ConnectionParams {
fn default() -> Self {
ConnectionParams {
host: "localhost".into(),
port: 9944,
secure: false,
}
}
}
@@ -0,0 +1,72 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! The most generic Substrate node RPC interface.
use crate::chain::Chain;
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::InstanceId;
use sc_rpc_api::system::Health;
use sp_core::{
storage::{StorageData, StorageKey},
Bytes,
};
use sp_version::RuntimeVersion;
jsonrpsee_proc_macros::rpc_client_api! {
pub(crate) Substrate<C: Chain> {
#[rpc(method = "system_health", positional_params)]
fn system_health() -> Health;
#[rpc(method = "chain_getHeader", positional_params)]
fn chain_get_header(block_hash: Option<C::Hash>) -> C::Header;
#[rpc(method = "chain_getFinalizedHead", positional_params)]
fn chain_get_finalized_head() -> C::Hash;
#[rpc(method = "chain_getBlock", positional_params)]
fn chain_get_block(block_hash: Option<C::Hash>) -> C::SignedBlock;
#[rpc(method = "chain_getBlockHash", positional_params)]
fn chain_get_block_hash(block_number: Option<C::BlockNumber>) -> C::Hash;
#[rpc(method = "system_accountNextIndex", positional_params)]
fn system_account_next_index(account_id: C::AccountId) -> C::Index;
#[rpc(method = "author_submitExtrinsic", positional_params)]
fn author_submit_extrinsic(extrinsic: Bytes) -> C::Hash;
#[rpc(method = "state_call", positional_params)]
fn state_call(method: String, data: Bytes, at_block: Option<C::Hash>) -> Bytes;
#[rpc(method = "state_getStorage", positional_params)]
fn get_storage(key: StorageKey) -> Option<StorageData>;
#[rpc(method = "state_getRuntimeVersion", positional_params)]
fn runtime_version() -> RuntimeVersion;
}
pub(crate) SubstrateMessages<C: Chain> {
#[rpc(method = "messages_proveMessages", positional_params)]
fn prove_messages(
instance: InstanceId,
lane: LaneId,
begin: MessageNonce,
end: MessageNonce,
include_outbound_lane_state: bool,
block: Option<C::Hash>,
) -> Bytes;
#[rpc(method = "messages_proveMessagesDelivery", positional_params)]
fn prove_messages_delivery(
instance: InstanceId,
lane: LaneId,
block: Option<C::Hash>,
) -> Bytes;
}
}
@@ -0,0 +1,73 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bp_header_chain::find_grandpa_authorities_scheduled_change;
use finality_relay::SourceHeader as FinalitySourceHeader;
use headers_relay::sync_types::SourceHeader;
use num_traits::{CheckedSub, One};
use relay_utils::HeaderId;
use sp_runtime::traits::Header as HeaderT;
/// Generic wrapper for `sp_runtime::traits::Header` based headers, that
/// implements `headers_relay::sync_types::SourceHeader` and may be used in headers sync directly.
#[derive(Clone, Debug, PartialEq)]
pub struct SyncHeader<Header>(Header);
impl<Header> SyncHeader<Header> {
/// Extracts wrapped header from self.
pub fn into_inner(self) -> Header {
self.0
}
}
impl<Header> std::ops::Deref for SyncHeader<Header> {
type Target = Header;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<Header> From<Header> for SyncHeader<Header> {
fn from(header: Header) -> Self {
Self(header)
}
}
impl<Header: HeaderT> SourceHeader<Header::Hash, Header::Number> for SyncHeader<Header> {
fn id(&self) -> HeaderId<Header::Hash, Header::Number> {
relay_utils::HeaderId(*self.0.number(), self.hash())
}
fn parent_id(&self) -> HeaderId<Header::Hash, Header::Number> {
relay_utils::HeaderId(
self.number()
.checked_sub(&One::one())
.expect("should never be called for genesis header"),
*self.parent_hash(),
)
}
}
impl<Header: HeaderT> FinalitySourceHeader<Header::Number> for SyncHeader<Header> {
fn number(&self) -> Header::Number {
*self.0.number()
}
fn is_mandatory(&self) -> bool {
find_grandpa_authorities_scheduled_change(&self.0).is_some()
}
}