blocks: Move parsing logic to the RPC for the chainHead methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-11-30 17:30:38 +00:00
parent e7192f350a
commit 1645a1d3a3
4 changed files with 179 additions and 86 deletions
+1
View File
@@ -19,3 +19,4 @@ futures = "0.3.13"
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
hex = "0.4.3"
tracing-subscriber = "0.3.11"
array-bytes = "4.1"
+2 -2
View File
@@ -28,7 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a client to use:
let api = OnlineClient::<PolkadotConfig>::new().await?;
let genesis = api.rpc().get_chainhead_genesis_hash().await?;
let genesis = api.rpc().chainhead_genesis_hash().await?;
println!("Genesis: {:?}", genesis);
let mut follow_sub = api.blocks().subscribe_chainhead_finalized(true).await?;
@@ -53,7 +53,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let call_params = AccountKeyring::Alice.to_account_id().encode();
let call = block
.call("AccountNonceApi_account_nonce".into(), &call_params)
.call("AccountNonceApi_account_nonce".into(), Some(&call_params))
.await?;
println!("[hash={:?}] call={:?}", block.hash(), call);
}
+30 -71
View File
@@ -27,7 +27,6 @@ use crate::{
},
Config,
};
use codec::Decode;
use derivative::Derivative;
use futures::lock::Mutex as AsyncMutex;
use sp_runtime::traits::{
@@ -72,52 +71,18 @@ where
{
/// Fetch the body (vector of extrinsics) of this block.
pub async fn body(&self) -> Result<Vec<Vec<u8>>, Error> {
let mut sub = self
.client
self.client
.rpc()
.subscribe_chainhead_body(self.subscription_id.clone(), self.hash)
.await?;
if let Some(event) = sub.next().await {
let event = event?;
return match event {
ChainHeadEvent::Done(ChainHeadResult { result }) => {
let bytes = hex::decode(result.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
let extrinsics: Vec<Vec<u8>> = Decode::decode(&mut &bytes[..])?;
Ok(extrinsics)
}
_ => Err(Error::Other("Failed to fetch the block body".into())),
}
}
Err(Error::Other("Failed to fetch the block body".into()))
.fetch_chainhead_body(self.subscription_id.clone(), self.hash)
.await
}
/// Fetch the header of this block.
pub async fn header(&self) -> Result<T::Header, Error> {
let header = self
.client
self.client
.rpc()
.get_chainhead_header(self.subscription_id.clone(), self.hash)
.await?;
let header = match header {
Some(header) => header,
None => {
return Err(Error::Other(
"Chain does not contain the header of this block".into(),
))
}
};
let bytes = hex::decode(header.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
let header: T::Header = Decode::decode(&mut &bytes[..])?;
Ok(header)
.fetch_chainhead_header(self.subscription_id.clone(), self.hash)
.await
}
/// Fetch the header of this block.
@@ -132,10 +97,10 @@ where
let metadata = self.client.metadata();
let key_bytes = utils::storage_address_bytes(key, &metadata)?;
let mut sub = self
let storage_bytes = self
.client
.rpc()
.subscribe_chainhead_storage(
.fetch_chainhead_storage(
self.subscription_id.clone(),
self.hash,
&key_bytes,
@@ -143,40 +108,28 @@ where
)
.await?;
if let Some(event) = sub.next().await {
let event = event?;
return match event {
ChainHeadEvent::Done(ChainHeadResult { result }) => {
let result = match result {
Some(result) => result,
None => return Ok(None),
};
let bytes = hex::decode(result.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
let storage = <Address::Target as DecodeWithMetadata>::decode_storage_with_metadata(
&mut &*bytes,
key.pallet_name(),
key.entry_name(),
&metadata,
)?;
Ok(Some(storage))
}
_ => Err(Error::Other("Failed to fetch the block body".into())),
}
if let Some(bytes) = storage_bytes {
let storage =
<Address::Target as DecodeWithMetadata>::decode_storage_with_metadata(
&mut &*bytes,
key.pallet_name(),
key.entry_name(),
&metadata,
)?;
Ok(Some(storage))
} else {
Ok(None)
}
Err(Error::Other("Failed to fetch the block body".into()))
}
/// Fetch the body (vector of extrinsics) of this block.
pub async fn call(
&self,
function: String,
call_parameters: &[u8],
call_parameters: Option<&[u8]>,
) -> Result<Vec<u8>, Error> {
let call_parameters = call_parameters.unwrap_or(Default::default());
let mut sub = self
.client
.rpc()
@@ -197,11 +150,17 @@ where
.map_err(|err| Error::Other(err.to_string()))?;
Ok(bytes)
}
_ => Err(Error::Other("Failed to fetch the block body".into())),
_ => {
Err(Error::Other(
"Failed to execute the runtime API call".into(),
))
}
}
}
Err(Error::Other("Failed to fetch the block body".into()))
Err(Error::Other(
"Failed to execute the runtime API call".into(),
))
}
}
+146 -13
View File
@@ -43,6 +43,7 @@ use super::{
rpc_params,
subscription_events::{
ChainHeadEvent,
ChainHeadResult,
FollowEvent,
},
RpcClient,
@@ -610,7 +611,7 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Subscribe to the chain head follow for newly added block hashes.
/// Subscribe to `chainHead_follow` directly to obtain all reported blocks.
pub async fn subscribe_chainhead_follow(
&self,
runtime_updates: bool,
@@ -627,7 +628,8 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Subscribe to the chain head body.
/// Subscribe to `chainHead_body` directly to obtain events regarding the
/// block's body.
pub async fn subscribe_chainhead_body(
&self,
subscription_id: String,
@@ -645,8 +647,38 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Get the chain head header.
pub async fn get_chainhead_header(
/// Subscribe to `chainHead_body` events and fetch the block's body.
///
/// # Note
///
/// This is a wrapper over [`subscribe_chainhead_body`].
pub async fn fetch_chainhead_body(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Vec<Vec<u8>>, Error> {
let mut sub = self.subscribe_chainhead_body(subscription_id, hash).await?;
if let Some(event) = sub.next().await {
let event = event?;
return match event {
ChainHeadEvent::Done(ChainHeadResult { result }) => {
let bytes = hex::decode(result.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
let extrinsics: Vec<Vec<u8>> = Decode::decode(&mut &bytes[..])?;
Ok(extrinsics)
}
_ => Err(Error::Other("Failed to fetch the block body".into())),
}
}
Err(Error::Other("Failed to fetch the block body".into()))
}
/// Get the block's body using the `chainHead_header` method.
pub async fn chainhead_header(
&self,
subscription_id: String,
hash: T::Hash,
@@ -662,22 +694,46 @@ impl<T: Config> Rpc<T> {
Ok(header)
}
/// Get the chain head genesis hash.
pub async fn get_chainhead_genesis_hash(
/// Parse `chaiHead_header` response and return the block's header.
///
/// # Note
///
/// This is a wrapper over `chainhead_header`.
pub async fn fetch_chainhead_header(
&self,
) -> Result<T::Hash, Error> {
subscription_id: String,
hash: T::Hash,
) -> Result<T::Header, Error> {
let header = self.chainhead_header(subscription_id, hash).await?;
let header = match header {
Some(header) => header,
None => {
return Err(Error::Other(
"Chain does not contain the header of this block".into(),
))
}
};
let bytes = hex::decode(header.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
let header: T::Header = Decode::decode(&mut &bytes[..])?;
Ok(header)
}
/// Get the chain head genesis hash.
pub async fn chainhead_genesis_hash(&self) -> Result<T::Hash, Error> {
let hash = self
.client
.request(
"chainHead_unstable_genesisHash",
rpc_params![],
)
.request("chainHead_unstable_genesisHash", rpc_params![])
.await?;
Ok(hash)
}
/// Subscribe to the chain head storage.
/// Subscribe to `chainHead_storage` directly to obtain events regarding the
/// block's storage.
pub async fn subscribe_chainhead_storage(
&self,
subscription_id: String,
@@ -697,7 +753,46 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Subscribe to the chain head call.
/// Subscribe to `chainHead_storage` events and return the storage at the
/// provided key.
///
/// # Note
///
/// This is a wrapper over [`subscribe_chainhead_storage`].
pub async fn fetch_chainhead_storage(
&self,
subscription_id: String,
hash: T::Hash,
key: &[u8],
child_key: Option<&[u8]>,
) -> Result<Option<Vec<u8>>, Error> {
let mut sub = self
.subscribe_chainhead_storage(subscription_id, hash, key, child_key)
.await?;
if let Some(event) = sub.next().await {
let event = event?;
return match event {
ChainHeadEvent::Done(ChainHeadResult { result }) => {
let result = match result {
Some(result) => result,
None => return Ok(None),
};
let bytes = hex::decode(result.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
Ok(Some(bytes))
}
_ => Err(Error::Other("Failed to fetch the block storage".into())),
}
}
Err(Error::Other("Failed to fetch the block storage".into()))
}
/// Subscribe to `chainHead_call` directly to obtain events regarding the
/// runtime API call.
pub async fn subscribe_chainhead_call(
&self,
subscription_id: String,
@@ -717,6 +812,39 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Subscribe to `chainHead_call` events and return the result of the provided
/// runtime API call.
///
/// # Note
///
/// This is a wrapper over [`subscribe_chainhead_call`].
pub async fn fetch_chainhead_call(
&self,
subscription_id: String,
hash: T::Hash,
function: String,
call_parameters: &[u8],
) -> Result<Vec<u8>, Error> {
let mut sub = self
.subscribe_chainhead_call(subscription_id, hash, function, call_parameters)
.await?;
if let Some(event) = sub.next().await {
let event = event?;
return match event {
ChainHeadEvent::Done(ChainHeadResult { result }) => {
let bytes = hex::decode(result.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
Ok(bytes)
}
_ => Err(Error::Other("Failed to execute runtime API call".into())),
}
}
Err(Error::Other("Failed to execute runtime API call".into()))
}
/// Subscribe to finalized block headers.
///
/// Note: this may not produce _every_ block in the finalized chain;
@@ -842,6 +970,11 @@ impl<T: Config> Rpc<T> {
}
fn to_hex(bytes: impl AsRef<[u8]>) -> String {
let bytes_ref = bytes.as_ref();
if bytes_ref.is_empty() {
return "".into()
}
format!("0x{}", hex::encode(bytes.as_ref()))
}