rpc: Fetch metadata and runtime using chainHead subscription

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-12-07 16:40:46 +00:00
parent 1645a1d3a3
commit ad6a1f2e68
4 changed files with 113 additions and 19 deletions
+3 -2
View File
@@ -31,7 +31,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let genesis = api.rpc().chainhead_genesis_hash().await?;
println!("Genesis: {:?}", genesis);
let mut follow_sub = api.blocks().subscribe_chainhead_finalized(true).await?;
let mut follow_sub = api.blocks().subscribe_chainhead_finalized(false).await?;
println!("Following subscription...");
// Handle all subscriptions from the `chainHead_follow`.
while let Some(block) = follow_sub.next().await {
let block = block?;
@@ -55,7 +56,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let call = block
.call("AccountNonceApi_account_nonce".into(), Some(&call_params))
.await?;
println!("[hash={:?}] call={:?}", block.hash(), call);
// println!("[hash={:?}] call={:?}", block.hash(), call);
}
// Subscribe to the `chainHead_follow` method.
+6 -6
View File
@@ -90,18 +90,18 @@ impl<T: Config> OnlineClient<T> {
) -> Result<OnlineClient<T>, Error> {
let rpc = Rpc::new(rpc_client);
let (genesis_hash, runtime_version, metadata) = future::join3(
rpc.genesis_hash(),
rpc.runtime_version(None),
rpc.metadata(None),
let (genesis_hash, result) = future::join(
rpc.chainhead_genesis_hash(),
rpc.fetch_chainhead_call_metadata_runtime(),
)
.await;
let (metadata, runtime_version) = result?;
Ok(OnlineClient {
inner: Arc::new(RwLock::new(Inner {
genesis_hash: genesis_hash?,
runtime_version: runtime_version?,
metadata: metadata?,
runtime_version,
metadata,
})),
rpc,
})
+27 -10
View File
@@ -409,18 +409,10 @@ pub enum InvalidMetadataError {
TypeDefNotVariant(u32),
}
impl TryFrom<RuntimeMetadataPrefixed> for Metadata {
impl TryFrom<RuntimeMetadataV14> for Metadata {
type Error = InvalidMetadataError;
fn try_from(metadata: RuntimeMetadataPrefixed) -> Result<Self, Self::Error> {
if metadata.0 != META_RESERVED {
return Err(InvalidMetadataError::InvalidPrefix)
}
let metadata = match metadata.1 {
RuntimeMetadata::V14(meta) => meta,
_ => return Err(InvalidMetadataError::InvalidVersion),
};
fn try_from(metadata: RuntimeMetadataV14) -> Result<Self, Self::Error> {
let get_type_def_variant = |type_id: u32| {
let ty = metadata
.types
@@ -547,6 +539,31 @@ impl TryFrom<RuntimeMetadataPrefixed> for Metadata {
}
}
impl TryFrom<RuntimeMetadata> for Metadata {
type Error = InvalidMetadataError;
fn try_from(metadata: RuntimeMetadata) -> Result<Self, Self::Error> {
let metadata = match metadata {
RuntimeMetadata::V14(meta) => meta,
_ => return Err(InvalidMetadataError::InvalidVersion),
};
metadata.try_into()
}
}
impl TryFrom<RuntimeMetadataPrefixed> for Metadata {
type Error = InvalidMetadataError;
fn try_from(metadata: RuntimeMetadataPrefixed) -> Result<Self, Self::Error> {
if metadata.0 != META_RESERVED {
return Err(InvalidMetadataError::InvalidPrefix)
}
metadata.1.try_into()
}
}
#[cfg(test)]
mod tests {
use super::*;
+77 -1
View File
@@ -45,6 +45,7 @@ use super::{
ChainHeadEvent,
ChainHeadResult,
FollowEvent,
RuntimeEvent,
},
RpcClient,
RpcClientT,
@@ -60,7 +61,10 @@ use codec::{
Decode,
Encode,
};
use frame_metadata::RuntimeMetadataPrefixed;
use frame_metadata::{
OpaqueMetadata,
RuntimeMetadataPrefixed,
};
use serde::{
Deserialize,
Serialize,
@@ -845,6 +849,78 @@ impl<T: Config> Rpc<T> {
Err(Error::Other("Failed to execute runtime API call".into()))
}
/// Call into the runtime API to fetch the metadata and the runtime event.
pub async fn fetch_chainhead_call_metadata_runtime(
&self,
) -> Result<(Metadata, RuntimeVersion), Error> {
let (bytes, event) = {
let mut sub = self.subscribe_chainhead_follow(true).await?;
let subscription_id_rpc = match sub.subscription_id() {
Some(id) => id.clone(),
None => return Err(Error::Other("Subscription without ID".into())),
};
println!("Recv from jsonrpsee: {:?}", subscription_id_rpc);
// TODO: Jsonrpsee needs update.
let subscription_id = "A".to_string();
let event = match sub.next().await {
Some(event) => event,
None => return Err(Error::Other("Subscription without ID".into())),
};
let event = event?;
let event = match event {
FollowEvent::Initialized(init) => init,
_ => {
return Err(Error::Other(
"Unexpected event from chainHead_follow".into(),
))
}
};
let bytes = self
.fetch_chainhead_call(
subscription_id,
event.finalized_block_hash,
"Metadata_metadata".into(),
&vec![],
)
.await?;
println!("Dropping subscription");
// Manually cancel the susbcription
// {"id": 1, "method": "eth_unsubscribe", "params": ["0xcd0c3e8af590364c09d0fa6a1210faf5"]}
let result: bool = self
.client
.request(
"chainHead_unstable_unfollow",
rpc_params![subscription_id_rpc],
)
.await?;
println!("Subscription dropped with result={:?}", result);
drop(sub);
(bytes, event)
};
let metadata: OpaqueMetadata = Decode::decode(&mut &bytes[..])?;
let bytes = metadata.0;
let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?;
let metadata: Metadata = meta.try_into()?;
let runtime_version = match event.finalized_block_runtime {
Some(RuntimeEvent::Valid(event)) => event.spec,
_ => return Err(Error::Other("Expected runtime event".into())),
};
Ok((metadata, runtime_version))
}
/// Subscribe to finalized block headers.
///
/// Note: this may not produce _every_ block in the finalized chain;