diff --git a/Cargo.toml b/Cargo.toml index 9dbbefebc9..18e1ab2281 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,10 +13,11 @@ include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"] [dependencies] log = "0.4" thiserror = "1.0" -futures = "0.1" -jsonrpc-core-client = { version = "14.0", features = ["ws"] } +futures = "0.3" +jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", features = ["ws"] } num-traits = { version = "0.2", default-features = false } serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" url = "1.7" codec = { package = "parity-scale-codec", version = "1.1", default-features = false, features = ["derive", "full"] } @@ -26,14 +27,13 @@ sp-runtime = { git = "https://github.com/paritytech/substrate/", package = "sp-r sp-version = { git = "https://github.com/paritytech/substrate/", package = "sp-version" } pallet-indices = { git = "https://github.com/paritytech/substrate/", package = "pallet-indices" } hex = "0.4.0" -sc-rpc-api = { git = "https://github.com/paritytech/substrate/", package = "sc-rpc-api" } sp-rpc = { git = "https://github.com/paritytech/substrate/", package = "sp-rpc" } sp-core = { git = "https://github.com/paritytech/substrate/", package = "sp-core" } sp-transaction-pool = { git = "https://github.com/paritytech/substrate/", package = "sp-transaction-pool" } [dev-dependencies] +async-std = "1.2.0" env_logger = "0.7" -tokio = "0.1" wabt = "0.9" frame-system = { git = "https://github.com/paritytech/substrate/", package = "frame-system" } node-runtime = { git = "https://github.com/paritytech/substrate/", package = "node-runtime" } diff --git a/examples/submit_and_watch.rs b/examples/submit_and_watch.rs index 203fe1542d..019313e982 100644 --- a/examples/submit_and_watch.rs +++ b/examples/submit_and_watch.rs @@ -14,37 +14,41 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . -use futures::future::Future; use sp_keyring::AccountKeyring; use substrate_subxt::{ balances, system::System, DefaultNodeRuntime as Runtime, + ExtrinsicSuccess, }; type AccountId = ::AccountId; type Balance = ::Balance; fn main() { - env_logger::init(); - let signer = AccountKeyring::Alice.pair(); + let result: Result, Box> = + async_std::task::block_on(async move { + env_logger::init(); - let dest = AccountKeyring::Bob.to_account_id(); + let signer = AccountKeyring::Alice.pair(); - let fut = substrate_subxt::ClientBuilder::::new() - .build() - .and_then(|cli| cli.xt(signer, None)) - .and_then(move |xt| { - xt.watch() + let dest = AccountKeyring::Bob.to_account_id(); + + let cli = substrate_subxt::ClientBuilder::::new() + .build() + .await?; + let xt = cli.xt(signer, None).await?; + let xt_result = xt + .watch() .events_decoder(|decoder| { // for any primitive event with no type size registered decoder.register_type_size::<(u64, u64)>("IdentificationTuple") }) .submit(balances::transfer::(dest.clone().into(), 10_000)) + .await?; + Ok(xt_result) }); - - let mut rt = tokio::runtime::Runtime::new().unwrap(); - match rt.block_on(fut) { + match result { Ok(extrinsic_success) => { match extrinsic_success .find_event::<(AccountId, AccountId, Balance, Balance)>( @@ -57,6 +61,6 @@ fn main() { None => println!("Failed to find Contracts::CodeStored Event"), } } - Err(err) => println!("Error: {}", err), + Err(err) => println!("Error: {:?}", err), } } diff --git a/src/error.rs b/src/error.rs index 00a3c809c6..38e76c0d8f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . -use jsonrpc_core_client::RpcError; +use jsonrpsee::{ + client::RequestError, + ws::WsNewDnsError, +}; use sp_core::crypto::SecretStringError; use sp_runtime::transaction_validity::TransactionValidityError; @@ -34,7 +37,13 @@ pub enum Error { Codec(#[from] codec::Error), /// Rpc error. #[error("Rpc error: {0}")] - Rpc(RpcError), + Rpc(#[from] RequestError), + /// Error that can happen during the initial websocket handshake + #[error("Rpc error: {0}")] + WsHandshake(#[from] WsNewDnsError), + /// Serde serialization error + #[error("Serde json error: {0}")] + Serialization(#[from] serde_json::error::Error), /// Secret string error. #[error("Secret String Error")] SecretString(SecretStringError), @@ -52,12 +61,6 @@ pub enum Error { Other(String), } -impl From for Error { - fn from(error: RpcError) -> Self { - Error::Rpc(error) - } -} - impl From for Error { fn from(error: SecretStringError) -> Self { Error::SecretString(error) diff --git a/src/events.rs b/src/events.rs index 84a35b21d7..69a1d8aa7e 100644 --- a/src/events.rs +++ b/src/events.rs @@ -205,14 +205,14 @@ impl EventsDecoder { for _ in 0..len { // decode EventRecord let phase = Phase::decode(input)?; - let module_variant = input.read_byte()? as u8; + let module_variant = input.read_byte()?; let module = self.metadata.module_with_events(module_variant)?; let event = if module.name() == "System" { let system_event = SystemEvent::decode(input)?; RuntimeEvent::System(system_event) } else { - let event_variant = input.read_byte()? as u8; + let event_variant = input.read_byte()?; let event_metadata = module.event(event_variant)?; let mut event_data = Vec::::new(); diff --git a/src/frame/balances.rs b/src/frame/balances.rs index ade21eeee8..a37af971cb 100644 --- a/src/frame/balances.rs +++ b/src/frame/balances.rs @@ -16,7 +16,10 @@ //! Implements support for the pallet_balances module. -use std::fmt::Debug; +use std::{ + fmt::Debug, + pin::Pin, +}; use futures::future::{ self, @@ -74,17 +77,26 @@ pub trait BalancesStore { fn free_balance( &self, account_id: ::AccountId, - ) -> Box::Balance, Error = Error> + Send>; + ) -> Pin< + Box< + dyn Future::Balance, Error>> + + Send, + >, + >; } -impl BalancesStore for Client { +impl BalancesStore for Client { type Balances = T; fn free_balance( &self, account_id: ::AccountId, - ) -> Box::Balance, Error = Error> + Send> - { + ) -> Pin< + Box< + dyn Future::Balance, Error>> + + Send, + >, + > { let free_balance_map = || { Ok(self .metadata() @@ -96,9 +108,14 @@ impl BalancesStore for Client { }; let map = match free_balance_map() { Ok(map) => map, - Err(err) => return Box::new(future::err(err)), + Err(err) => return Box::pin(future::err(err)), }; - Box::new(self.fetch_or(map.key(account_id), None, map.default())) + let client = self.clone(); + Box::pin(async move { + client + .fetch_or(map.key(account_id), None, map.default()) + .await + }) } } diff --git a/src/frame/contracts.rs b/src/frame/contracts.rs index 55415bfe3f..99c3cd62bd 100644 --- a/src/frame/contracts.rs +++ b/src/frame/contracts.rs @@ -139,11 +139,7 @@ pub fn call( #[cfg(test)] mod tests { - use codec::{ - Codec, - Error as CodecError, - }; - use futures::Future; + use codec::Codec; use sp_core::Pair; use sp_keyring::AccountKeyring; use sp_runtime::traits::{ @@ -154,7 +150,7 @@ mod tests { use super::events; use crate::{ frame::contracts::MODULE, - tests::test_setup, + tests::test_client, Balances, Client, DefaultNodeRuntime as Runtime, @@ -164,17 +160,16 @@ mod tests { type AccountId = ::AccountId; - fn put_code( + async fn put_code( client: &Client, signer: P, - ) -> impl Future>, Error = Error> + ) -> Result where T: System + Balances + Send + Sync, T::Address: From, P: Pair, P::Signature: Codec, - S: 'static, - S: Verify + Codec + From, + S: Verify + Codec + From + 'static, S::Signer: From + IdentifyAccount, { const CONTRACT: &str = r#" @@ -185,70 +180,64 @@ mod tests { "#; let wasm = wabt::wat2wasm(CONTRACT).expect("invalid wabt"); - client.xt(signer, None).and_then(|xt| { - xt.watch() - .submit(super::put_code(500_000, wasm)) - .map(|result| result.find_event::(MODULE, events::CODE_STORED)) - }) + let xt = client.xt(signer, None).await?; + + let result = xt.watch().submit(super::put_code(500_000, wasm)).await?; + let code_hash = result + .find_event::(MODULE, events::CODE_STORED) + .ok_or(Error::Other("Failed to find CodeStored event".into()))??; + + Ok(code_hash) } #[test] #[ignore] // requires locally running substrate node fn tx_put_code() { - let (mut rt, client) = test_setup(); - - let signer = AccountKeyring::Alice.pair(); - let code_hash = rt.block_on(put_code(&client, signer)).unwrap(); + env_logger::try_init().ok(); + let code_hash: Result<_, Error> = async_std::task::block_on(async move { + let signer = AccountKeyring::Alice.pair(); + let client = test_client().await; + let code_hash = put_code(&client, signer).await?; + Ok(code_hash) + }); assert!( - code_hash.is_some(), - "Contracts CodeStored event should be present" - ); - assert!( - code_hash.unwrap().is_ok(), - "CodeStored Hash should decode successfully" + code_hash.is_ok(), + "Contracts CodeStored event should be received and decoded" ); } #[test] #[ignore] // requires locally running substrate node fn tx_instantiate() { - let (mut rt, client) = test_setup(); + env_logger::try_init().ok(); + let result: Result<_, Error> = async_std::task::block_on(async move { + let signer = AccountKeyring::Alice.pair(); + let client = test_client().await; - let signer = AccountKeyring::Alice.pair(); - let code_hash = rt - .block_on(put_code(&client, signer.clone())) - .unwrap() - .unwrap() - .unwrap(); + let code_hash = put_code(&client, signer.clone()).await?; - println!("{:?}", code_hash); + println!("{:?}", code_hash); - let instantiate = client.xt(signer, None).and_then(move |xt| { - xt.watch() + let xt = client.xt(signer, None).await?; + let result = xt + .watch() .submit(super::instantiate::( 100_000_000_000_000, 500_000, code_hash, Vec::new(), )) - .map(|result| { - result.find_event::<(AccountId, AccountId)>( - MODULE, - events::INSTANTIATED, - ) - }) + .await?; + let event = result + .find_event::<(AccountId, AccountId)>(MODULE, events::INSTANTIATED) + .ok_or(Error::Other("Failed to find Instantiated event".into()))??; + Ok(event) }); - let result = rt.block_on(instantiate).unwrap(); - assert!( - result.is_some(), - "Contracts Instantiated event should be present" - ); - assert!( - result.unwrap().is_ok(), - "Instantiated Event should decode successfully" + result.is_ok(), + "Contracts CodeStored event should be received and decoded" ); } } diff --git a/src/frame/system.rs b/src/frame/system.rs index ee803cdc04..7df1db866f 100644 --- a/src/frame/system.rs +++ b/src/frame/system.rs @@ -16,16 +16,13 @@ //! Implements support for the frame_system module. -use std::fmt::Debug; - use codec::Codec; +use frame_support::Parameter; use futures::future::{ self, Future, }; use serde::de::DeserializeOwned; - -use frame_support::Parameter; use sp_runtime::traits::{ Bounded, CheckEqual, @@ -39,6 +36,10 @@ use sp_runtime::traits::{ SimpleArithmetic, SimpleBitOps, }; +use std::{ + fmt::Debug, + pin::Pin, +}; use crate::{ error::Error, @@ -123,17 +124,22 @@ pub trait SystemStore { fn account_nonce( &self, account_id: ::AccountId, - ) -> Box::Index, Error = Error> + Send>; + ) -> Pin< + Box::Index, Error>> + Send>, + >; } -impl SystemStore for Client { +impl SystemStore + for Client +{ type System = T; fn account_nonce( &self, account_id: ::AccountId, - ) -> Box::Index, Error = Error> + Send> - { + ) -> Pin< + Box::Index, Error>> + Send>, + > { let account_nonce_map = || { Ok(self .metadata @@ -143,9 +149,14 @@ impl SystemStore for Client { }; let map = match account_nonce_map() { Ok(map) => map, - Err(err) => return Box::new(future::err(err)), + Err(err) => return Box::pin(future::err(err)), }; - Box::new(self.fetch_or(map.key(account_id), None, map.default())) + let client = self.clone(); + Box::pin(async move { + client + .fetch_or(map.key(account_id), None, map.default()) + .await + }) } } diff --git a/src/lib.rs b/src/lib.rs index fc3619213e..39582d4c61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,8 +17,27 @@ //! A library to **sub**mit e**xt**rinsics to a //! [substrate](https://github.com/paritytech/substrate) node via RPC. -#![deny(missing_docs)] -#![deny(warnings)] +#![deny( + bad_style, + const_err, + improper_ctypes, + missing_docs, + non_shorthand_field_patterns, + no_mangle_generic_items, + overflowing_literals, + path_statements, + patterns_in_fns_without_body, + plugin_as_library, + private_in_public, + unconditional_recursion, + unused_allocation, + unused_comparisons, + unused_parens, + while_true, + trivial_casts, + trivial_numeric_casts, + unused_extern_crates +)] #![allow(clippy::type_complexity)] use std::{ @@ -31,13 +50,8 @@ use codec::{ Decode, Encode, }; -use futures::future::{ - self, - Either, - Future, - IntoFuture, -}; -use jsonrpc_core_client::transports::ws; +use futures::future; +use jsonrpsee::client::Subscription; use sp_core::{ storage::{ StorageChangeSet, @@ -54,7 +68,6 @@ use sp_runtime::{ MultiSignature, }; use sp_version::RuntimeVersion; -use url::Url; mod error; mod events; @@ -93,20 +106,15 @@ use self::{ rpc::{ BlockNumber, ChainBlock, - MapStream, Rpc, }, }; -fn connect(url: &Url) -> impl Future, Error = Error> { - ws::connect(url).map_err(Into::into) -} - /// ClientBuilder for constructing a Client. #[derive(Default)] pub struct ClientBuilder { _marker: std::marker::PhantomData<(T, S)>, - url: Option, + url: Option, } impl ClientBuilder { @@ -119,35 +127,35 @@ impl ClientBuilder { } /// Set the substrate rpc address. - pub fn set_url(mut self, url: Url) -> Self { - self.url = Some(url); + pub fn set_url(mut self, url: &str) -> Self { + self.url = Some(url.to_string()); self } /// Creates a new Client. - pub fn build(self) -> impl Future, Error = Error> { - let url = self.url.unwrap_or_else(|| { - Url::parse("ws://127.0.0.1:9944").expect("Is valid url; qed") - }); - connect::(&url).and_then(|rpc| { - rpc.metadata() - .join3(rpc.genesis_hash(), rpc.runtime_version(None)) - .map(|(metadata, genesis_hash, runtime_version)| { - Client { - url, - genesis_hash, - metadata, - runtime_version, - _marker: PhantomData, - } - }) + pub async fn build(self) -> Result, Error> { + let url = self.url.unwrap_or("ws://127.0.0.1:9944".to_string()); + let rpc = Rpc::connect_ws(&url).await?; + + let (metadata, genesis_hash, runtime_version) = future::join3( + rpc.metadata(), + rpc.genesis_hash(), + rpc.runtime_version(None), + ) + .await; + Ok(Client { + rpc, + genesis_hash: genesis_hash?, + metadata: metadata?, + runtime_version: runtime_version?, + _marker: PhantomData, }) } } /// Client to interface with a substrate node. pub struct Client { - url: Url, + rpc: Rpc, genesis_hash: T::Hash, metadata: Metadata, runtime_version: RuntimeVersion, @@ -157,7 +165,7 @@ pub struct Client { impl Clone for Client { fn clone(&self) -> Self { Self { - url: self.url.clone(), + rpc: self.rpc.clone(), genesis_hash: self.genesis_hash, metadata: self.metadata.clone(), runtime_version: self.runtime_version.clone(), @@ -166,120 +174,136 @@ impl Clone for Client { } } -impl Client { - fn connect(&self) -> impl Future, Error = Error> { - connect(&self.url) - } - +impl Client { /// Returns the chain metadata. pub fn metadata(&self) -> &Metadata { &self.metadata } /// Fetch a StorageKey. - pub fn fetch( + pub async fn fetch( &self, key: StorageKey, hash: Option, - ) -> impl Future, Error = Error> { - self.connect().and_then(move |rpc| rpc.storage::(key, hash)) + ) -> Result, Error> { + self.rpc.storage::(key, hash).await } /// Fetch a StorageKey or return the default. - pub fn fetch_or( + pub async fn fetch_or( &self, key: StorageKey, hash: Option, default: V, - ) -> impl Future { - self.fetch(key, hash).map(|value| value.unwrap_or(default)) + ) -> Result { + let result = self.fetch(key, hash).await?; + Ok(result.unwrap_or(default)) } /// Fetch a StorageKey or return the default. - pub fn fetch_or_default( + pub async fn fetch_or_default( &self, key: StorageKey, hash: Option, - ) -> impl Future { - self.fetch(key, hash).map(|value| value.unwrap_or_default()) + ) -> Result { + let result = self.fetch(key, hash).await?; + Ok(result.unwrap_or_default()) } /// Get a block hash. By default returns the latest block hash - pub fn block_hash( + pub async fn block_hash( &self, - height: Option>, - ) -> impl Future, Error = Error> { - self.connect() - .and_then(|rpc| rpc.block_hash(height.map(|h| h))) + block_number: Option>, + ) -> Result, Error> { + let hash = self.rpc.block_hash(block_number).await?; + Ok(hash) } /// Get a block hash of the latest finalized block - pub fn finalized_head(&self) -> impl Future { - self.connect().and_then(|rpc| rpc.finalized_head()) + pub async fn finalized_head(&self) -> Result { + let head = self.rpc.finalized_head().await?; + Ok(head) } /// Get a block - pub fn block( - &self, - hash: Option, - ) -> impl Future>, Error = Error> + pub async fn block(&self, hash: Option) -> Result>, Error> where H: Into + 'static, { - self.connect() - .and_then(|rpc| rpc.block(hash.map(|h| h.into()))) + let block = self.rpc.block(hash.map(|h| h.into())).await?; + Ok(block) + } + + /// Create and submit an extrinsic and return corresponding Hash if successful + pub async fn submit_extrinsic( + &self, + extrinsic: E, + ) -> Result { + let xt_hash = self.rpc.submit_extrinsic(extrinsic).await?; + Ok(xt_hash) + } + + /// Create and submit an extrinsic and return corresponding Event if successful + pub async fn submit_and_watch_extrinsic( + self, + extrinsic: E, + decoder: EventsDecoder, + ) -> Result, Error> { + let success = self + .rpc + .submit_and_watch_extrinsic(extrinsic, decoder) + .await?; + Ok(success) } /// Subscribe to events. - pub fn subscribe_events( + pub async fn subscribe_events( &self, - ) -> impl Future>, Error = Error> { - self.connect().and_then(|rpc| rpc.subscribe_events()) + ) -> Result>, Error> { + let events = self.rpc.subscribe_events().await?; + Ok(events) } /// Subscribe to new blocks. - pub fn subscribe_blocks( - &self, - ) -> impl Future, Error = Error> { - self.connect().and_then(|rpc| rpc.subscribe_blocks()) + pub async fn subscribe_blocks(&self) -> Result, Error> { + let headers = self.rpc.subscribe_blocks().await?; + Ok(headers) } /// Subscribe to finalized blocks. - pub fn subscribe_finalized_blocks( + pub async fn subscribe_finalized_blocks( &self, - ) -> impl Future, Error = Error> { - self.connect() - .and_then(|rpc| rpc.subscribe_finalized_blocks()) + ) -> Result, Error> { + let headers = self.rpc.subscribe_finalized_blocks().await?; + Ok(headers) } /// Create a transaction builder for a private key. - pub fn xt

