Relay subcommand that performs token RLT <> MLAU token swap (#1141)

* token swap relay

* token swap subcommand fixes

* fmt

* removed debug traces

* removed commented code
This commit is contained in:
Svyatoslav Nikolsky
2021-09-21 17:39:05 +03:00
committed by Bastian Köcher
parent 2db84b74cc
commit 5dbf6ba78c
16 changed files with 861 additions and 97 deletions
@@ -10,6 +10,7 @@ async-std = { version = "1.6.5", features = ["attributes"] }
async-trait = "0.1.40"
codec = { package = "parity-scale-codec", version = "2.2.0" }
jsonrpsee-proc-macros = "0.2"
jsonrpsee-types = "0.2"
jsonrpsee-ws-client = "0.2"
log = "0.4.11"
num-traits = "0.2"
@@ -37,6 +38,7 @@ sp-finality-grandpa = { git = "https://github.com/paritytech/substrate", branch
sp-rpc = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-version = { git = "https://github.com/paritytech/substrate", branch = "master" }
+7 -2
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bp_runtime::{Chain as ChainBase, TransactionEraOf};
use bp_runtime::{Chain as ChainBase, HashOf, TransactionEraOf};
use codec::{Codec, Encode};
use frame_support::weights::WeightToFeePolynomial;
use jsonrpsee_ws_client::{DeserializeOwned, Serialize};
@@ -25,6 +25,7 @@ use sp_runtime::{
traits::{Block as BlockT, Dispatchable, Member},
EncodedJustification,
};
use sp_transaction_pool::TransactionStatus;
use std::{fmt::Debug, time::Duration};
/// Substrate-based chain from minimal relay-client point of view.
@@ -50,8 +51,12 @@ pub trait Chain: ChainBase + Clone {
type WeightToFee: WeightToFeePolynomial<Balance = Self::Balance>;
}
/// Weight-to-Fee type used by the chain
/// Call type used by the chain.
pub type CallOf<C> = <C as Chain>::Call;
/// Weight-to-Fee type used by the chain.
pub type WeightToFeeOf<C> = <C as Chain>::WeightToFee;
/// Transaction status of the chain.
pub type TransactionStatusOf<C> = TransactionStatus<HashOf<C>, HashOf<C>>;
/// Substrate-based chain with `frame_system::Config::AccountData` set to
/// the `pallet_balances::AccountData<Balance>`.
+110 -45
View File
@@ -16,7 +16,7 @@
//! Substrate node client.
use crate::chain::{Chain, ChainWithBalances};
use crate::chain::{Chain, ChainWithBalances, TransactionStatusOf};
use crate::rpc::Substrate;
use crate::{ConnectionParams, Error, HeaderIdOf, Result};
@@ -31,7 +31,7 @@ use num_traits::{Bounded, Zero};
use pallet_balances::AccountData;
use pallet_transaction_payment::InclusionFee;
use relay_utils::{relay_loop::RECONNECT_DELAY, HeaderId};
use sp_core::{storage::StorageKey, Bytes};
use sp_core::{storage::StorageKey, Bytes, Hasher};
use sp_runtime::{
traits::Header as HeaderT,
transaction_validity::{TransactionSource, TransactionValidity},
@@ -45,7 +45,7 @@ const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_valida
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// Opaque justifications subscription type.
pub struct JustificationsSubscription(Mutex<futures::channel::mpsc::Receiver<Option<Bytes>>>);
pub struct Subscription<T>(Mutex<futures::channel::mpsc::Receiver<Option<T>>>);
/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
@@ -190,6 +190,14 @@ impl<C: Chain> Client<C> {
.await
}
/// Return number of the best finalized block.
pub async fn best_finalized_header_number(&self) -> Result<C::BlockNumber> {
Ok(*self
.header_by_hash(self.best_finalized_header_hash().await?)
.await?
.number())
}
/// Returns the best Substrate header.
pub async fn best_header(&self) -> Result<C::Header>
where
@@ -243,9 +251,13 @@ impl<C: Chain> Client<C> {
}
/// Read value from runtime storage.
pub async fn storage_value<T: Send + Decode + 'static>(&self, storage_key: StorageKey) -> Result<Option<T>> {
pub async fn storage_value<T: Send + Decode + 'static>(
&self,
storage_key: StorageKey,
block_hash: Option<C::Hash>,
) -> Result<Option<T>> {
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_get_storage(&*client, storage_key)
Substrate::<C>::state_get_storage(&*client, storage_key, block_hash)
.await?
.map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed))
.transpose()
@@ -260,7 +272,7 @@ impl<C: Chain> Client<C> {
{
self.jsonrpsee_execute(move |client| async move {
let storage_key = C::account_info_storage_key(&account);
let encoded_account_data = Substrate::<C>::state_get_storage(&*client, storage_key)
let encoded_account_data = Substrate::<C>::state_get_storage(&*client, storage_key, None)
.await?
.ok_or(Error::AccountDoesNotExist)?;
let decoded_account_data =
@@ -318,6 +330,44 @@ impl<C: Chain> Client<C> {
.await
}
/// Does exactly the same as `submit_signed_extrinsic`, but keeps watching for extrinsic status
/// after submission.
pub async fn submit_and_watch_signed_extrinsic(
&self,
extrinsic_signer: C::AccountId,
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Index) -> Bytes + Send + 'static,
) -> Result<Subscription<TransactionStatusOf<C>>> {
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
let best_header = self.best_header().await?;
let best_header_id = HeaderId(*best_header.number(), best_header.hash());
let subscription = self
.jsonrpsee_execute(move |client| async move {
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce);
let tx_hash = C::Hasher::hash(&extrinsic.0);
let subscription = client
.subscribe(
"author_submitAndWatchExtrinsic",
JsonRpcParams::Array(vec![
jsonrpsee_types::to_json_value(extrinsic).map_err(|e| Error::RpcError(e.into()))?
]),
"author_unwatchExtrinsic",
)
.await?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(subscription)
})
.await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"extrinsic".into(),
subscription,
sender,
));
Ok(Subscription(Mutex::new(receiver)))
}
/// Returns pending extrinsics from transaction pool.
pub async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
self.jsonrpsee_execute(
@@ -405,8 +455,8 @@ impl<C: Chain> Client<C> {
}
/// Return new justifications stream.
pub async fn subscribe_justifications(&self) -> Result<JustificationsSubscription> {
let mut subscription = self
pub async fn subscribe_justifications(&self) -> Result<Subscription<Bytes>> {
let subscription = self
.jsonrpsee_execute(move |client| async move {
Ok(client
.subscribe(
@@ -417,38 +467,14 @@ impl<C: Chain> Client<C> {
.await?)
})
.await?;
let (mut sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(async move {
loop {
match subscription.next().await {
Ok(Some(justification)) => {
if sender.send(Some(justification)).await.is_err() {
break;
}
}
Ok(None) => {
log::trace!(
target: "bridge",
"{} justifications subscription stream has returned None. Stream needs to be restarted.",
C::NAME,
);
let _ = sender.send(None).await;
break;
}
Err(e) => {
log::trace!(
target: "bridge",
"{} justifications subscription stream has returned '{:?}'. Stream needs to be restarted.",
C::NAME,
e,
);
let _ = sender.send(None).await;
break;
}
}
}
});
Ok(JustificationsSubscription(Mutex::new(receiver)))
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"justification".into(),
subscription,
sender,
));
Ok(Subscription(Mutex::new(receiver)))
}
/// Execute jsonrpsee future in tokio context.
@@ -465,11 +491,50 @@ impl<C: Chain> Client<C> {
}
}
impl JustificationsSubscription {
/// Return next justification from the subscription.
pub async fn next(&self) -> Result<Option<Bytes>> {
impl<T: DeserializeOwned> Subscription<T> {
/// Return next item from the subscription.
pub async fn next(&self) -> Result<Option<T>> {
let mut receiver = self.0.lock().await;
let justification = receiver.next().await;
Ok(justification.unwrap_or(None))
let item = receiver.next().await;
Ok(item.unwrap_or(None))
}
/// Background worker that is executed in tokio context as `jsonrpsee` requires.
async fn background_worker(
chain_name: String,
item_type: String,
mut subscription: jsonrpsee_types::Subscription<T>,
mut sender: futures::channel::mpsc::Sender<Option<T>>,
) {
loop {
match subscription.next().await {
Ok(Some(item)) => {
if sender.send(Some(item)).await.is_err() {
break;
}
}
Ok(None) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned None. Stream needs to be restarted.",
chain_name,
item_type,
);
let _ = sender.send(None).await;
break;
}
Err(e) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned '{:?}'. Stream needs to be restarted.",
chain_name,
item_type,
e,
);
let _ = sender.send(None).await;
break;
}
}
}
}
}
+5 -4
View File
@@ -32,14 +32,15 @@ pub mod metrics;
use std::time::Duration;
pub use crate::chain::{
BlockWithJustification, Chain, ChainWithBalances, TransactionSignScheme, UnsignedTransaction, WeightToFeeOf,
BlockWithJustification, CallOf, Chain, ChainWithBalances, TransactionSignScheme, TransactionStatusOf,
UnsignedTransaction, WeightToFeeOf,
};
pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet};
pub use crate::client::{Client, OpaqueGrandpaAuthoritiesSet, Subscription};
pub use crate::error::{Error, Result};
pub use crate::sync_header::SyncHeader;
pub use bp_runtime::{
AccountIdOf, BalanceOf, BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf, IndexOf, TransactionEra,
TransactionEraOf,
AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf, IndexOf, SignatureOf,
TransactionEra, TransactionEraOf,
};
/// Header id used by the chain.
@@ -79,7 +79,7 @@ where
async fn update(&self) {
let value = self
.client
.storage_value::<T>(self.storage_key.clone())
.storage_value::<T>(self.storage_key.clone(), None)
.await
.map(|maybe_storage_value| {
maybe_storage_value.or(self.maybe_default_value).map(|storage_value| {
+1 -1
View File
@@ -48,7 +48,7 @@ jsonrpsee_proc_macros::rpc_client_api! {
#[rpc(method = "state_call", positional_params)]
fn state_call(method: String, data: Bytes, at_block: Option<C::Hash>) -> Bytes;
#[rpc(method = "state_getStorage", positional_params)]
fn state_get_storage(key: StorageKey) -> Option<StorageData>;
fn state_get_storage(key: StorageKey, at_block: Option<C::Hash>) -> Option<StorageData>;
#[rpc(method = "state_getReadProof", positional_params)]
fn state_prove_storage(keys: Vec<StorageKey>, hash: Option<C::Hash>) -> ReadProof<C::Hash>;
#[rpc(method = "state_getRuntimeVersion", positional_params)]