diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 60238f2d43..2a0e0680e2 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -19,3 +19,4 @@ futures = "0.3.13" codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] } hex = "0.4.3" tracing-subscriber = "0.3.11" +array-bytes = "4.1" diff --git a/examples/examples/chainhead_subscription.rs b/examples/examples/chainhead_subscription.rs index ded14edf68..3471495432 100644 --- a/examples/examples/chainhead_subscription.rs +++ b/examples/examples/chainhead_subscription.rs @@ -28,7 +28,7 @@ async fn main() -> Result<(), Box> { // Create a client to use: let api = OnlineClient::::new().await?; - let genesis = api.rpc().get_chainhead_genesis_hash().await?; + let genesis = api.rpc().chainhead_genesis_hash().await?; println!("Genesis: {:?}", genesis); let mut follow_sub = api.blocks().subscribe_chainhead_finalized(true).await?; @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box> { let call_params = AccountKeyring::Alice.to_account_id().encode(); let call = block - .call("AccountNonceApi_account_nonce".into(), &call_params) + .call("AccountNonceApi_account_nonce".into(), Some(&call_params)) .await?; println!("[hash={:?}] call={:?}", block.hash(), call); } diff --git a/subxt/src/blocks/block_types.rs b/subxt/src/blocks/block_types.rs index 42bae3c196..dbfe055f7a 100644 --- a/subxt/src/blocks/block_types.rs +++ b/subxt/src/blocks/block_types.rs @@ -27,7 +27,6 @@ use crate::{ }, Config, }; -use codec::Decode; use derivative::Derivative; use futures::lock::Mutex as AsyncMutex; use sp_runtime::traits::{ @@ -72,52 +71,18 @@ where { /// Fetch the body (vector of extrinsics) of this block. pub async fn body(&self) -> Result>, Error> { - let mut sub = self - .client + self.client .rpc() - .subscribe_chainhead_body(self.subscription_id.clone(), self.hash) - .await?; - - if let Some(event) = sub.next().await { - let event = event?; - - return match event { - ChainHeadEvent::Done(ChainHeadResult { result }) => { - let bytes = hex::decode(result.trim_start_matches("0x")) - .map_err(|err| Error::Other(err.to_string()))?; - - let extrinsics: Vec> = Decode::decode(&mut &bytes[..])?; - Ok(extrinsics) - } - _ => Err(Error::Other("Failed to fetch the block body".into())), - } - } - - Err(Error::Other("Failed to fetch the block body".into())) + .fetch_chainhead_body(self.subscription_id.clone(), self.hash) + .await } /// Fetch the header of this block. pub async fn header(&self) -> Result { - let header = self - .client + self.client .rpc() - .get_chainhead_header(self.subscription_id.clone(), self.hash) - .await?; - - let header = match header { - Some(header) => header, - None => { - return Err(Error::Other( - "Chain does not contain the header of this block".into(), - )) - } - }; - - let bytes = hex::decode(header.trim_start_matches("0x")) - .map_err(|err| Error::Other(err.to_string()))?; - - let header: T::Header = Decode::decode(&mut &bytes[..])?; - Ok(header) + .fetch_chainhead_header(self.subscription_id.clone(), self.hash) + .await } /// Fetch the header of this block. @@ -132,10 +97,10 @@ where let metadata = self.client.metadata(); let key_bytes = utils::storage_address_bytes(key, &metadata)?; - let mut sub = self + let storage_bytes = self .client .rpc() - .subscribe_chainhead_storage( + .fetch_chainhead_storage( self.subscription_id.clone(), self.hash, &key_bytes, @@ -143,40 +108,28 @@ where ) .await?; - if let Some(event) = sub.next().await { - let event = event?; - - return match event { - ChainHeadEvent::Done(ChainHeadResult { result }) => { - let result = match result { - Some(result) => result, - None => return Ok(None), - }; - - let bytes = hex::decode(result.trim_start_matches("0x")) - .map_err(|err| Error::Other(err.to_string()))?; - - let storage = ::decode_storage_with_metadata( - &mut &*bytes, - key.pallet_name(), - key.entry_name(), - &metadata, - )?; - Ok(Some(storage)) - } - _ => Err(Error::Other("Failed to fetch the block body".into())), - } + if let Some(bytes) = storage_bytes { + let storage = + ::decode_storage_with_metadata( + &mut &*bytes, + key.pallet_name(), + key.entry_name(), + &metadata, + )?; + Ok(Some(storage)) + } else { + Ok(None) } - - Err(Error::Other("Failed to fetch the block body".into())) } /// Fetch the body (vector of extrinsics) of this block. pub async fn call( &self, function: String, - call_parameters: &[u8], + call_parameters: Option<&[u8]>, ) -> Result, Error> { + let call_parameters = call_parameters.unwrap_or(Default::default()); + let mut sub = self .client .rpc() @@ -197,11 +150,17 @@ where .map_err(|err| Error::Other(err.to_string()))?; Ok(bytes) } - _ => Err(Error::Other("Failed to fetch the block body".into())), + _ => { + Err(Error::Other( + "Failed to execute the runtime API call".into(), + )) + } } } - Err(Error::Other("Failed to fetch the block body".into())) + Err(Error::Other( + "Failed to execute the runtime API call".into(), + )) } } diff --git a/subxt/src/rpc/rpc.rs b/subxt/src/rpc/rpc.rs index 38c5c8c350..4bd299327c 100644 --- a/subxt/src/rpc/rpc.rs +++ b/subxt/src/rpc/rpc.rs @@ -43,6 +43,7 @@ use super::{ rpc_params, subscription_events::{ ChainHeadEvent, + ChainHeadResult, FollowEvent, }, RpcClient, @@ -610,7 +611,7 @@ impl Rpc { Ok(subscription) } - /// Subscribe to the chain head follow for newly added block hashes. + /// Subscribe to `chainHead_follow` directly to obtain all reported blocks. pub async fn subscribe_chainhead_follow( &self, runtime_updates: bool, @@ -627,7 +628,8 @@ impl Rpc { Ok(subscription) } - /// Subscribe to the chain head body. + /// Subscribe to `chainHead_body` directly to obtain events regarding the + /// block's body. pub async fn subscribe_chainhead_body( &self, subscription_id: String, @@ -645,8 +647,38 @@ impl Rpc { Ok(subscription) } - /// Get the chain head header. - pub async fn get_chainhead_header( + /// Subscribe to `chainHead_body` events and fetch the block's body. + /// + /// # Note + /// + /// This is a wrapper over [`subscribe_chainhead_body`]. + pub async fn fetch_chainhead_body( + &self, + subscription_id: String, + hash: T::Hash, + ) -> Result>, Error> { + let mut sub = self.subscribe_chainhead_body(subscription_id, hash).await?; + + if let Some(event) = sub.next().await { + let event = event?; + + return match event { + ChainHeadEvent::Done(ChainHeadResult { result }) => { + let bytes = hex::decode(result.trim_start_matches("0x")) + .map_err(|err| Error::Other(err.to_string()))?; + + let extrinsics: Vec> = Decode::decode(&mut &bytes[..])?; + Ok(extrinsics) + } + _ => Err(Error::Other("Failed to fetch the block body".into())), + } + } + + Err(Error::Other("Failed to fetch the block body".into())) + } + + /// Get the block's body using the `chainHead_header` method. + pub async fn chainhead_header( &self, subscription_id: String, hash: T::Hash, @@ -662,22 +694,46 @@ impl Rpc { Ok(header) } - /// Get the chain head genesis hash. - pub async fn get_chainhead_genesis_hash( + /// Parse `chaiHead_header` response and return the block's header. + /// + /// # Note + /// + /// This is a wrapper over `chainhead_header`. + pub async fn fetch_chainhead_header( &self, - ) -> Result { + subscription_id: String, + hash: T::Hash, + ) -> Result { + let header = self.chainhead_header(subscription_id, hash).await?; + + let header = match header { + Some(header) => header, + None => { + return Err(Error::Other( + "Chain does not contain the header of this block".into(), + )) + } + }; + + let bytes = hex::decode(header.trim_start_matches("0x")) + .map_err(|err| Error::Other(err.to_string()))?; + + let header: T::Header = Decode::decode(&mut &bytes[..])?; + Ok(header) + } + + /// Get the chain head genesis hash. + pub async fn chainhead_genesis_hash(&self) -> Result { let hash = self .client - .request( - "chainHead_unstable_genesisHash", - rpc_params![], - ) + .request("chainHead_unstable_genesisHash", rpc_params![]) .await?; Ok(hash) } - /// Subscribe to the chain head storage. + /// Subscribe to `chainHead_storage` directly to obtain events regarding the + /// block's storage. pub async fn subscribe_chainhead_storage( &self, subscription_id: String, @@ -697,7 +753,46 @@ impl Rpc { Ok(subscription) } - /// Subscribe to the chain head call. + /// Subscribe to `chainHead_storage` events and return the storage at the + /// provided key. + /// + /// # Note + /// + /// This is a wrapper over [`subscribe_chainhead_storage`]. + pub async fn fetch_chainhead_storage( + &self, + subscription_id: String, + hash: T::Hash, + key: &[u8], + child_key: Option<&[u8]>, + ) -> Result>, Error> { + let mut sub = self + .subscribe_chainhead_storage(subscription_id, hash, key, child_key) + .await?; + + if let Some(event) = sub.next().await { + let event = event?; + + return match event { + ChainHeadEvent::Done(ChainHeadResult { result }) => { + let result = match result { + Some(result) => result, + None => return Ok(None), + }; + + let bytes = hex::decode(result.trim_start_matches("0x")) + .map_err(|err| Error::Other(err.to_string()))?; + Ok(Some(bytes)) + } + _ => Err(Error::Other("Failed to fetch the block storage".into())), + } + } + + Err(Error::Other("Failed to fetch the block storage".into())) + } + + /// Subscribe to `chainHead_call` directly to obtain events regarding the + /// runtime API call. pub async fn subscribe_chainhead_call( &self, subscription_id: String, @@ -717,6 +812,39 @@ impl Rpc { Ok(subscription) } + /// Subscribe to `chainHead_call` events and return the result of the provided + /// runtime API call. + /// + /// # Note + /// + /// This is a wrapper over [`subscribe_chainhead_call`]. + pub async fn fetch_chainhead_call( + &self, + subscription_id: String, + hash: T::Hash, + function: String, + call_parameters: &[u8], + ) -> Result, Error> { + let mut sub = self + .subscribe_chainhead_call(subscription_id, hash, function, call_parameters) + .await?; + + if let Some(event) = sub.next().await { + let event = event?; + + return match event { + ChainHeadEvent::Done(ChainHeadResult { result }) => { + let bytes = hex::decode(result.trim_start_matches("0x")) + .map_err(|err| Error::Other(err.to_string()))?; + Ok(bytes) + } + _ => Err(Error::Other("Failed to execute runtime API call".into())), + } + } + + Err(Error::Other("Failed to execute runtime API call".into())) + } + /// Subscribe to finalized block headers. /// /// Note: this may not produce _every_ block in the finalized chain; @@ -842,6 +970,11 @@ impl Rpc { } fn to_hex(bytes: impl AsRef<[u8]>) -> String { + let bytes_ref = bytes.as_ref(); + if bytes_ref.is_empty() { + return "".into() + } + format!("0x{}", hex::encode(bytes.as_ref())) }