blocks: Fetch chainHead storage

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-11-30 12:44:47 +00:00
parent 9ef0a70742
commit a635f3bebf
3 changed files with 93 additions and 6 deletions
+15 -6
View File
@@ -29,14 +29,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut follow_sub = api.blocks().subscribe_chainhead_finalized(false).await?;
// Handle all subscriptions from the `chainHead_follow`.
while let Some(event) = follow_sub.next().await {
let event = event?;
while let Some(block) = follow_sub.next().await {
let block = block?;
let body = event.body().await?;
println!("[hash={:?}] body={:?}", event.hash(), body);
let body = block.body().await?;
println!("[hash={:?}] body={:?}", block.hash(), body);
let header = event.header().await?;
println!("[hash={:?}] header={:?}", event.hash(), header);
let header = block.header().await?;
println!("[hash={:?}] header={:?}", block.hash(), header);
let active_era_addr = polkadot::storage().staking().active_era();
let era = block.storage(&active_era_addr).await?.unwrap();
println!(
"[hash={:?}] storage index: {:?}, start: {:?}",
block.hash(),
era.index,
era.start
);
}
// Subscribe to the `chainHead_follow` method.
+59
View File
@@ -12,6 +12,7 @@ use crate::{
Error,
},
events,
metadata::DecodeWithMetadata,
rpc::{
subscription_events::{
ChainHeadEvent,
@@ -19,6 +20,11 @@ use crate::{
},
ChainBlockResponse,
},
storage::{
address::Yes,
utils,
StorageAddress,
},
Config,
};
use codec::Decode;
@@ -115,6 +121,59 @@ where
let header: T::Header = Decode::decode(&mut &bytes[..])?;
Ok(header)
}
/// Fetch the header of this block.
pub async fn storage<'a, Address>(
&self,
key: &'a Address,
) -> Result<Option<<Address::Target as DecodeWithMetadata>::Target>, Error>
where
Address: StorageAddress<IsFetchable = Yes> + 'a,
{
// Look up the return type ID to enable DecodeWithMetadata:
let metadata = self.client.metadata();
let key_bytes = utils::storage_address_bytes(key, &metadata)?;
let mut sub = self
.client
.rpc()
.subscribe_chainhead_storage(
self.subscription_id.clone(),
self.hash,
&key_bytes,
None,
)
.await?;
if let Some(event) = sub.next().await {
let event = event?;
println!("Got event: {:?}", event);
return match event {
ChainHeadEvent::Done(ChainHeadResult { result }) => {
let result = match result {
Some(result) => result,
None => return Ok(None),
};
let bytes = hex::decode(result.trim_start_matches("0x"))
.map_err(|err| Error::Other(err.to_string()))?;
let storage = <Address::Target as DecodeWithMetadata>::decode_storage_with_metadata(
&mut &*bytes,
key.pallet_name(),
key.entry_name(),
&metadata,
)?;
Ok(Some(storage))
}
_ => Err(Error::Other("Failed to fetch the block body".into())),
}
}
Err(Error::Other("Failed to fetch the block body".into()))
}
}
/// A representation of a block.
+19
View File
@@ -662,6 +662,25 @@ impl<T: Config> Rpc<T> {
Ok(header)
}
/// Subscribe to the chain head storage.
pub async fn subscribe_chainhead_storage(
&self,
subscription_id: String,
hash: T::Hash,
key: &[u8],
child_key: Option<&[u8]>,
) -> Result<Subscription<ChainHeadEvent<Option<String>>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_storage",
rpc_params![subscription_id, hash, to_hex(key), child_key.map(to_hex)],
"chainHead_unstable_stopStorage",
)
.await?;
Ok(subscription)
}
/// Subscribe to finalized block headers.
///
/// Note: this may not produce _every_ block in the finalized chain;