mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Convert to std futures (#58)
* WIP * Begin converting rpc layer to use std futures and jsonrpsee * Convert metadata to async/await * Convert block_hash to async/await * Convert more methods to async/await * Remove sp_rpc * Fix more compilation errors * Remove connect * Starting to convert subscription functions * Use jsonrpsee branch from PR for public client types * Implement subscribe events with jsonrpsee subscription * Converting subscriptions and wait_for_block_events * WIP converting lib methods to async * Use shared client reference directly for rpc call `rpc_api!` macro currently only supports RawClient (which cannot be shared). Also supports named params only which is not currently compatible with substrate rpd which accepts only positional params. * Use &self instead of &mut self for shared Client * Convert submit_and_watch to async/await * Convert more Client fns to async * Pin some trait futures * Add serde error * Fix client creation * Fix client request compiler errors * Unify metadata errors * Add WS handshake error variant * Fix some more compiler errors * Fix more compiler errors * Convert submit_extrinsic to async * Convert submit and submit_and_watch * Add Send + Sync constraints * Clone clients * Fix EventArg conversion error * Fix remaining warnings/errors * Replace deny warnings with specific lints * Infallable subscription loops * Use jsonrpsee wss branch * Fix example * Start to fix up tests * Make contracts tests compile * Make some more tests pass * Fix up remaining tests * Fmt * Use correct event storage key type * Fix finding events * Use master jsonrpsee
This commit is contained in:
+4
-4
@@ -13,10 +13,11 @@ include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"]
|
||||
[dependencies]
|
||||
log = "0.4"
|
||||
thiserror = "1.0"
|
||||
futures = "0.1"
|
||||
jsonrpc-core-client = { version = "14.0", features = ["ws"] }
|
||||
futures = "0.3"
|
||||
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", features = ["ws"] }
|
||||
num-traits = { version = "0.2", default-features = false }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
url = "1.7"
|
||||
codec = { package = "parity-scale-codec", version = "1.1", default-features = false, features = ["derive", "full"] }
|
||||
|
||||
@@ -26,14 +27,13 @@ sp-runtime = { git = "https://github.com/paritytech/substrate/", package = "sp-r
|
||||
sp-version = { git = "https://github.com/paritytech/substrate/", package = "sp-version" }
|
||||
pallet-indices = { git = "https://github.com/paritytech/substrate/", package = "pallet-indices" }
|
||||
hex = "0.4.0"
|
||||
sc-rpc-api = { git = "https://github.com/paritytech/substrate/", package = "sc-rpc-api" }
|
||||
sp-rpc = { git = "https://github.com/paritytech/substrate/", package = "sp-rpc" }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate/", package = "sp-core" }
|
||||
sp-transaction-pool = { git = "https://github.com/paritytech/substrate/", package = "sp-transaction-pool" }
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = "1.2.0"
|
||||
env_logger = "0.7"
|
||||
tokio = "0.1"
|
||||
wabt = "0.9"
|
||||
frame-system = { git = "https://github.com/paritytech/substrate/", package = "frame-system" }
|
||||
node-runtime = { git = "https://github.com/paritytech/substrate/", package = "node-runtime" }
|
||||
|
||||
@@ -14,37 +14,41 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::future::Future;
|
||||
use sp_keyring::AccountKeyring;
|
||||
use substrate_subxt::{
|
||||
balances,
|
||||
system::System,
|
||||
DefaultNodeRuntime as Runtime,
|
||||
ExtrinsicSuccess,
|
||||
};
|
||||
|
||||
type AccountId = <Runtime as System>::AccountId;
|
||||
type Balance = <Runtime as balances::Balances>::Balance;
|
||||
|
||||
fn main() {
|
||||
env_logger::init();
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let result: Result<ExtrinsicSuccess<_>, Box<dyn std::error::Error + 'static>> =
|
||||
async_std::task::block_on(async move {
|
||||
env_logger::init();
|
||||
|
||||
let dest = AccountKeyring::Bob.to_account_id();
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
|
||||
let fut = substrate_subxt::ClientBuilder::<Runtime>::new()
|
||||
.build()
|
||||
.and_then(|cli| cli.xt(signer, None))
|
||||
.and_then(move |xt| {
|
||||
xt.watch()
|
||||
let dest = AccountKeyring::Bob.to_account_id();
|
||||
|
||||
let cli = substrate_subxt::ClientBuilder::<Runtime>::new()
|
||||
.build()
|
||||
.await?;
|
||||
let xt = cli.xt(signer, None).await?;
|
||||
let xt_result = xt
|
||||
.watch()
|
||||
.events_decoder(|decoder| {
|
||||
// for any primitive event with no type size registered
|
||||
decoder.register_type_size::<(u64, u64)>("IdentificationTuple")
|
||||
})
|
||||
.submit(balances::transfer::<Runtime>(dest.clone().into(), 10_000))
|
||||
.await?;
|
||||
Ok(xt_result)
|
||||
});
|
||||
|
||||
let mut rt = tokio::runtime::Runtime::new().unwrap();
|
||||
match rt.block_on(fut) {
|
||||
match result {
|
||||
Ok(extrinsic_success) => {
|
||||
match extrinsic_success
|
||||
.find_event::<(AccountId, AccountId, Balance, Balance)>(
|
||||
@@ -57,6 +61,6 @@ fn main() {
|
||||
None => println!("Failed to find Contracts::CodeStored Event"),
|
||||
}
|
||||
}
|
||||
Err(err) => println!("Error: {}", err),
|
||||
Err(err) => println!("Error: {:?}", err),
|
||||
}
|
||||
}
|
||||
|
||||
+11
-8
@@ -14,7 +14,10 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use jsonrpc_core_client::RpcError;
|
||||
use jsonrpsee::{
|
||||
client::RequestError,
|
||||
ws::WsNewDnsError,
|
||||
};
|
||||
use sp_core::crypto::SecretStringError;
|
||||
use sp_runtime::transaction_validity::TransactionValidityError;
|
||||
|
||||
@@ -34,7 +37,13 @@ pub enum Error {
|
||||
Codec(#[from] codec::Error),
|
||||
/// Rpc error.
|
||||
#[error("Rpc error: {0}")]
|
||||
Rpc(RpcError),
|
||||
Rpc(#[from] RequestError),
|
||||
/// Error that can happen during the initial websocket handshake
|
||||
#[error("Rpc error: {0}")]
|
||||
WsHandshake(#[from] WsNewDnsError),
|
||||
/// Serde serialization error
|
||||
#[error("Serde json error: {0}")]
|
||||
Serialization(#[from] serde_json::error::Error),
|
||||
/// Secret string error.
|
||||
#[error("Secret String Error")]
|
||||
SecretString(SecretStringError),
|
||||
@@ -52,12 +61,6 @@ pub enum Error {
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl From<RpcError> for Error {
|
||||
fn from(error: RpcError) -> Self {
|
||||
Error::Rpc(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SecretStringError> for Error {
|
||||
fn from(error: SecretStringError) -> Self {
|
||||
Error::SecretString(error)
|
||||
|
||||
+2
-2
@@ -205,14 +205,14 @@ impl<T: System + Balances + 'static> EventsDecoder<T> {
|
||||
for _ in 0..len {
|
||||
// decode EventRecord
|
||||
let phase = Phase::decode(input)?;
|
||||
let module_variant = input.read_byte()? as u8;
|
||||
let module_variant = input.read_byte()?;
|
||||
|
||||
let module = self.metadata.module_with_events(module_variant)?;
|
||||
let event = if module.name() == "System" {
|
||||
let system_event = SystemEvent::decode(input)?;
|
||||
RuntimeEvent::System(system_event)
|
||||
} else {
|
||||
let event_variant = input.read_byte()? as u8;
|
||||
let event_variant = input.read_byte()?;
|
||||
let event_metadata = module.event(event_variant)?;
|
||||
|
||||
let mut event_data = Vec::<u8>::new();
|
||||
|
||||
+24
-7
@@ -16,7 +16,10 @@
|
||||
|
||||
//! Implements support for the pallet_balances module.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
use futures::future::{
|
||||
self,
|
||||
@@ -74,17 +77,26 @@ pub trait BalancesStore {
|
||||
fn free_balance(
|
||||
&self,
|
||||
account_id: <Self::Balances as System>::AccountId,
|
||||
) -> Box<dyn Future<Item = <Self::Balances as Balances>::Balance, Error = Error> + Send>;
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn Future<Output = Result<<Self::Balances as Balances>::Balance, Error>>
|
||||
+ Send,
|
||||
>,
|
||||
>;
|
||||
}
|
||||
|
||||
impl<T: Balances + 'static, S: 'static> BalancesStore for Client<T, S> {
|
||||
impl<T: Balances + Sync + Send + 'static, S: 'static> BalancesStore for Client<T, S> {
|
||||
type Balances = T;
|
||||
|
||||
fn free_balance(
|
||||
&self,
|
||||
account_id: <Self::Balances as System>::AccountId,
|
||||
) -> Box<dyn Future<Item = <Self::Balances as Balances>::Balance, Error = Error> + Send>
|
||||
{
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn Future<Output = Result<<Self::Balances as Balances>::Balance, Error>>
|
||||
+ Send,
|
||||
>,
|
||||
> {
|
||||
let free_balance_map = || {
|
||||
Ok(self
|
||||
.metadata()
|
||||
@@ -96,9 +108,14 @@ impl<T: Balances + 'static, S: 'static> BalancesStore for Client<T, S> {
|
||||
};
|
||||
let map = match free_balance_map() {
|
||||
Ok(map) => map,
|
||||
Err(err) => return Box::new(future::err(err)),
|
||||
Err(err) => return Box::pin(future::err(err)),
|
||||
};
|
||||
Box::new(self.fetch_or(map.key(account_id), None, map.default()))
|
||||
let client = self.clone();
|
||||
Box::pin(async move {
|
||||
client
|
||||
.fetch_or(map.key(account_id), None, map.default())
|
||||
.await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+38
-49
@@ -139,11 +139,7 @@ pub fn call<T: Contracts>(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use codec::{
|
||||
Codec,
|
||||
Error as CodecError,
|
||||
};
|
||||
use futures::Future;
|
||||
use codec::Codec;
|
||||
use sp_core::Pair;
|
||||
use sp_keyring::AccountKeyring;
|
||||
use sp_runtime::traits::{
|
||||
@@ -154,7 +150,7 @@ mod tests {
|
||||
use super::events;
|
||||
use crate::{
|
||||
frame::contracts::MODULE,
|
||||
tests::test_setup,
|
||||
tests::test_client,
|
||||
Balances,
|
||||
Client,
|
||||
DefaultNodeRuntime as Runtime,
|
||||
@@ -164,17 +160,16 @@ mod tests {
|
||||
|
||||
type AccountId = <Runtime as System>::AccountId;
|
||||
|
||||
fn put_code<T, P, S>(
|
||||
async fn put_code<T, P, S>(
|
||||
client: &Client<T, S>,
|
||||
signer: P,
|
||||
) -> impl Future<Item = Option<Result<T::Hash, CodecError>>, Error = Error>
|
||||
) -> Result<T::Hash, Error>
|
||||
where
|
||||
T: System + Balances + Send + Sync,
|
||||
T::Address: From<T::AccountId>,
|
||||
P: Pair,
|
||||
P::Signature: Codec,
|
||||
S: 'static,
|
||||
S: Verify + Codec + From<P::Signature>,
|
||||
S: Verify + Codec + From<P::Signature> + 'static,
|
||||
S::Signer: From<P::Public> + IdentifyAccount<AccountId = T::AccountId>,
|
||||
{
|
||||
const CONTRACT: &str = r#"
|
||||
@@ -185,70 +180,64 @@ mod tests {
|
||||
"#;
|
||||
let wasm = wabt::wat2wasm(CONTRACT).expect("invalid wabt");
|
||||
|
||||
client.xt(signer, None).and_then(|xt| {
|
||||
xt.watch()
|
||||
.submit(super::put_code(500_000, wasm))
|
||||
.map(|result| result.find_event::<T::Hash>(MODULE, events::CODE_STORED))
|
||||
})
|
||||
let xt = client.xt(signer, None).await?;
|
||||
|
||||
let result = xt.watch().submit(super::put_code(500_000, wasm)).await?;
|
||||
let code_hash = result
|
||||
.find_event::<T::Hash>(MODULE, events::CODE_STORED)
|
||||
.ok_or(Error::Other("Failed to find CodeStored event".into()))??;
|
||||
|
||||
Ok(code_hash)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn tx_put_code() {
|
||||
let (mut rt, client) = test_setup();
|
||||
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let code_hash = rt.block_on(put_code(&client, signer)).unwrap();
|
||||
env_logger::try_init().ok();
|
||||
let code_hash: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let client = test_client().await;
|
||||
let code_hash = put_code(&client, signer).await?;
|
||||
Ok(code_hash)
|
||||
});
|
||||
|
||||
assert!(
|
||||
code_hash.is_some(),
|
||||
"Contracts CodeStored event should be present"
|
||||
);
|
||||
assert!(
|
||||
code_hash.unwrap().is_ok(),
|
||||
"CodeStored Hash should decode successfully"
|
||||
code_hash.is_ok(),
|
||||
"Contracts CodeStored event should be received and decoded"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn tx_instantiate() {
|
||||
let (mut rt, client) = test_setup();
|
||||
env_logger::try_init().ok();
|
||||
let result: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let client = test_client().await;
|
||||
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let code_hash = rt
|
||||
.block_on(put_code(&client, signer.clone()))
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let code_hash = put_code(&client, signer.clone()).await?;
|
||||
|
||||
println!("{:?}", code_hash);
|
||||
println!("{:?}", code_hash);
|
||||
|
||||
let instantiate = client.xt(signer, None).and_then(move |xt| {
|
||||
xt.watch()
|
||||
let xt = client.xt(signer, None).await?;
|
||||
let result = xt
|
||||
.watch()
|
||||
.submit(super::instantiate::<Runtime>(
|
||||
100_000_000_000_000,
|
||||
500_000,
|
||||
code_hash,
|
||||
Vec::new(),
|
||||
))
|
||||
.map(|result| {
|
||||
result.find_event::<(AccountId, AccountId)>(
|
||||
MODULE,
|
||||
events::INSTANTIATED,
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
let event = result
|
||||
.find_event::<(AccountId, AccountId)>(MODULE, events::INSTANTIATED)
|
||||
.ok_or(Error::Other("Failed to find Instantiated event".into()))??;
|
||||
Ok(event)
|
||||
});
|
||||
|
||||
let result = rt.block_on(instantiate).unwrap();
|
||||
|
||||
assert!(
|
||||
result.is_some(),
|
||||
"Contracts Instantiated event should be present"
|
||||
);
|
||||
assert!(
|
||||
result.unwrap().is_ok(),
|
||||
"Instantiated Event should decode successfully"
|
||||
result.is_ok(),
|
||||
"Contracts CodeStored event should be received and decoded"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+21
-10
@@ -16,16 +16,13 @@
|
||||
|
||||
//! Implements support for the frame_system module.
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
use codec::Codec;
|
||||
use frame_support::Parameter;
|
||||
use futures::future::{
|
||||
self,
|
||||
Future,
|
||||
};
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use frame_support::Parameter;
|
||||
use sp_runtime::traits::{
|
||||
Bounded,
|
||||
CheckEqual,
|
||||
@@ -39,6 +36,10 @@ use sp_runtime::traits::{
|
||||
SimpleArithmetic,
|
||||
SimpleBitOps,
|
||||
};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
error::Error,
|
||||
@@ -123,17 +124,22 @@ pub trait SystemStore {
|
||||
fn account_nonce(
|
||||
&self,
|
||||
account_id: <Self::System as System>::AccountId,
|
||||
) -> Box<dyn Future<Item = <Self::System as System>::Index, Error = Error> + Send>;
|
||||
) -> Pin<
|
||||
Box<dyn Future<Output = Result<<Self::System as System>::Index, Error>> + Send>,
|
||||
>;
|
||||
}
|
||||
|
||||
impl<T: System + Balances + 'static, S: 'static> SystemStore for Client<T, S> {
|
||||
impl<T: System + Balances + Sync + Send + 'static, S: 'static> SystemStore
|
||||
for Client<T, S>
|
||||
{
|
||||
type System = T;
|
||||
|
||||
fn account_nonce(
|
||||
&self,
|
||||
account_id: <Self::System as System>::AccountId,
|
||||
) -> Box<dyn Future<Item = <Self::System as System>::Index, Error = Error> + Send>
|
||||
{
|
||||
) -> Pin<
|
||||
Box<dyn Future<Output = Result<<Self::System as System>::Index, Error>> + Send>,
|
||||
> {
|
||||
let account_nonce_map = || {
|
||||
Ok(self
|
||||
.metadata
|
||||
@@ -143,9 +149,14 @@ impl<T: System + Balances + 'static, S: 'static> SystemStore for Client<T, S> {
|
||||
};
|
||||
let map = match account_nonce_map() {
|
||||
Ok(map) => map,
|
||||
Err(err) => return Box::new(future::err(err)),
|
||||
Err(err) => return Box::pin(future::err(err)),
|
||||
};
|
||||
Box::new(self.fetch_or(map.key(account_id), None, map.default()))
|
||||
let client = self.clone();
|
||||
Box::pin(async move {
|
||||
client
|
||||
.fetch_or(map.key(account_id), None, map.default())
|
||||
.await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+187
-158
@@ -17,8 +17,27 @@
|
||||
//! A library to **sub**mit e**xt**rinsics to a
|
||||
//! [substrate](https://github.com/paritytech/substrate) node via RPC.
|
||||
|
||||
#![deny(missing_docs)]
|
||||
#![deny(warnings)]
|
||||
#![deny(
|
||||
bad_style,
|
||||
const_err,
|
||||
improper_ctypes,
|
||||
missing_docs,
|
||||
non_shorthand_field_patterns,
|
||||
no_mangle_generic_items,
|
||||
overflowing_literals,
|
||||
path_statements,
|
||||
patterns_in_fns_without_body,
|
||||
plugin_as_library,
|
||||
private_in_public,
|
||||
unconditional_recursion,
|
||||
unused_allocation,
|
||||
unused_comparisons,
|
||||
unused_parens,
|
||||
while_true,
|
||||
trivial_casts,
|
||||
trivial_numeric_casts,
|
||||
unused_extern_crates
|
||||
)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
|
||||
use std::{
|
||||
@@ -31,13 +50,8 @@ use codec::{
|
||||
Decode,
|
||||
Encode,
|
||||
};
|
||||
use futures::future::{
|
||||
self,
|
||||
Either,
|
||||
Future,
|
||||
IntoFuture,
|
||||
};
|
||||
use jsonrpc_core_client::transports::ws;
|
||||
use futures::future;
|
||||
use jsonrpsee::client::Subscription;
|
||||
use sp_core::{
|
||||
storage::{
|
||||
StorageChangeSet,
|
||||
@@ -54,7 +68,6 @@ use sp_runtime::{
|
||||
MultiSignature,
|
||||
};
|
||||
use sp_version::RuntimeVersion;
|
||||
use url::Url;
|
||||
|
||||
mod error;
|
||||
mod events;
|
||||
@@ -93,20 +106,15 @@ use self::{
|
||||
rpc::{
|
||||
BlockNumber,
|
||||
ChainBlock,
|
||||
MapStream,
|
||||
Rpc,
|
||||
},
|
||||
};
|
||||
|
||||
fn connect<T: System>(url: &Url) -> impl Future<Item = Rpc<T>, Error = Error> {
|
||||
ws::connect(url).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// ClientBuilder for constructing a Client.
|
||||
#[derive(Default)]
|
||||
pub struct ClientBuilder<T: System, S = MultiSignature> {
|
||||
_marker: std::marker::PhantomData<(T, S)>,
|
||||
url: Option<Url>,
|
||||
url: Option<String>,
|
||||
}
|
||||
|
||||
impl<T: System, S> ClientBuilder<T, S> {
|
||||
@@ -119,35 +127,35 @@ impl<T: System, S> ClientBuilder<T, S> {
|
||||
}
|
||||
|
||||
/// Set the substrate rpc address.
|
||||
pub fn set_url(mut self, url: Url) -> Self {
|
||||
self.url = Some(url);
|
||||
pub fn set_url(mut self, url: &str) -> Self {
|
||||
self.url = Some(url.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
/// Creates a new Client.
|
||||
pub fn build(self) -> impl Future<Item = Client<T, S>, Error = Error> {
|
||||
let url = self.url.unwrap_or_else(|| {
|
||||
Url::parse("ws://127.0.0.1:9944").expect("Is valid url; qed")
|
||||
});
|
||||
connect::<T>(&url).and_then(|rpc| {
|
||||
rpc.metadata()
|
||||
.join3(rpc.genesis_hash(), rpc.runtime_version(None))
|
||||
.map(|(metadata, genesis_hash, runtime_version)| {
|
||||
Client {
|
||||
url,
|
||||
genesis_hash,
|
||||
metadata,
|
||||
runtime_version,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
})
|
||||
pub async fn build(self) -> Result<Client<T, S>, Error> {
|
||||
let url = self.url.unwrap_or("ws://127.0.0.1:9944".to_string());
|
||||
let rpc = Rpc::connect_ws(&url).await?;
|
||||
|
||||
let (metadata, genesis_hash, runtime_version) = future::join3(
|
||||
rpc.metadata(),
|
||||
rpc.genesis_hash(),
|
||||
rpc.runtime_version(None),
|
||||
)
|
||||
.await;
|
||||
Ok(Client {
|
||||
rpc,
|
||||
genesis_hash: genesis_hash?,
|
||||
metadata: metadata?,
|
||||
runtime_version: runtime_version?,
|
||||
_marker: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Client to interface with a substrate node.
|
||||
pub struct Client<T: System, S = MultiSignature> {
|
||||
url: Url,
|
||||
rpc: Rpc<T>,
|
||||
genesis_hash: T::Hash,
|
||||
metadata: Metadata,
|
||||
runtime_version: RuntimeVersion,
|
||||
@@ -157,7 +165,7 @@ pub struct Client<T: System, S = MultiSignature> {
|
||||
impl<T: System, S> Clone for Client<T, S> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
url: self.url.clone(),
|
||||
rpc: self.rpc.clone(),
|
||||
genesis_hash: self.genesis_hash,
|
||||
metadata: self.metadata.clone(),
|
||||
runtime_version: self.runtime_version.clone(),
|
||||
@@ -166,120 +174,136 @@ impl<T: System, S> Clone for Client<T, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: System + Balances + 'static, S: 'static> Client<T, S> {
|
||||
fn connect(&self) -> impl Future<Item = Rpc<T>, Error = Error> {
|
||||
connect(&self.url)
|
||||
}
|
||||
|
||||
impl<T: System + Balances + Sync + Send + 'static, S: 'static> Client<T, S> {
|
||||
/// Returns the chain metadata.
|
||||
pub fn metadata(&self) -> &Metadata {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
/// Fetch a StorageKey.
|
||||
pub fn fetch<V: Decode>(
|
||||
pub async fn fetch<V: Decode>(
|
||||
&self,
|
||||
key: StorageKey,
|
||||
hash: Option<T::Hash>,
|
||||
) -> impl Future<Item = Option<V>, Error = Error> {
|
||||
self.connect().and_then(move |rpc| rpc.storage::<V>(key, hash))
|
||||
) -> Result<Option<V>, Error> {
|
||||
self.rpc.storage::<V>(key, hash).await
|
||||
}
|
||||
|
||||
/// Fetch a StorageKey or return the default.
|
||||
pub fn fetch_or<V: Decode>(
|
||||
pub async fn fetch_or<V: Decode>(
|
||||
&self,
|
||||
key: StorageKey,
|
||||
hash: Option<T::Hash>,
|
||||
default: V,
|
||||
) -> impl Future<Item = V, Error = Error> {
|
||||
self.fetch(key, hash).map(|value| value.unwrap_or(default))
|
||||
) -> Result<V, Error> {
|
||||
let result = self.fetch(key, hash).await?;
|
||||
Ok(result.unwrap_or(default))
|
||||
}
|
||||
|
||||
/// Fetch a StorageKey or return the default.
|
||||
pub fn fetch_or_default<V: Decode + Default>(
|
||||
pub async fn fetch_or_default<V: Decode + Default>(
|
||||
&self,
|
||||
key: StorageKey,
|
||||
hash: Option<T::Hash>,
|
||||
) -> impl Future<Item = V, Error = Error> {
|
||||
self.fetch(key, hash).map(|value| value.unwrap_or_default())
|
||||
) -> Result<V, Error> {
|
||||
let result = self.fetch(key, hash).await?;
|
||||
Ok(result.unwrap_or_default())
|
||||
}
|
||||
|
||||
/// Get a block hash. By default returns the latest block hash
|
||||
pub fn block_hash(
|
||||
pub async fn block_hash(
|
||||
&self,
|
||||
height: Option<BlockNumber<T>>,
|
||||
) -> impl Future<Item = Option<T::Hash>, Error = Error> {
|
||||
self.connect()
|
||||
.and_then(|rpc| rpc.block_hash(height.map(|h| h)))
|
||||
block_number: Option<BlockNumber<T>>,
|
||||
) -> Result<Option<T::Hash>, Error> {
|
||||
let hash = self.rpc.block_hash(block_number).await?;
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
/// Get a block hash of the latest finalized block
|
||||
pub fn finalized_head(&self) -> impl Future<Item = T::Hash, Error = Error> {
|
||||
self.connect().and_then(|rpc| rpc.finalized_head())
|
||||
pub async fn finalized_head(&self) -> Result<T::Hash, Error> {
|
||||
let head = self.rpc.finalized_head().await?;
|
||||
Ok(head)
|
||||
}
|
||||
|
||||
/// Get a block
|
||||
pub fn block<H>(
|
||||
&self,
|
||||
hash: Option<H>,
|
||||
) -> impl Future<Item = Option<ChainBlock<T>>, Error = Error>
|
||||
pub async fn block<H>(&self, hash: Option<H>) -> Result<Option<ChainBlock<T>>, Error>
|
||||
where
|
||||
H: Into<T::Hash> + 'static,
|
||||
{
|
||||
self.connect()
|
||||
.and_then(|rpc| rpc.block(hash.map(|h| h.into())))
|
||||
let block = self.rpc.block(hash.map(|h| h.into())).await?;
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Create and submit an extrinsic and return corresponding Hash if successful
|
||||
pub async fn submit_extrinsic<E: Encode>(
|
||||
&self,
|
||||
extrinsic: E,
|
||||
) -> Result<T::Hash, Error> {
|
||||
let xt_hash = self.rpc.submit_extrinsic(extrinsic).await?;
|
||||
Ok(xt_hash)
|
||||
}
|
||||
|
||||
/// Create and submit an extrinsic and return corresponding Event if successful
|
||||
pub async fn submit_and_watch_extrinsic<E: Encode + 'static>(
|
||||
self,
|
||||
extrinsic: E,
|
||||
decoder: EventsDecoder<T>,
|
||||
) -> Result<ExtrinsicSuccess<T>, Error> {
|
||||
let success = self
|
||||
.rpc
|
||||
.submit_and_watch_extrinsic(extrinsic, decoder)
|
||||
.await?;
|
||||
Ok(success)
|
||||
}
|
||||
|
||||
/// Subscribe to events.
|
||||
pub fn subscribe_events(
|
||||
pub async fn subscribe_events(
|
||||
&self,
|
||||
) -> impl Future<Item = MapStream<StorageChangeSet<T::Hash>>, Error = Error> {
|
||||
self.connect().and_then(|rpc| rpc.subscribe_events())
|
||||
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
|
||||
let events = self.rpc.subscribe_events().await?;
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
/// Subscribe to new blocks.
|
||||
pub fn subscribe_blocks(
|
||||
&self,
|
||||
) -> impl Future<Item = MapStream<T::Header>, Error = Error> {
|
||||
self.connect().and_then(|rpc| rpc.subscribe_blocks())
|
||||
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
|
||||
let headers = self.rpc.subscribe_blocks().await?;
|
||||
Ok(headers)
|
||||
}
|
||||
|
||||
/// Subscribe to finalized blocks.
|
||||
pub fn subscribe_finalized_blocks(
|
||||
pub async fn subscribe_finalized_blocks(
|
||||
&self,
|
||||
) -> impl Future<Item = MapStream<T::Header>, Error = Error> {
|
||||
self.connect()
|
||||
.and_then(|rpc| rpc.subscribe_finalized_blocks())
|
||||
) -> Result<Subscription<T::Header>, Error> {
|
||||
let headers = self.rpc.subscribe_finalized_blocks().await?;
|
||||
Ok(headers)
|
||||
}
|
||||
|
||||
/// Create a transaction builder for a private key.
|
||||
pub fn xt<P>(
|
||||
pub async fn xt<P>(
|
||||
&self,
|
||||
signer: P,
|
||||
nonce: Option<T::Index>,
|
||||
) -> impl Future<Item = XtBuilder<T, P, S>, Error = Error>
|
||||
) -> Result<XtBuilder<T, P, S>, Error>
|
||||
where
|
||||
P: Pair,
|
||||
P::Signature: Codec,
|
||||
S: Verify,
|
||||
S::Signer: From<P::Public> + IdentifyAccount<AccountId = T::AccountId>,
|
||||
{
|
||||
let client = self.clone();
|
||||
let account_id = S::Signer::from(signer.public()).into_account();
|
||||
match nonce {
|
||||
Some(nonce) => Either::A(future::ok(nonce)),
|
||||
None => Either::B(self.account_nonce(account_id)),
|
||||
}
|
||||
.map(|nonce| {
|
||||
let genesis_hash = client.genesis_hash;
|
||||
let runtime_version = client.runtime_version.clone();
|
||||
XtBuilder {
|
||||
client,
|
||||
nonce,
|
||||
runtime_version,
|
||||
genesis_hash,
|
||||
signer,
|
||||
}
|
||||
let nonce = match nonce {
|
||||
Some(nonce) => nonce,
|
||||
None => self.account_nonce(account_id).await?,
|
||||
};
|
||||
|
||||
let genesis_hash = self.genesis_hash;
|
||||
let runtime_version = self.runtime_version.clone();
|
||||
Ok(XtBuilder {
|
||||
client: self.clone(),
|
||||
nonce,
|
||||
runtime_version,
|
||||
genesis_hash,
|
||||
signer,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -294,7 +318,7 @@ pub struct XtBuilder<T: System, P, S> {
|
||||
signer: P,
|
||||
}
|
||||
|
||||
impl<T: System + Balances + 'static, P, S: 'static> XtBuilder<T, P, S>
|
||||
impl<T: System + Balances + Send + Sync + 'static, P, S: 'static> XtBuilder<T, P, S>
|
||||
where
|
||||
P: Pair,
|
||||
{
|
||||
@@ -365,17 +389,10 @@ where
|
||||
}
|
||||
|
||||
/// Submits a transaction to the chain.
|
||||
pub fn submit<C: Encode>(
|
||||
&self,
|
||||
call: Call<C>,
|
||||
) -> impl Future<Item = T::Hash, Error = Error> {
|
||||
let cli = self.client.connect();
|
||||
self.create_and_sign(call)
|
||||
.into_future()
|
||||
.map_err(Into::into)
|
||||
.and_then(move |extrinsic| {
|
||||
cli.and_then(move |rpc| rpc.submit_extrinsic(extrinsic))
|
||||
})
|
||||
pub async fn submit<C: Encode>(&self, call: Call<C>) -> Result<T::Hash, Error> {
|
||||
let extrinsic = self.create_and_sign(call)?;
|
||||
let xt_hash = self.client.submit_extrinsic(extrinsic).await?;
|
||||
Ok(xt_hash)
|
||||
}
|
||||
|
||||
/// Submits transaction to the chain and watch for events.
|
||||
@@ -422,24 +439,17 @@ where
|
||||
}
|
||||
|
||||
/// Submits transaction to the chain and watch for events.
|
||||
pub fn submit<C: Encode>(
|
||||
pub async fn submit<C: Encode>(
|
||||
self,
|
||||
call: Call<C>,
|
||||
) -> impl Future<Item = ExtrinsicSuccess<T>, Error = Error> {
|
||||
let cli = self.client.connect();
|
||||
let decoder = self.decoder.into_future().map_err(Into::into);
|
||||
|
||||
self.builder
|
||||
.create_and_sign(call)
|
||||
.into_future()
|
||||
.map_err(Into::into)
|
||||
.join(decoder)
|
||||
.and_then(move |(extrinsic, decoder)| {
|
||||
decoder.check_missing_type_sizes();
|
||||
cli.and_then(move |rpc| {
|
||||
rpc.submit_and_watch_extrinsic(extrinsic, decoder)
|
||||
})
|
||||
})
|
||||
) -> Result<ExtrinsicSuccess<T>, Error> {
|
||||
let decoder = self.decoder?;
|
||||
let extrinsic = self.builder.create_and_sign(call)?;
|
||||
let xt_success = self
|
||||
.client
|
||||
.submit_and_watch_extrinsic(extrinsic, decoder)
|
||||
.await?;
|
||||
Ok(xt_success)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -458,7 +468,6 @@ impl codec::Encode for Encoded {
|
||||
mod tests {
|
||||
use codec::Encode;
|
||||
use frame_support::StorageMap;
|
||||
use futures::stream::Stream;
|
||||
use sp_core::storage::StorageKey;
|
||||
use sp_keyring::AccountKeyring;
|
||||
|
||||
@@ -469,6 +478,7 @@ mod tests {
|
||||
BalancesStore,
|
||||
},
|
||||
DefaultNodeRuntime as Runtime,
|
||||
Error,
|
||||
};
|
||||
|
||||
type Index = <Runtime as System>::Index;
|
||||
@@ -476,85 +486,104 @@ mod tests {
|
||||
type Address = <Runtime as System>::Address;
|
||||
type Balance = <Runtime as Balances>::Balance;
|
||||
|
||||
pub(crate) fn test_setup() -> (tokio::runtime::Runtime, Client<Runtime>) {
|
||||
env_logger::try_init().ok();
|
||||
let mut rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let client_future = ClientBuilder::<Runtime>::new().build();
|
||||
let client = rt.block_on(client_future).unwrap();
|
||||
(rt, client)
|
||||
pub(crate) async fn test_client() -> Client<Runtime> {
|
||||
ClientBuilder::<Runtime>::new()
|
||||
.build()
|
||||
.await
|
||||
.expect("Error creating client")
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_tx_transfer_balance() {
|
||||
let (mut rt, client) = test_setup();
|
||||
env_logger::try_init().ok();
|
||||
let transfer = async_std::task::block_on(async move {
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let dest = AccountKeyring::Bob.to_account_id();
|
||||
|
||||
let signer = AccountKeyring::Alice.pair();
|
||||
let mut xt = rt.block_on(client.xt(signer, None)).unwrap();
|
||||
let client = test_client().await;
|
||||
let mut xt = client.xt(signer, None).await?;
|
||||
let _ = xt
|
||||
.submit(balances::transfer::<Runtime>(dest.clone().into(), 10_000))
|
||||
.await?;
|
||||
|
||||
let dest = AccountKeyring::Bob.to_account_id();
|
||||
let transfer =
|
||||
xt.submit(balances::transfer::<Runtime>(dest.clone().into(), 10_000));
|
||||
rt.block_on(transfer).unwrap();
|
||||
// check that nonce is handled correctly
|
||||
xt.increment_nonce()
|
||||
.submit(balances::transfer::<Runtime>(dest.clone().into(), 10_000))
|
||||
.await
|
||||
});
|
||||
|
||||
// check that nonce is handled correctly
|
||||
let transfer = xt
|
||||
.increment_nonce()
|
||||
.submit(balances::transfer::<Runtime>(dest.clone().into(), 10_000));
|
||||
|
||||
rt.block_on(transfer).unwrap();
|
||||
assert!(transfer.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_getting_hash() {
|
||||
let (mut rt, client) = test_setup();
|
||||
rt.block_on(client.block_hash(None)).unwrap();
|
||||
let result: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let client = test_client().await;
|
||||
let block_hash = client.block_hash(None).await?;
|
||||
Ok(block_hash)
|
||||
});
|
||||
|
||||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_getting_block() {
|
||||
let (mut rt, client) = test_setup();
|
||||
rt.block_on(client.block_hash(None).and_then(move |h| client.block(h)))
|
||||
.unwrap();
|
||||
let result: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let client = test_client().await;
|
||||
let block_hash = client.block_hash(None).await?;
|
||||
let block = client.block(block_hash).await?;
|
||||
Ok(block)
|
||||
});
|
||||
|
||||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_state_read_free_balance() {
|
||||
let (mut rt, client) = test_setup();
|
||||
let result: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let account = AccountKeyring::Alice.to_account_id();
|
||||
let client = test_client().await;
|
||||
let balance = client.free_balance(account.into()).await?;
|
||||
Ok(balance)
|
||||
});
|
||||
|
||||
let account = AccountKeyring::Alice.to_account_id();
|
||||
rt.block_on(client.free_balance(account.into())).unwrap();
|
||||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_chain_subscribe_blocks() {
|
||||
let (mut rt, client) = test_setup();
|
||||
let result: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let client = test_client().await;
|
||||
let mut blocks = client.subscribe_blocks().await?;
|
||||
let block = blocks.next().await;
|
||||
Ok(block)
|
||||
});
|
||||
|
||||
let stream = rt.block_on(client.subscribe_blocks()).unwrap();
|
||||
let (_header, _) = rt
|
||||
.block_on(stream.into_future().map_err(|(e, _)| e))
|
||||
.unwrap();
|
||||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_chain_subscribe_finalized_blocks() {
|
||||
let (mut rt, client) = test_setup();
|
||||
let result: Result<_, Error> = async_std::task::block_on(async move {
|
||||
let client = test_client().await;
|
||||
let mut blocks = client.subscribe_finalized_blocks().await?;
|
||||
let block = blocks.next().await;
|
||||
Ok(block)
|
||||
});
|
||||
|
||||
let stream = rt.block_on(client.subscribe_finalized_blocks()).unwrap();
|
||||
let (_header, _) = rt
|
||||
.block_on(stream.into_future().map_err(|(e, _)| e))
|
||||
.unwrap();
|
||||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore] // requires locally running substrate node
|
||||
fn test_chain_read_metadata() {
|
||||
let (_, client) = test_setup();
|
||||
let client = async_std::task::block_on(test_client());
|
||||
|
||||
let balances = client.metadata().module_with_calls("Balances").unwrap();
|
||||
let dest = sp_keyring::AccountKeyring::Bob.to_account_id();
|
||||
|
||||
+20
-12
@@ -41,6 +41,8 @@ use crate::Encoded;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum MetadataError {
|
||||
#[error("Error converting substrate metadata: {0}")]
|
||||
Conversion(#[from] ConversionError),
|
||||
#[error("Module not found")]
|
||||
ModuleNotFound(String),
|
||||
#[error("Module with events not found")]
|
||||
@@ -284,14 +286,14 @@ pub enum EventArg {
|
||||
}
|
||||
|
||||
impl FromStr for EventArg {
|
||||
type Err = Error;
|
||||
type Err = ConversionError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if s.starts_with("Vec<") {
|
||||
if s.ends_with('>') {
|
||||
Ok(EventArg::Vec(Box::new(s[4..s.len() - 1].parse()?)))
|
||||
} else {
|
||||
Err(Error::InvalidEventArg(
|
||||
Err(ConversionError::InvalidEventArg(
|
||||
s.to_string(),
|
||||
"Expected closing `>` for `Vec`",
|
||||
))
|
||||
@@ -305,7 +307,7 @@ impl FromStr for EventArg {
|
||||
}
|
||||
Ok(EventArg::Tuple(args))
|
||||
} else {
|
||||
Err(Error::InvalidEventArg(
|
||||
Err(ConversionError::InvalidEventArg(
|
||||
s.to_string(),
|
||||
"Expecting closing `)` for tuple",
|
||||
))
|
||||
@@ -333,24 +335,28 @@ impl EventArg {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ConversionError {
|
||||
#[error("Invalid prefix")]
|
||||
InvalidPrefix,
|
||||
#[error("Invalid version")]
|
||||
InvalidVersion,
|
||||
#[error("Expected DecodeDifferent::Decoded")]
|
||||
ExpectedDecoded,
|
||||
#[error("Invalid event arg {0}")]
|
||||
InvalidEventArg(String, &'static str),
|
||||
}
|
||||
|
||||
impl TryFrom<RuntimeMetadataPrefixed> for Metadata {
|
||||
type Error = Error;
|
||||
type Error = MetadataError;
|
||||
|
||||
fn try_from(metadata: RuntimeMetadataPrefixed) -> Result<Self, Self::Error> {
|
||||
if metadata.0 != META_RESERVED {
|
||||
return Err(Error::InvalidPrefix)
|
||||
return Err(ConversionError::InvalidPrefix.into())
|
||||
}
|
||||
let meta = match metadata.1 {
|
||||
RuntimeMetadata::V10(meta) => meta,
|
||||
_ => return Err(Error::InvalidVersion),
|
||||
_ => return Err(ConversionError::InvalidVersion.into()),
|
||||
};
|
||||
let mut modules = HashMap::new();
|
||||
let mut modules_with_calls = HashMap::new();
|
||||
@@ -417,16 +423,18 @@ impl TryFrom<RuntimeMetadataPrefixed> for Metadata {
|
||||
}
|
||||
}
|
||||
|
||||
fn convert<B: 'static, O: 'static>(dd: DecodeDifferent<B, O>) -> Result<O, Error> {
|
||||
fn convert<B: 'static, O: 'static>(
|
||||
dd: DecodeDifferent<B, O>,
|
||||
) -> Result<O, ConversionError> {
|
||||
match dd {
|
||||
DecodeDifferent::Decoded(value) => Ok(value),
|
||||
_ => Err(Error::ExpectedDecoded),
|
||||
_ => Err(ConversionError::ExpectedDecoded),
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_event(
|
||||
event: frame_metadata::EventMetadata,
|
||||
) -> Result<ModuleEventMetadata, Error> {
|
||||
) -> Result<ModuleEventMetadata, ConversionError> {
|
||||
let name = convert(event.name)?;
|
||||
let mut arguments = Vec::new();
|
||||
for arg in convert(event.arguments)? {
|
||||
@@ -440,7 +448,7 @@ fn convert_entry(
|
||||
module_prefix: String,
|
||||
storage_prefix: String,
|
||||
entry: frame_metadata::StorageEntryMetadata,
|
||||
) -> Result<StorageMetadata, Error> {
|
||||
) -> Result<StorageMetadata, ConversionError> {
|
||||
let default = convert(entry.default)?;
|
||||
Ok(StorageMetadata {
|
||||
module_prefix,
|
||||
|
||||
+220
-244
@@ -21,35 +21,26 @@ use codec::{
|
||||
Encode,
|
||||
Error as CodecError,
|
||||
};
|
||||
use futures::{
|
||||
future::{
|
||||
self,
|
||||
Future,
|
||||
IntoFuture,
|
||||
},
|
||||
stream::{
|
||||
self,
|
||||
Stream,
|
||||
use jsonrpsee::{
|
||||
client::Subscription,
|
||||
core::common::{
|
||||
to_value as to_json_value,
|
||||
Params,
|
||||
},
|
||||
Client,
|
||||
};
|
||||
use jsonrpc_core_client::{
|
||||
RpcChannel,
|
||||
TypedSubscriptionStream,
|
||||
};
|
||||
|
||||
use num_traits::bounds::Bounded;
|
||||
|
||||
use frame_metadata::RuntimeMetadataPrefixed;
|
||||
use sc_rpc_api::{
|
||||
author::AuthorClient,
|
||||
chain::ChainClient,
|
||||
state::StateClient,
|
||||
};
|
||||
use sp_core::{
|
||||
storage::{
|
||||
StorageChangeSet,
|
||||
StorageData,
|
||||
StorageKey,
|
||||
},
|
||||
twox_128,
|
||||
Bytes,
|
||||
};
|
||||
use sp_rpc::{
|
||||
list::ListOrValue,
|
||||
@@ -64,6 +55,7 @@ use sp_runtime::{
|
||||
};
|
||||
use sp_transaction_pool::TransactionStatus;
|
||||
use sp_version::RuntimeVersion;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use crate::{
|
||||
error::Error,
|
||||
@@ -83,265 +75,251 @@ use crate::{
|
||||
metadata::Metadata,
|
||||
};
|
||||
|
||||
pub type ChainBlock<T> = SignedBlock<Block<<T as System>::Header, <T as System>::Extrinsic>>;
|
||||
pub type ChainBlock<T> =
|
||||
SignedBlock<Block<<T as System>::Header, <T as System>::Extrinsic>>;
|
||||
pub type BlockNumber<T> = NumberOrHex<<T as System>::BlockNumber>;
|
||||
|
||||
/// Client for substrate rpc interfaces
|
||||
#[derive(Clone)]
|
||||
pub struct Rpc<T: System> {
|
||||
state: StateClient<T::Hash>,
|
||||
chain: ChainClient<T::BlockNumber, T::Hash, T::Header, ChainBlock<T>>,
|
||||
author: AuthorClient<T::Hash, T::Hash>,
|
||||
client: Client,
|
||||
marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
/// Allows connecting to all inner interfaces on the same RpcChannel
|
||||
impl<T: System> From<RpcChannel> for Rpc<T> {
|
||||
fn from(channel: RpcChannel) -> Self {
|
||||
Self {
|
||||
state: channel.clone().into(),
|
||||
chain: channel.clone().into(),
|
||||
author: channel.into(),
|
||||
}
|
||||
impl<T> Rpc<T>
|
||||
where
|
||||
T: System,
|
||||
{
|
||||
pub async fn connect_ws(url: &str) -> Result<Self, Error> {
|
||||
let raw_client = jsonrpsee::ws::ws_raw_client(&url).await?;
|
||||
Ok(Rpc {
|
||||
client: raw_client.into(),
|
||||
marker: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: System> Rpc<T> {
|
||||
/// Fetch a storage key
|
||||
pub fn storage<V: Decode>(
|
||||
pub async fn storage<V: Decode>(
|
||||
&self,
|
||||
key: StorageKey,
|
||||
hash: Option<T::Hash>,
|
||||
) -> impl Future<Item = Option<V>, Error = Error> {
|
||||
self.state
|
||||
.storage(key, hash)
|
||||
.map_err(Into::into)
|
||||
.and_then(|data| {
|
||||
match data {
|
||||
Some(data) => {
|
||||
let value = Decode::decode(&mut &data.0[..])?;
|
||||
Ok(Some(value))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
})
|
||||
) -> Result<Option<V>, Error> {
|
||||
// todo: update jsonrpsee::rpc_api! macro to accept shared Client (currently only RawClient)
|
||||
// until then we manually construct params here and in other methods
|
||||
let params = Params::Array(vec![to_json_value(key)?, to_json_value(hash)?]);
|
||||
let data: Option<StorageData> =
|
||||
self.client.request("state_getStorage", params).await?;
|
||||
match data {
|
||||
Some(data) => {
|
||||
let value = Decode::decode(&mut &data.0[..])?;
|
||||
Ok(Some(value))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the genesis hash
|
||||
pub fn genesis_hash(&self) -> impl Future<Item = T::Hash, Error = Error> {
|
||||
let block_zero = T::BlockNumber::min_value();
|
||||
self.chain
|
||||
.block_hash(Some(ListOrValue::Value(NumberOrHex::Number(block_zero))))
|
||||
.map_err(Into::into)
|
||||
.and_then(|list_or_value| {
|
||||
future::result(match list_or_value {
|
||||
ListOrValue::Value(genesis_hash) => {
|
||||
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
|
||||
}
|
||||
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
|
||||
})
|
||||
})
|
||||
pub async fn genesis_hash(&self) -> Result<T::Hash, Error> {
|
||||
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(
|
||||
T::BlockNumber::min_value(),
|
||||
)));
|
||||
let params = Params::Array(vec![to_json_value(block_zero)?]);
|
||||
let list_or_value: ListOrValue<Option<T::Hash>> =
|
||||
self.client.request("chain_getBlockHash", params).await?;
|
||||
match list_or_value {
|
||||
ListOrValue::Value(genesis_hash) => {
|
||||
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
|
||||
}
|
||||
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the metadata
|
||||
pub fn metadata(&self) -> impl Future<Item = Metadata, Error = Error> {
|
||||
self.state
|
||||
.metadata(None)
|
||||
.map_err(Into::into)
|
||||
.and_then(|bytes| {
|
||||
let result = Decode::decode(&mut &bytes[..])
|
||||
.map_err(Into::into)
|
||||
.and_then(|meta: RuntimeMetadataPrefixed| {
|
||||
meta.try_into().map_err(|err| format!("{:?}", err).into())
|
||||
});
|
||||
future::result(result)
|
||||
})
|
||||
pub async fn metadata(&self) -> Result<Metadata, Error> {
|
||||
let bytes: Bytes = self
|
||||
.client
|
||||
.request("state_getMetadata", Params::None)
|
||||
.await?;
|
||||
let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?;
|
||||
let metadata: Metadata = meta.try_into()?;
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
/// Get a block hash, returns hash of latest block by default
|
||||
pub fn block_hash(
|
||||
pub async fn block_hash(
|
||||
&self,
|
||||
block_number: Option<BlockNumber<T>>,
|
||||
) -> impl Future<Item = Option<T::Hash>, Error = Error> {
|
||||
self.chain
|
||||
.block_hash(block_number.map(|bn| ListOrValue::Value(bn)))
|
||||
.map_err(Into::into)
|
||||
.and_then(|list_or_value| {
|
||||
match list_or_value {
|
||||
ListOrValue::Value(hash) => Ok(hash),
|
||||
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
|
||||
}
|
||||
})
|
||||
) -> Result<Option<T::Hash>, Error> {
|
||||
let block_number = block_number.map(|bn| ListOrValue::Value(bn));
|
||||
let params = Params::Array(vec![to_json_value(block_number)?]);
|
||||
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
|
||||
match list_or_value {
|
||||
ListOrValue::Value(hash) => Ok(hash),
|
||||
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a block hash of the latest finalized block
|
||||
pub fn finalized_head(&self) -> impl Future<Item = T::Hash, Error = Error> {
|
||||
self.chain.finalized_head().map_err(Into::into)
|
||||
pub async fn finalized_head(&self) -> Result<T::Hash, Error> {
|
||||
let hash = self
|
||||
.client
|
||||
.request("chain_getFinalizedHead", Params::None)
|
||||
.await?;
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
/// Get a Block
|
||||
pub fn block(
|
||||
pub async fn block(
|
||||
&self,
|
||||
hash: Option<T::Hash>,
|
||||
) -> impl Future<Item = Option<ChainBlock<T>>, Error = Error> {
|
||||
self.chain.block(hash).map_err(Into::into)
|
||||
) -> Result<Option<ChainBlock<T>>, Error> {
|
||||
let params = Params::Array(vec![to_json_value(hash)?]);
|
||||
let block = self.client.request("chain_getBlock", params).await?;
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Fetch the runtime version
|
||||
pub fn runtime_version(
|
||||
pub async fn runtime_version(
|
||||
&self,
|
||||
at: Option<T::Hash>,
|
||||
) -> impl Future<Item = RuntimeVersion, Error = Error> {
|
||||
self.state.runtime_version(at).map_err(Into::into)
|
||||
) -> Result<RuntimeVersion, Error> {
|
||||
let params = Params::Array(vec![to_json_value(at)?]);
|
||||
let version = self
|
||||
.client
|
||||
.request("state_getRuntimeVersion", params)
|
||||
.await?;
|
||||
Ok(version)
|
||||
}
|
||||
}
|
||||
|
||||
type MapClosure<T> = Box<dyn Fn(T) -> T + Send>;
|
||||
pub type MapStream<T> = stream::Map<TypedSubscriptionStream<T>, MapClosure<T>>;
|
||||
|
||||
impl<T: System + Balances + 'static> Rpc<T> {
|
||||
/// Subscribe to substrate System Events
|
||||
pub fn subscribe_events(
|
||||
pub async fn subscribe_events(
|
||||
&self,
|
||||
) -> impl Future<Item = MapStream<StorageChangeSet<<T as System>::Hash>>, Error = Error>
|
||||
{
|
||||
) -> Result<Subscription<StorageChangeSet<<T as System>::Hash>>, Error> {
|
||||
let mut storage_key = twox_128(b"System").to_vec();
|
||||
storage_key.extend(twox_128(b"Events").to_vec());
|
||||
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
|
||||
|
||||
let closure: MapClosure<StorageChangeSet<<T as System>::Hash>> =
|
||||
Box::new(|event| {
|
||||
log::debug!(
|
||||
"Event {:?}",
|
||||
event
|
||||
.changes
|
||||
.iter()
|
||||
.map(|(k, v)| {
|
||||
(hex::encode(&k.0), v.as_ref().map(|v| hex::encode(&v.0)))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
event
|
||||
});
|
||||
self.state
|
||||
.subscribe_storage(Some(vec![StorageKey(storage_key)]))
|
||||
.map(|stream: TypedSubscriptionStream<_>| stream.map(closure))
|
||||
.map_err(Into::into)
|
||||
let keys = Some(vec![StorageKey(storage_key)]);
|
||||
let params = Params::Array(vec![to_json_value(keys)?]);
|
||||
|
||||
let subscription = self
|
||||
.client
|
||||
.subscribe("state_subscribeStorage", params, "state_unsubscribeStorage")
|
||||
.await?;
|
||||
Ok(subscription)
|
||||
}
|
||||
|
||||
/// Subscribe to blocks.
|
||||
pub fn subscribe_blocks(
|
||||
&self,
|
||||
) -> impl Future<Item = MapStream<T::Header>, Error = Error> {
|
||||
let closure: MapClosure<T::Header> = Box::new(|event| {
|
||||
log::info!("New block {:?}", event);
|
||||
event
|
||||
});
|
||||
self.chain
|
||||
.subscribe_new_heads()
|
||||
.map(|stream| stream.map(closure))
|
||||
.map_err(Into::into)
|
||||
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
|
||||
let subscription = self
|
||||
.client
|
||||
.subscribe(
|
||||
"chain_subscribeNewHeads",
|
||||
Params::None,
|
||||
"chain_subscribeNewHeads",
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(subscription)
|
||||
}
|
||||
|
||||
/// Subscribe to finalized blocks.
|
||||
pub fn subscribe_finalized_blocks(
|
||||
pub async fn subscribe_finalized_blocks(
|
||||
&self,
|
||||
) -> impl Future<Item = MapStream<T::Header>, Error = Error> {
|
||||
let closure: MapClosure<T::Header> = Box::new(|event| {
|
||||
log::info!("Finalized block {:?}", event);
|
||||
event
|
||||
});
|
||||
self.chain
|
||||
.subscribe_finalized_heads()
|
||||
.map(|stream| stream.map(closure))
|
||||
.map_err(Into::into)
|
||||
) -> Result<Subscription<T::Header>, Error> {
|
||||
let subscription = self
|
||||
.client
|
||||
.subscribe(
|
||||
"chain_subscribeFinalizedHeads",
|
||||
Params::None,
|
||||
"chain_subscribeFinalizedHeads",
|
||||
)
|
||||
.await?;
|
||||
Ok(subscription)
|
||||
}
|
||||
|
||||
/// Create and submit an extrinsic and return corresponding Hash if successful
|
||||
pub fn submit_extrinsic<E>(
|
||||
self,
|
||||
pub async fn submit_extrinsic<E: Encode>(
|
||||
&self,
|
||||
extrinsic: E,
|
||||
) -> impl Future<Item = T::Hash, Error = Error>
|
||||
where
|
||||
E: Encode,
|
||||
{
|
||||
self.author
|
||||
.submit_extrinsic(extrinsic.encode().into())
|
||||
.map_err(Into::into)
|
||||
) -> Result<T::Hash, Error> {
|
||||
let bytes: Bytes = extrinsic.encode().into();
|
||||
let params = Params::Array(vec![to_json_value(bytes)?]);
|
||||
let xt_hash = self
|
||||
.client
|
||||
.request("author_submitExtrinsic", params)
|
||||
.await?;
|
||||
Ok(xt_hash)
|
||||
}
|
||||
|
||||
pub async fn watch_extrinsic<E: Encode>(
|
||||
&self,
|
||||
extrinsic: E,
|
||||
) -> Result<Subscription<TransactionStatus<T::Hash, T::Hash>>, Error> {
|
||||
let bytes: Bytes = extrinsic.encode().into();
|
||||
let params = Params::Array(vec![to_json_value(bytes)?]);
|
||||
let subscription = self
|
||||
.client
|
||||
.subscribe(
|
||||
"author_submitAndWatchExtrinsic",
|
||||
params,
|
||||
"author_unwatchExtrinsic",
|
||||
)
|
||||
.await?;
|
||||
Ok(subscription)
|
||||
}
|
||||
|
||||
/// Create and submit an extrinsic and return corresponding Event if successful
|
||||
pub fn submit_and_watch_extrinsic<E: 'static>(
|
||||
pub async fn submit_and_watch_extrinsic<E: Encode + 'static>(
|
||||
self,
|
||||
extrinsic: E,
|
||||
decoder: EventsDecoder<T>,
|
||||
) -> impl Future<Item = ExtrinsicSuccess<T>, Error = Error>
|
||||
where
|
||||
E: Encode,
|
||||
{
|
||||
let events = self.subscribe_events().map_err(Into::into);
|
||||
events.and_then(move |events| {
|
||||
let ext_hash = T::Hashing::hash_of(&extrinsic);
|
||||
log::info!("Submitting Extrinsic `{:?}`", ext_hash);
|
||||
) -> Result<ExtrinsicSuccess<T>, Error> {
|
||||
let ext_hash = T::Hashing::hash_of(&extrinsic);
|
||||
log::info!("Submitting Extrinsic `{:?}`", ext_hash);
|
||||
|
||||
let chain = self.chain.clone();
|
||||
self.author
|
||||
.watch_extrinsic(extrinsic.encode().into())
|
||||
.map_err(Into::into)
|
||||
.and_then(|stream| {
|
||||
stream
|
||||
.filter_map(|status| {
|
||||
log::info!("received status {:?}", status);
|
||||
match status {
|
||||
// ignore in progress extrinsic for now
|
||||
TransactionStatus::Future
|
||||
| TransactionStatus::Ready
|
||||
| TransactionStatus::Broadcast(_) => None,
|
||||
TransactionStatus::InBlock(block_hash) => {
|
||||
Some(Ok(block_hash))
|
||||
}
|
||||
TransactionStatus::Usurped(_) => {
|
||||
Some(Err("Extrinsic Usurped".into()))
|
||||
}
|
||||
TransactionStatus::Dropped => {
|
||||
Some(Err("Extrinsic Dropped".into()))
|
||||
}
|
||||
TransactionStatus::Invalid => {
|
||||
Some(Err("Extrinsic Invalid".into()))
|
||||
}
|
||||
}
|
||||
})
|
||||
.into_future()
|
||||
.map_err(|(e, _)| e.into())
|
||||
.and_then(|(result, _)| {
|
||||
log::info!("received result {:?}", result);
|
||||
let events_sub = self.subscribe_events().await?;
|
||||
let mut xt_sub = self.watch_extrinsic(extrinsic).await?;
|
||||
|
||||
result
|
||||
.ok_or_else(|| Error::from("Stream terminated"))
|
||||
.and_then(|r| r)
|
||||
.into_future()
|
||||
})
|
||||
})
|
||||
.and_then(move |bh| {
|
||||
log::info!("Fetching block {:?}", bh);
|
||||
chain
|
||||
.block(Some(bh))
|
||||
.map(move |b| (bh, b))
|
||||
.map_err(Into::into)
|
||||
})
|
||||
.and_then(|(h, b)| {
|
||||
b.ok_or_else(|| format!("Failed to find block {:?}", h).into())
|
||||
.map(|b| (h, b))
|
||||
.into_future()
|
||||
})
|
||||
.and_then(move |(bh, sb)| {
|
||||
log::info!(
|
||||
"Found block {:?}, with {} extrinsics",
|
||||
bh,
|
||||
sb.block.extrinsics.len()
|
||||
);
|
||||
|
||||
wait_for_block_events(decoder, ext_hash, sb, bh, events)
|
||||
})
|
||||
})
|
||||
while let status = xt_sub.next().await {
|
||||
log::info!("received status {:?}", status);
|
||||
match status {
|
||||
// ignore in progress extrinsic for now
|
||||
TransactionStatus::Future
|
||||
| TransactionStatus::Ready
|
||||
| TransactionStatus::Broadcast(_) => continue,
|
||||
TransactionStatus::InBlock(block_hash) => {
|
||||
log::info!("Fetching block {:?}", block_hash);
|
||||
let block = self.block(Some(block_hash)).await?;
|
||||
return match block {
|
||||
Some(signed_block) => {
|
||||
log::info!(
|
||||
"Found block {:?}, with {} extrinsics",
|
||||
block_hash,
|
||||
signed_block.block.extrinsics.len()
|
||||
);
|
||||
wait_for_block_events(
|
||||
decoder,
|
||||
ext_hash,
|
||||
signed_block,
|
||||
block_hash,
|
||||
events_sub,
|
||||
)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
Err(format!("Failed to find block {:?}", block_hash).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()),
|
||||
TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()),
|
||||
TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()),
|
||||
}
|
||||
}
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -399,13 +377,13 @@ impl<T: System> ExtrinsicSuccess<T> {
|
||||
}
|
||||
|
||||
/// Waits for events for the block triggered by the extrinsic
|
||||
pub fn wait_for_block_events<T: System + Balances + 'static>(
|
||||
pub async fn wait_for_block_events<T: System + Balances + 'static>(
|
||||
decoder: EventsDecoder<T>,
|
||||
ext_hash: T::Hash,
|
||||
signed_block: ChainBlock<T>,
|
||||
block_hash: T::Hash,
|
||||
events_stream: MapStream<StorageChangeSet<T::Hash>>,
|
||||
) -> impl Future<Item = ExtrinsicSuccess<T>, Error = Error> {
|
||||
events_subscription: Subscription<StorageChangeSet<T::Hash>>,
|
||||
) -> Result<ExtrinsicSuccess<T>, Error> {
|
||||
let ext_index = signed_block
|
||||
.block
|
||||
.extrinsics
|
||||
@@ -415,43 +393,41 @@ pub fn wait_for_block_events<T: System + Balances + 'static>(
|
||||
hash == ext_hash
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
format!("Failed to find Extrinsic with hash {:?}", ext_hash).into()
|
||||
})
|
||||
.into_future();
|
||||
Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash))
|
||||
})?;
|
||||
|
||||
events_stream
|
||||
.filter(move |event| event.block == block_hash)
|
||||
.into_future()
|
||||
.map_err(|(e, _)| e.into())
|
||||
.join(ext_index)
|
||||
.and_then(move |((change_set, _), ext_index)| {
|
||||
let events = match change_set {
|
||||
None => Vec::new(),
|
||||
Some(change_set) => {
|
||||
let mut events = Vec::new();
|
||||
for (_key, data) in change_set.changes {
|
||||
if let Some(data) = data {
|
||||
match decoder.decode_events(&mut &data.0[..]) {
|
||||
Ok(raw_events) => {
|
||||
for (phase, event) in raw_events {
|
||||
if let Phase::ApplyExtrinsic(i) = phase {
|
||||
if i as usize == ext_index {
|
||||
events.push(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut subscription = events_subscription;
|
||||
while let change_set = subscription.next().await {
|
||||
// only interested in events for the given block
|
||||
if change_set.block != block_hash {
|
||||
continue
|
||||
}
|
||||
let mut events = Vec::new();
|
||||
for (_key, data) in change_set.changes {
|
||||
if let Some(data) = data {
|
||||
match decoder.decode_events(&mut &data.0[..]) {
|
||||
Ok(raw_events) => {
|
||||
for (phase, event) in raw_events {
|
||||
if let Phase::ApplyExtrinsic(i) = phase {
|
||||
if i as usize == ext_index {
|
||||
events.push(event)
|
||||
}
|
||||
Err(err) => return future::err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
events
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
};
|
||||
future::ok(ExtrinsicSuccess {
|
||||
}
|
||||
}
|
||||
return if events.len() > 0 {
|
||||
Ok(ExtrinsicSuccess {
|
||||
block: block_hash,
|
||||
extrinsic: ext_hash,
|
||||
events,
|
||||
})
|
||||
})
|
||||
} else {
|
||||
Err(format!("No events found for block {}", block_hash).into())
|
||||
}
|
||||
}
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user