// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of substrate-subxt. // // subxt is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // subxt is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . use std::convert::TryInto; use codec::{ Decode, Encode, Error as CodecError, }; use jsonrpsee::{ client::Subscription, core::common::{ to_value as to_json_value, Params, }, Client, }; use num_traits::bounds::Bounded; use frame_metadata::RuntimeMetadataPrefixed; use sp_core::{ storage::{ StorageChangeSet, StorageData, StorageKey, }, twox_128, Bytes, }; use sp_rpc::{ list::ListOrValue, number::NumberOrHex, }; use sp_runtime::{ generic::{ Block, SignedBlock, }, traits::Hash, }; use sp_transaction_pool::TransactionStatus; use sp_version::RuntimeVersion; use std::marker::PhantomData; use crate::{ error::Error, events::{ EventsDecoder, RawEvent, RuntimeEvent, }, frame::{ balances::Balances, system::{ Phase, System, SystemEvent, }, }, metadata::Metadata, }; pub type ChainBlock = SignedBlock::Header, ::Extrinsic>>; pub type BlockNumber = NumberOrHex<::BlockNumber>; /// Client for substrate rpc interfaces #[derive(Clone)] pub struct Rpc { client: Client, marker: std::marker::PhantomData, } impl Rpc where T: System, { pub async fn connect_ws(url: &str) -> Result { let raw_client = jsonrpsee::ws::ws_raw_client(&url).await?; Ok(Rpc { client: raw_client.into(), marker: PhantomData, }) } /// Fetch a storage key pub async fn storage( &self, key: StorageKey, hash: Option, ) -> Result, Error> { // todo: update jsonrpsee::rpc_api! macro to accept shared Client (currently only RawClient) // until then we manually construct params here and in other methods let params = Params::Array(vec![to_json_value(key)?, to_json_value(hash)?]); let data: Option = self.client.request("state_getStorage", params).await?; match data { Some(data) => { let value = Decode::decode(&mut &data.0[..])?; Ok(Some(value)) } None => Ok(None), } } /// Query historical storage entries pub async fn query_storage( &self, keys: Vec, from: T::Hash, to: Option, ) -> Result::Hash>>, Error> { let params = Params::Array(vec![to_json_value(keys)?, to_json_value(from)?, to_json_value(to)?]); self.client.request("state_queryStorage", params).await.map_err(Into::into) } /// Fetch the genesis hash pub async fn genesis_hash(&self) -> Result { let block_zero = Some(ListOrValue::Value(NumberOrHex::Number( T::BlockNumber::min_value(), ))); let params = Params::Array(vec![to_json_value(block_zero)?]); let list_or_value: ListOrValue> = self.client.request("chain_getBlockHash", params).await?; match list_or_value { ListOrValue::Value(genesis_hash) => { genesis_hash.ok_or_else(|| "Genesis hash not found".into()) } ListOrValue::List(_) => Err("Expected a Value, got a List".into()), } } /// Fetch the metadata pub async fn metadata(&self) -> Result { let bytes: Bytes = self .client .request("state_getMetadata", Params::None) .await?; let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?; let metadata: Metadata = meta.try_into()?; Ok(metadata) } /// Get a header pub async fn header( &self, hash: Option, ) -> Result, Error> { let params = Params::Array(vec![to_json_value(hash)?]); let header = self.client.request("chain_getHeader", params).await?; Ok(header) } /// Get a block hash, returns hash of latest block by default pub async fn block_hash( &self, block_number: Option>, ) -> Result, Error> { let block_number = block_number.map(|bn| ListOrValue::Value(bn)); let params = Params::Array(vec![to_json_value(block_number)?]); let list_or_value = self.client.request("chain_getBlockHash", params).await?; match list_or_value { ListOrValue::Value(hash) => Ok(hash), ListOrValue::List(_) => Err("Expected a Value, got a List".into()), } } /// Get a block hash of the latest finalized block pub async fn finalized_head(&self) -> Result { let hash = self .client .request("chain_getFinalizedHead", Params::None) .await?; Ok(hash) } /// Get a Block pub async fn block( &self, hash: Option, ) -> Result>, Error> { let params = Params::Array(vec![to_json_value(hash)?]); let block = self.client.request("chain_getBlock", params).await?; Ok(block) } /// Fetch the runtime version pub async fn runtime_version( &self, at: Option, ) -> Result { let params = Params::Array(vec![to_json_value(at)?]); let version = self .client .request("state_getRuntimeVersion", params) .await?; Ok(version) } } impl Rpc { /// Subscribe to substrate System Events pub async fn subscribe_events( &self, ) -> Result::Hash>>, Error> { let mut storage_key = twox_128(b"System").to_vec(); storage_key.extend(twox_128(b"Events").to_vec()); log::debug!("Events storage key {:?}", hex::encode(&storage_key)); let keys = Some(vec![StorageKey(storage_key)]); let params = Params::Array(vec![to_json_value(keys)?]); let subscription = self .client .subscribe("state_subscribeStorage", params, "state_unsubscribeStorage") .await?; Ok(subscription) } /// Subscribe to blocks. pub async fn subscribe_blocks(&self) -> Result, Error> { let subscription = self .client .subscribe( "chain_subscribeNewHeads", Params::None, "chain_subscribeNewHeads", ) .await?; Ok(subscription) } /// Subscribe to finalized blocks. pub async fn subscribe_finalized_blocks( &self, ) -> Result, Error> { let subscription = self .client .subscribe( "chain_subscribeFinalizedHeads", Params::None, "chain_subscribeFinalizedHeads", ) .await?; Ok(subscription) } /// Create and submit an extrinsic and return corresponding Hash if successful pub async fn submit_extrinsic( &self, extrinsic: E, ) -> Result { let bytes: Bytes = extrinsic.encode().into(); let params = Params::Array(vec![to_json_value(bytes)?]); let xt_hash = self .client .request("author_submitExtrinsic", params) .await?; Ok(xt_hash) } pub async fn watch_extrinsic( &self, extrinsic: E, ) -> Result>, Error> { let bytes: Bytes = extrinsic.encode().into(); let params = Params::Array(vec![to_json_value(bytes)?]); let subscription = self .client .subscribe( "author_submitAndWatchExtrinsic", params, "author_unwatchExtrinsic", ) .await?; Ok(subscription) } /// Create and submit an extrinsic and return corresponding Event if successful pub async fn submit_and_watch_extrinsic( self, extrinsic: E, decoder: EventsDecoder, ) -> Result, Error> { let ext_hash = T::Hashing::hash_of(&extrinsic); log::info!("Submitting Extrinsic `{:?}`", ext_hash); let events_sub = self.subscribe_events().await?; let mut xt_sub = self.watch_extrinsic(extrinsic).await?; while let status = xt_sub.next().await { log::info!("received status {:?}", status); match status { // ignore in progress extrinsic for now TransactionStatus::Future | TransactionStatus::Ready | TransactionStatus::Broadcast(_) => continue, TransactionStatus::InBlock(block_hash) => { log::info!("Fetching block {:?}", block_hash); let block = self.block(Some(block_hash)).await?; return match block { Some(signed_block) => { log::info!( "Found block {:?}, with {} extrinsics", block_hash, signed_block.block.extrinsics.len() ); wait_for_block_events( decoder, ext_hash, signed_block, block_hash, events_sub, ) .await } None => { Err(format!("Failed to find block {:?}", block_hash).into()) } } } TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()), TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()), TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()), } } unreachable!() } } /// Captures data for when an extrinsic is successfully included in a block #[derive(Debug)] pub struct ExtrinsicSuccess { /// Block hash. pub block: T::Hash, /// Extrinsic hash. pub extrinsic: T::Hash, /// Raw runtime events, can be decoded by the caller. pub events: Vec, } impl ExtrinsicSuccess { /// Find the Event for the given module/variant, with raw encoded event data. /// Returns `None` if the Event is not found. pub fn find_event_raw(&self, module: &str, variant: &str) -> Option<&RawEvent> { self.events.iter().find_map(|evt| { match evt { RuntimeEvent::Raw(ref raw) if raw.module == module && raw.variant == variant => { Some(raw) } _ => None, } }) } /// Returns all System Events pub fn system_events(&self) -> Vec<&SystemEvent> { self.events .iter() .filter_map(|evt| { match evt { RuntimeEvent::System(evt) => Some(evt), _ => None, } }) .collect() } /// Find the Event for the given module/variant, attempting to decode the event data. /// Returns `None` if the Event is not found. /// Returns `Err` if the data fails to decode into the supplied type pub fn find_event( &self, module: &str, variant: &str, ) -> Option> { self.find_event_raw(module, variant) .map(|evt| E::decode(&mut &evt.data[..])) } } /// Waits for events for the block triggered by the extrinsic pub async fn wait_for_block_events( decoder: EventsDecoder, ext_hash: T::Hash, signed_block: ChainBlock, block_hash: T::Hash, events_subscription: Subscription>, ) -> Result, Error> { let ext_index = signed_block .block .extrinsics .iter() .position(|ext| { let hash = T::Hashing::hash_of(ext); hash == ext_hash }) .ok_or_else(|| { Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash)) })?; let mut subscription = events_subscription; while let change_set = subscription.next().await { // only interested in events for the given block if change_set.block != block_hash { continue } let mut events = Vec::new(); for (_key, data) in change_set.changes { if let Some(data) = data { match decoder.decode_events(&mut &data.0[..]) { Ok(raw_events) => { for (phase, event) in raw_events { if let Phase::ApplyExtrinsic(i) = phase { if i as usize == ext_index { events.push(event) } } } } Err(err) => return Err(err.into()), } } } return if events.len() > 0 { Ok(ExtrinsicSuccess { block: block_hash, extrinsic: ext_hash, events, }) } else { Err(format!("No events found for block {}", block_hash).into()) } } unreachable!() }