( + pub async fn xt

( &self, signer: P, nonce: Option, - ) -> impl Future, Error = Error> + ) -> Result, Error> where P: Pair, P::Signature: Codec, S: Verify, S::Signer: From + IdentifyAccount, { - let client = self.clone(); let account_id = S::Signer::from(signer.public()).into_account(); - match nonce { - Some(nonce) => Either::A(future::ok(nonce)), - None => Either::B(self.account_nonce(account_id)), - } - .map(|nonce| { - let genesis_hash = client.genesis_hash; - let runtime_version = client.runtime_version.clone(); - XtBuilder { - client, - nonce, - runtime_version, - genesis_hash, - signer, - } + let nonce = match nonce { + Some(nonce) => nonce, + None => self.account_nonce(account_id).await?, + }; + + let genesis_hash = self.genesis_hash; + let runtime_version = self.runtime_version.clone(); + Ok(XtBuilder { + client: self.clone(), + nonce, + runtime_version, + genesis_hash, + signer, }) } } @@ -294,7 +318,7 @@ pub struct XtBuilder { signer: P, } -impl XtBuilder +impl XtBuilder where P: Pair, { @@ -365,17 +389,10 @@ where } /// Submits a transaction to the chain. - pub fn submit( - &self, - call: Call, - ) -> impl Future { - let cli = self.client.connect(); - self.create_and_sign(call) - .into_future() - .map_err(Into::into) - .and_then(move |extrinsic| { - cli.and_then(move |rpc| rpc.submit_extrinsic(extrinsic)) - }) + pub async fn submit(&self, call: Call) -> Result { + let extrinsic = self.create_and_sign(call)?; + let xt_hash = self.client.submit_extrinsic(extrinsic).await?; + Ok(xt_hash) } /// Submits transaction to the chain and watch for events. @@ -422,24 +439,17 @@ where } /// Submits transaction to the chain and watch for events. - pub fn submit( + pub async fn submit( self, call: Call, - ) -> impl Future, Error = Error> { - let cli = self.client.connect(); - let decoder = self.decoder.into_future().map_err(Into::into); - - self.builder - .create_and_sign(call) - .into_future() - .map_err(Into::into) - .join(decoder) - .and_then(move |(extrinsic, decoder)| { - decoder.check_missing_type_sizes(); - cli.and_then(move |rpc| { - rpc.submit_and_watch_extrinsic(extrinsic, decoder) - }) - }) + ) -> Result, Error> { + let decoder = self.decoder?; + let extrinsic = self.builder.create_and_sign(call)?; + let xt_success = self + .client + .submit_and_watch_extrinsic(extrinsic, decoder) + .await?; + Ok(xt_success) } } @@ -458,7 +468,6 @@ impl codec::Encode for Encoded { mod tests { use codec::Encode; use frame_support::StorageMap; - use futures::stream::Stream; use sp_core::storage::StorageKey; use sp_keyring::AccountKeyring; @@ -469,6 +478,7 @@ mod tests { BalancesStore, }, DefaultNodeRuntime as Runtime, + Error, }; type Index = ::Index; @@ -476,85 +486,104 @@ mod tests { type Address = ::Address; type Balance = ::Balance; - pub(crate) fn test_setup() -> (tokio::runtime::Runtime, Client) { - env_logger::try_init().ok(); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - let client_future = ClientBuilder::::new().build(); - let client = rt.block_on(client_future).unwrap(); - (rt, client) + pub(crate) async fn test_client() -> Client { + ClientBuilder::::new() + .build() + .await + .expect("Error creating client") } #[test] #[ignore] // requires locally running substrate node fn test_tx_transfer_balance() { - let (mut rt, client) = test_setup(); + env_logger::try_init().ok(); + let transfer = async_std::task::block_on(async move { + let signer = AccountKeyring::Alice.pair(); + let dest = AccountKeyring::Bob.to_account_id(); - let signer = AccountKeyring::Alice.pair(); - let mut xt = rt.block_on(client.xt(signer, None)).unwrap(); + let client = test_client().await; + let mut xt = client.xt(signer, None).await?; + let _ = xt + .submit(balances::transfer::(dest.clone().into(), 10_000)) + .await?; - let dest = AccountKeyring::Bob.to_account_id(); - let transfer = - xt.submit(balances::transfer::(dest.clone().into(), 10_000)); - rt.block_on(transfer).unwrap(); + // check that nonce is handled correctly + xt.increment_nonce() + .submit(balances::transfer::(dest.clone().into(), 10_000)) + .await + }); - // check that nonce is handled correctly - let transfer = xt - .increment_nonce() - .submit(balances::transfer::(dest.clone().into(), 10_000)); - - rt.block_on(transfer).unwrap(); + assert!(transfer.is_ok()) } #[test] #[ignore] // requires locally running substrate node fn test_getting_hash() { - let (mut rt, client) = test_setup(); - rt.block_on(client.block_hash(None)).unwrap(); + let result: Result<_, Error> = async_std::task::block_on(async move { + let client = test_client().await; + let block_hash = client.block_hash(None).await?; + Ok(block_hash) + }); + + assert!(result.is_ok()) } #[test] #[ignore] // requires locally running substrate node fn test_getting_block() { - let (mut rt, client) = test_setup(); - rt.block_on(client.block_hash(None).and_then(move |h| client.block(h))) - .unwrap(); + let result: Result<_, Error> = async_std::task::block_on(async move { + let client = test_client().await; + let block_hash = client.block_hash(None).await?; + let block = client.block(block_hash).await?; + Ok(block) + }); + + assert!(result.is_ok()) } #[test] #[ignore] // requires locally running substrate node fn test_state_read_free_balance() { - let (mut rt, client) = test_setup(); + let result: Result<_, Error> = async_std::task::block_on(async move { + let account = AccountKeyring::Alice.to_account_id(); + let client = test_client().await; + let balance = client.free_balance(account.into()).await?; + Ok(balance) + }); - let account = AccountKeyring::Alice.to_account_id(); - rt.block_on(client.free_balance(account.into())).unwrap(); + assert!(result.is_ok()) } #[test] #[ignore] // requires locally running substrate node fn test_chain_subscribe_blocks() { - let (mut rt, client) = test_setup(); + let result: Result<_, Error> = async_std::task::block_on(async move { + let client = test_client().await; + let mut blocks = client.subscribe_blocks().await?; + let block = blocks.next().await; + Ok(block) + }); - let stream = rt.block_on(client.subscribe_blocks()).unwrap(); - let (_header, _) = rt - .block_on(stream.into_future().map_err(|(e, _)| e)) - .unwrap(); + assert!(result.is_ok()) } #[test] #[ignore] // requires locally running substrate node fn test_chain_subscribe_finalized_blocks() { - let (mut rt, client) = test_setup(); + let result: Result<_, Error> = async_std::task::block_on(async move { + let client = test_client().await; + let mut blocks = client.subscribe_finalized_blocks().await?; + let block = blocks.next().await; + Ok(block) + }); - let stream = rt.block_on(client.subscribe_finalized_blocks()).unwrap(); - let (_header, _) = rt - .block_on(stream.into_future().map_err(|(e, _)| e)) - .unwrap(); + assert!(result.is_ok()) } #[test] #[ignore] // requires locally running substrate node fn test_chain_read_metadata() { - let (_, client) = test_setup(); + let client = async_std::task::block_on(test_client()); let balances = client.metadata().module_with_calls("Balances").unwrap(); let dest = sp_keyring::AccountKeyring::Bob.to_account_id(); diff --git a/src/metadata.rs b/src/metadata.rs index d84978fc61..c451535d42 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -41,6 +41,8 @@ use crate::Encoded; #[derive(Debug, thiserror::Error)] pub enum MetadataError { + #[error("Error converting substrate metadata: {0}")] + Conversion(#[from] ConversionError), #[error("Module not found")] ModuleNotFound(String), #[error("Module with events not found")] @@ -284,14 +286,14 @@ pub enum EventArg { } impl FromStr for EventArg { - type Err = Error; + type Err = ConversionError; fn from_str(s: &str) -> Result { if s.starts_with("Vec<") { if s.ends_with('>') { Ok(EventArg::Vec(Box::new(s[4..s.len() - 1].parse()?))) } else { - Err(Error::InvalidEventArg( + Err(ConversionError::InvalidEventArg( s.to_string(), "Expected closing `>` for `Vec`", )) @@ -305,7 +307,7 @@ impl FromStr for EventArg { } Ok(EventArg::Tuple(args)) } else { - Err(Error::InvalidEventArg( + Err(ConversionError::InvalidEventArg( s.to_string(), "Expecting closing `)` for tuple", )) @@ -333,24 +335,28 @@ impl EventArg { } } -#[derive(Debug)] -pub enum Error { +#[derive(Debug, thiserror::Error)] +pub enum ConversionError { + #[error("Invalid prefix")] InvalidPrefix, + #[error("Invalid version")] InvalidVersion, + #[error("Expected DecodeDifferent::Decoded")] ExpectedDecoded, + #[error("Invalid event arg {0}")] InvalidEventArg(String, &'static str), } impl TryFrom for Metadata { - type Error = Error; + type Error = MetadataError; fn try_from(metadata: RuntimeMetadataPrefixed) -> Result { if metadata.0 != META_RESERVED { - return Err(Error::InvalidPrefix) + return Err(ConversionError::InvalidPrefix.into()) } let meta = match metadata.1 { RuntimeMetadata::V10(meta) => meta, - _ => return Err(Error::InvalidVersion), + _ => return Err(ConversionError::InvalidVersion.into()), }; let mut modules = HashMap::new(); let mut modules_with_calls = HashMap::new(); @@ -417,16 +423,18 @@ impl TryFrom for Metadata { } } -fn convert(dd: DecodeDifferent) -> Result { +fn convert( + dd: DecodeDifferent, +) -> Result { match dd { DecodeDifferent::Decoded(value) => Ok(value), - _ => Err(Error::ExpectedDecoded), + _ => Err(ConversionError::ExpectedDecoded), } } fn convert_event( event: frame_metadata::EventMetadata, -) -> Result { +) -> Result { let name = convert(event.name)?; let mut arguments = Vec::new(); for arg in convert(event.arguments)? { @@ -440,7 +448,7 @@ fn convert_entry( module_prefix: String, storage_prefix: String, entry: frame_metadata::StorageEntryMetadata, -) -> Result { +) -> Result { let default = convert(entry.default)?; Ok(StorageMetadata { module_prefix, diff --git a/src/rpc.rs b/src/rpc.rs index 069186dd5f..35cf258ad3 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -21,35 +21,26 @@ use codec::{ Encode, Error as CodecError, }; -use futures::{ - future::{ - self, - Future, - IntoFuture, - }, - stream::{ - self, - Stream, +use jsonrpsee::{ + client::Subscription, + core::common::{ + to_value as to_json_value, + Params, }, + Client, }; -use jsonrpc_core_client::{ - RpcChannel, - TypedSubscriptionStream, -}; + use num_traits::bounds::Bounded; use frame_metadata::RuntimeMetadataPrefixed; -use sc_rpc_api::{ - author::AuthorClient, - chain::ChainClient, - state::StateClient, -}; use sp_core::{ storage::{ StorageChangeSet, + StorageData, StorageKey, }, twox_128, + Bytes, }; use sp_rpc::{ list::ListOrValue, @@ -64,6 +55,7 @@ use sp_runtime::{ }; use sp_transaction_pool::TransactionStatus; use sp_version::RuntimeVersion; +use std::marker::PhantomData; use crate::{ error::Error, @@ -83,265 +75,251 @@ use crate::{ metadata::Metadata, }; -pub type ChainBlock = SignedBlock::Header, ::Extrinsic>>; +pub type ChainBlock = + SignedBlock::Header, ::Extrinsic>>; pub type BlockNumber = NumberOrHex<::BlockNumber>; /// Client for substrate rpc interfaces +#[derive(Clone)] pub struct Rpc { - state: StateClient, - chain: ChainClient>, - author: AuthorClient, + client: Client, + marker: std::marker::PhantomData, } -/// Allows connecting to all inner interfaces on the same RpcChannel -impl From for Rpc { - fn from(channel: RpcChannel) -> Self { - Self { - state: channel.clone().into(), - chain: channel.clone().into(), - author: channel.into(), - } +impl Rpc +where + T: System, +{ + pub async fn connect_ws(url: &str) -> Result { + let raw_client = jsonrpsee::ws::ws_raw_client(&url).await?; + Ok(Rpc { + client: raw_client.into(), + marker: PhantomData, + }) } -} -impl Rpc { /// Fetch a storage key - pub fn storage( + pub async fn storage( &self, key: StorageKey, hash: Option, - ) -> impl Future, Error = Error> { - self.state - .storage(key, hash) - .map_err(Into::into) - .and_then(|data| { - match data { - Some(data) => { - let value = Decode::decode(&mut &data.0[..])?; - Ok(Some(value)) - } - None => Ok(None), - } - }) + ) -> Result, Error> { + // todo: update jsonrpsee::rpc_api! macro to accept shared Client (currently only RawClient) + // until then we manually construct params here and in other methods + let params = Params::Array(vec![to_json_value(key)?, to_json_value(hash)?]); + let data: Option = + self.client.request("state_getStorage", params).await?; + match data { + Some(data) => { + let value = Decode::decode(&mut &data.0[..])?; + Ok(Some(value)) + } + None => Ok(None), + } } /// Fetch the genesis hash - pub fn genesis_hash(&self) -> impl Future { - let block_zero = T::BlockNumber::min_value(); - self.chain - .block_hash(Some(ListOrValue::Value(NumberOrHex::Number(block_zero)))) - .map_err(Into::into) - .and_then(|list_or_value| { - future::result(match list_or_value { - ListOrValue::Value(genesis_hash) => { - genesis_hash.ok_or_else(|| "Genesis hash not found".into()) - } - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - }) - }) + pub async fn genesis_hash(&self) -> Result { + let block_zero = Some(ListOrValue::Value(NumberOrHex::Number( + T::BlockNumber::min_value(), + ))); + let params = Params::Array(vec![to_json_value(block_zero)?]); + let list_or_value: ListOrValue> = + self.client.request("chain_getBlockHash", params).await?; + match list_or_value { + ListOrValue::Value(genesis_hash) => { + genesis_hash.ok_or_else(|| "Genesis hash not found".into()) + } + ListOrValue::List(_) => Err("Expected a Value, got a List".into()), + } } /// Fetch the metadata - pub fn metadata(&self) -> impl Future { - self.state - .metadata(None) - .map_err(Into::into) - .and_then(|bytes| { - let result = Decode::decode(&mut &bytes[..]) - .map_err(Into::into) - .and_then(|meta: RuntimeMetadataPrefixed| { - meta.try_into().map_err(|err| format!("{:?}", err).into()) - }); - future::result(result) - }) + pub async fn metadata(&self) -> Result { + let bytes: Bytes = self + .client + .request("state_getMetadata", Params::None) + .await?; + let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?; + let metadata: Metadata = meta.try_into()?; + Ok(metadata) } /// Get a block hash, returns hash of latest block by default - pub fn block_hash( + pub async fn block_hash( &self, block_number: Option>, - ) -> impl Future, Error = Error> { - self.chain - .block_hash(block_number.map(|bn| ListOrValue::Value(bn))) - .map_err(Into::into) - .and_then(|list_or_value| { - match list_or_value { - ListOrValue::Value(hash) => Ok(hash), - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - } - }) + ) -> Result, Error> { + let block_number = block_number.map(|bn| ListOrValue::Value(bn)); + let params = Params::Array(vec![to_json_value(block_number)?]); + let list_or_value = self.client.request("chain_getBlockHash", params).await?; + match list_or_value { + ListOrValue::Value(hash) => Ok(hash), + ListOrValue::List(_) => Err("Expected a Value, got a List".into()), + } } /// Get a block hash of the latest finalized block - pub fn finalized_head(&self) -> impl Future { - self.chain.finalized_head().map_err(Into::into) + pub async fn finalized_head(&self) -> Result { + let hash = self + .client + .request("chain_getFinalizedHead", Params::None) + .await?; + Ok(hash) } /// Get a Block - pub fn block( + pub async fn block( &self, hash: Option, - ) -> impl Future>, Error = Error> { - self.chain.block(hash).map_err(Into::into) + ) -> Result>, Error> { + let params = Params::Array(vec![to_json_value(hash)?]); + let block = self.client.request("chain_getBlock", params).await?; + Ok(block) } /// Fetch the runtime version - pub fn runtime_version( + pub async fn runtime_version( &self, at: Option, - ) -> impl Future { - self.state.runtime_version(at).map_err(Into::into) + ) -> Result { + let params = Params::Array(vec![to_json_value(at)?]); + let version = self + .client + .request("state_getRuntimeVersion", params) + .await?; + Ok(version) } } -type MapClosure = Box T + Send>; -pub type MapStream = stream::Map, MapClosure>; - impl Rpc { /// Subscribe to substrate System Events - pub fn subscribe_events( + pub async fn subscribe_events( &self, - ) -> impl Future::Hash>>, Error = Error> - { + ) -> Result::Hash>>, Error> { let mut storage_key = twox_128(b"System").to_vec(); storage_key.extend(twox_128(b"Events").to_vec()); log::debug!("Events storage key {:?}", hex::encode(&storage_key)); - let closure: MapClosure::Hash>> = - Box::new(|event| { - log::debug!( - "Event {:?}", - event - .changes - .iter() - .map(|(k, v)| { - (hex::encode(&k.0), v.as_ref().map(|v| hex::encode(&v.0))) - }) - .collect::>() - ); - event - }); - self.state - .subscribe_storage(Some(vec![StorageKey(storage_key)])) - .map(|stream: TypedSubscriptionStream<_>| stream.map(closure)) - .map_err(Into::into) + let keys = Some(vec![StorageKey(storage_key)]); + let params = Params::Array(vec![to_json_value(keys)?]); + + let subscription = self + .client + .subscribe("state_subscribeStorage", params, "state_unsubscribeStorage") + .await?; + Ok(subscription) } /// Subscribe to blocks. - pub fn subscribe_blocks( - &self, - ) -> impl Future, Error = Error> { - let closure: MapClosure = Box::new(|event| { - log::info!("New block {:?}", event); - event - }); - self.chain - .subscribe_new_heads() - .map(|stream| stream.map(closure)) - .map_err(Into::into) + pub async fn subscribe_blocks(&self) -> Result, Error> { + let subscription = self + .client + .subscribe( + "chain_subscribeNewHeads", + Params::None, + "chain_subscribeNewHeads", + ) + .await?; + + Ok(subscription) } /// Subscribe to finalized blocks. - pub fn subscribe_finalized_blocks( + pub async fn subscribe_finalized_blocks( &self, - ) -> impl Future, Error = Error> { - let closure: MapClosure = Box::new(|event| { - log::info!("Finalized block {:?}", event); - event - }); - self.chain - .subscribe_finalized_heads() - .map(|stream| stream.map(closure)) - .map_err(Into::into) + ) -> Result, Error> { + let subscription = self + .client + .subscribe( + "chain_subscribeFinalizedHeads", + Params::None, + "chain_subscribeFinalizedHeads", + ) + .await?; + Ok(subscription) } /// Create and submit an extrinsic and return corresponding Hash if successful - pub fn submit_extrinsic( - self, + pub async fn submit_extrinsic( + &self, extrinsic: E, - ) -> impl Future - where - E: Encode, - { - self.author - .submit_extrinsic(extrinsic.encode().into()) - .map_err(Into::into) + ) -> Result { + let bytes: Bytes = extrinsic.encode().into(); + let params = Params::Array(vec![to_json_value(bytes)?]); + let xt_hash = self + .client + .request("author_submitExtrinsic", params) + .await?; + Ok(xt_hash) + } + + pub async fn watch_extrinsic( + &self, + extrinsic: E, + ) -> Result>, Error> { + let bytes: Bytes = extrinsic.encode().into(); + let params = Params::Array(vec![to_json_value(bytes)?]); + let subscription = self + .client + .subscribe( + "author_submitAndWatchExtrinsic", + params, + "author_unwatchExtrinsic", + ) + .await?; + Ok(subscription) } /// Create and submit an extrinsic and return corresponding Event if successful - pub fn submit_and_watch_extrinsic( + pub async fn submit_and_watch_extrinsic( self, extrinsic: E, decoder: EventsDecoder, - ) -> impl Future, Error = Error> - where - E: Encode, - { - let events = self.subscribe_events().map_err(Into::into); - events.and_then(move |events| { - let ext_hash = T::Hashing::hash_of(&extrinsic); - log::info!("Submitting Extrinsic `{:?}`", ext_hash); + ) -> Result, Error> { + let ext_hash = T::Hashing::hash_of(&extrinsic); + log::info!("Submitting Extrinsic `{:?}`", ext_hash); - let chain = self.chain.clone(); - self.author - .watch_extrinsic(extrinsic.encode().into()) - .map_err(Into::into) - .and_then(|stream| { - stream - .filter_map(|status| { - log::info!("received status {:?}", status); - match status { - // ignore in progress extrinsic for now - TransactionStatus::Future - | TransactionStatus::Ready - | TransactionStatus::Broadcast(_) => None, - TransactionStatus::InBlock(block_hash) => { - Some(Ok(block_hash)) - } - TransactionStatus::Usurped(_) => { - Some(Err("Extrinsic Usurped".into())) - } - TransactionStatus::Dropped => { - Some(Err("Extrinsic Dropped".into())) - } - TransactionStatus::Invalid => { - Some(Err("Extrinsic Invalid".into())) - } - } - }) - .into_future() - .map_err(|(e, _)| e.into()) - .and_then(|(result, _)| { - log::info!("received result {:?}", result); + let events_sub = self.subscribe_events().await?; + let mut xt_sub = self.watch_extrinsic(extrinsic).await?; - result - .ok_or_else(|| Error::from("Stream terminated")) - .and_then(|r| r) - .into_future() - }) - }) - .and_then(move |bh| { - log::info!("Fetching block {:?}", bh); - chain - .block(Some(bh)) - .map(move |b| (bh, b)) - .map_err(Into::into) - }) - .and_then(|(h, b)| { - b.ok_or_else(|| format!("Failed to find block {:?}", h).into()) - .map(|b| (h, b)) - .into_future() - }) - .and_then(move |(bh, sb)| { - log::info!( - "Found block {:?}, with {} extrinsics", - bh, - sb.block.extrinsics.len() - ); - - wait_for_block_events(decoder, ext_hash, sb, bh, events) - }) - }) + while let status = xt_sub.next().await { + log::info!("received status {:?}", status); + match status { + // ignore in progress extrinsic for now + TransactionStatus::Future + | TransactionStatus::Ready + | TransactionStatus::Broadcast(_) => continue, + TransactionStatus::InBlock(block_hash) => { + log::info!("Fetching block {:?}", block_hash); + let block = self.block(Some(block_hash)).await?; + return match block { + Some(signed_block) => { + log::info!( + "Found block {:?}, with {} extrinsics", + block_hash, + signed_block.block.extrinsics.len() + ); + wait_for_block_events( + decoder, + ext_hash, + signed_block, + block_hash, + events_sub, + ) + .await + } + None => { + Err(format!("Failed to find block {:?}", block_hash).into()) + } + } + } + TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()), + TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()), + TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()), + } + } + unreachable!() } } @@ -399,13 +377,13 @@ impl ExtrinsicSuccess { } /// Waits for events for the block triggered by the extrinsic -pub fn wait_for_block_events( +pub async fn wait_for_block_events( decoder: EventsDecoder, ext_hash: T::Hash, signed_block: ChainBlock, block_hash: T::Hash, - events_stream: MapStream>, -) -> impl Future, Error = Error> { + events_subscription: Subscription>, +) -> Result, Error> { let ext_index = signed_block .block .extrinsics @@ -415,43 +393,41 @@ pub fn wait_for_block_events( hash == ext_hash }) .ok_or_else(|| { - format!("Failed to find Extrinsic with hash {:?}", ext_hash).into() - }) - .into_future(); + Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash)) + })?; - events_stream - .filter(move |event| event.block == block_hash) - .into_future() - .map_err(|(e, _)| e.into()) - .join(ext_index) - .and_then(move |((change_set, _), ext_index)| { - let events = match change_set { - None => Vec::new(), - Some(change_set) => { - let mut events = Vec::new(); - for (_key, data) in change_set.changes { - if let Some(data) = data { - match decoder.decode_events(&mut &data.0[..]) { - Ok(raw_events) => { - for (phase, event) in raw_events { - if let Phase::ApplyExtrinsic(i) = phase { - if i as usize == ext_index { - events.push(event) - } - } - } + let mut subscription = events_subscription; + while let change_set = subscription.next().await { + // only interested in events for the given block + if change_set.block != block_hash { + continue + } + let mut events = Vec::new(); + for (_key, data) in change_set.changes { + if let Some(data) = data { + match decoder.decode_events(&mut &data.0[..]) { + Ok(raw_events) => { + for (phase, event) in raw_events { + if let Phase::ApplyExtrinsic(i) = phase { + if i as usize == ext_index { + events.push(event) } - Err(err) => return future::err(err.into()), } } } - events + Err(err) => return Err(err.into()), } - }; - future::ok(ExtrinsicSuccess { + } + } + return if events.len() > 0 { + Ok(ExtrinsicSuccess { block: block_hash, extrinsic: ext_hash, events, }) - }) + } else { + Err(format!("No events found for block {}", block_hash).into()) + } + } + unreachable!() }