Add chainHead RPC methods (#766)

* rpc/types: Add chainHead event types

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Add `chainHead` RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Fix chainHead doc links

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update subxt/src/rpc/rpc.rs

Co-authored-by: James Wilson <james@jsdw.me>

* tests: Test the chainHead RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc: Improve `chainhead_unstable_follow` docs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Alexandru Vasile
2023-01-19 13:53:32 +02:00
committed by GitHub
parent 977f2a3333
commit 0750ccd5a0
3 changed files with 731 additions and 2 deletions
+162 -1
View File
@@ -41,7 +41,11 @@
use super::{
rpc_params,
types,
types::{
self,
ChainHeadEvent,
FollowEvent,
},
RpcClient,
RpcClientT,
Subscription,
@@ -465,6 +469,163 @@ impl<T: Config> Rpc<T> {
self.client.request("system_dryRun", params).await?;
Ok(types::decode_dry_run_result(&mut &*result_bytes.0)?)
}
/// Subscribe to `chainHead_unstable_follow` to obtain all reported blocks by the chain.
///
/// The subscription ID can be used to make queries for the
/// block's body ([`chainhead_unstable_body`](Rpc::chainhead_unstable_follow)),
/// block's header ([`chainhead_unstable_header`](Rpc::chainhead_unstable_header)),
/// block's storage ([`chainhead_unstable_storage`](Rpc::chainhead_unstable_storage)) and submitting
/// runtime API calls at this block ([`chainhead_unstable_call`](Rpc::chainhead_unstable_call)).
///
/// # Note
///
/// When the user is no longer interested in a block, the user is responsible
/// for calling the [`chainhead_unstable_unpin`](Rpc::chainhead_unstable_unpin) method.
/// Failure to do so will result in the subscription being stopped by generating the `Stop` event.
pub async fn chainhead_unstable_follow(
&self,
runtime_updates: bool,
) -> Result<Subscription<FollowEvent<T::Hash>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_follow",
rpc_params![runtime_updates],
"chainHead_unstable_unfollow",
)
.await?;
Ok(subscription)
}
/// Subscribe to `chainHead_unstable_body` to obtain events regarding the block's body.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_body(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_body",
rpc_params![subscription_id, hash],
"chainHead_unstable_stopBody",
)
.await?;
Ok(subscription)
}
/// Get the block's body using the `chainHead_unstable_header` method.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_header(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Option<String>, Error> {
let header = self
.client
.request(
"chainHead_unstable_header",
rpc_params![subscription_id, hash],
)
.await?;
Ok(header)
}
/// Subscribe to `chainHead_storage` to obtain events regarding the
/// block's storage.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_storage(
&self,
subscription_id: String,
hash: T::Hash,
key: &[u8],
child_key: Option<&[u8]>,
) -> Result<Subscription<ChainHeadEvent<Option<String>>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_storage",
rpc_params![subscription_id, hash, to_hex(key), child_key.map(to_hex)],
"chainHead_unstable_stopStorage",
)
.await?;
Ok(subscription)
}
/// Subscribe to `chainHead_call` to obtain events regarding the
/// runtime API call.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_call(
&self,
subscription_id: String,
hash: T::Hash,
function: String,
call_parameters: &[u8],
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
.client
.subscribe(
"chainHead_unstable_call",
rpc_params![subscription_id, hash, function, to_hex(call_parameters)],
"chainHead_unstable_stopCall",
)
.await?;
Ok(subscription)
}
/// Unpin a block reported by the `chainHead_follow` subscription.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](Rpc::chainhead_unstable_follow).
pub async fn chainhead_unstable_unpin(
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_unpin",
rpc_params![subscription_id, hash],
)
.await?;
Ok(())
}
/// Get genesis hash obtained from the `chainHead_genesisHash` method.
pub async fn chainhead_unstable_genesishash(&self) -> Result<T::Hash, Error> {
let hash = self
.client
.request("chainHead_unstable_genesisHash", rpc_params![])
.await?;
Ok(hash)
}
}
fn to_hex(bytes: impl AsRef<[u8]>) -> String {
+384
View File
@@ -398,6 +398,390 @@ pub struct Health {
pub should_have_peers: bool,
}
/// The operation could not be processed due to an error.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ErrorEvent {
/// Reason of the error.
pub error: String,
}
/// The runtime specification of the current block.
///
/// This event is generated for:
/// - the first announced block by the follow subscription
/// - blocks that suffered a change in runtime compared with their parents
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeVersionEvent {
/// The runtime version.
pub spec: RuntimeVersion,
}
/// The runtime event generated if the `follow` subscription
/// has set the `runtime_updates` flag.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum RuntimeEvent {
/// The runtime version of this block.
Valid(RuntimeVersionEvent),
/// The runtime could not be obtained due to an error.
Invalid(ErrorEvent),
}
/// Contain information about the latest finalized block.
///
/// # Note
///
/// This is the first event generated by the `follow` subscription
/// and is submitted only once.
///
/// If the `runtime_updates` flag is set, then this event contains
/// the `RuntimeEvent`, otherwise the `RuntimeEvent` is not present.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
/// The hash of the latest finalized block.
pub finalized_block_hash: Hash,
/// The runtime version of the finalized block.
///
/// # Note
///
/// This is present only if the `runtime_updates` flag is set for
/// the `follow` subscription.
pub finalized_block_runtime: Option<RuntimeEvent>,
}
/// Indicate a new non-finalized block.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NewBlock<Hash> {
/// The hash of the new block.
pub block_hash: Hash,
/// The parent hash of the new block.
pub parent_block_hash: Hash,
/// The runtime version of the new block.
///
/// # Note
///
/// This is present only if the `runtime_updates` flag is set for
/// the `follow` subscription.
pub new_runtime: Option<RuntimeEvent>,
}
/// Indicate the block hash of the new best block.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BestBlockChanged<Hash> {
/// The block hash of the new best block.
pub best_block_hash: Hash,
}
/// Indicate the finalized and pruned block hashes.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Finalized<Hash> {
/// Block hashes that are finalized.
pub finalized_block_hashes: Vec<Hash>,
/// Block hashes that are pruned (removed).
pub pruned_block_hashes: Vec<Hash>,
}
/// The event generated by the `chainHead_follow` method.
///
/// The events are generated in the following order:
/// 1. Initialized - generated only once to signal the
/// latest finalized block
/// 2. NewBlock - a new block was added.
/// 3. BestBlockChanged - indicate that the best block
/// is now the one from this event. The block was
/// announced priorly with the `NewBlock` event.
/// 4. Finalized - State the finalized and pruned blocks.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
pub enum FollowEvent<Hash> {
/// The latest finalized block.
///
/// This event is generated only once.
Initialized(Initialized<Hash>),
/// A new non-finalized block was added.
NewBlock(NewBlock<Hash>),
/// The best block of the chain.
BestBlockChanged(BestBlockChanged<Hash>),
/// A list of finalized and pruned blocks.
Finalized(Finalized<Hash>),
/// The subscription is dropped and no further events
/// will be generated.
Stop,
}
/// The result of a chain head method.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChainHeadResult<T> {
/// Result of the method.
pub result: T,
}
/// The event generated by the body / call / storage methods.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
pub enum ChainHeadEvent<T> {
/// The request completed successfully.
Done(ChainHeadResult<T>),
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible(ErrorEvent),
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
}
/// The transaction was broadcasted to a number of peers.
///
/// # Note
///
/// The RPC does not guarantee that the peers have received the
/// transaction.
///
/// When the number of peers is zero, the event guarantees that
/// shutting down the local node will lead to the transaction
/// not being included in the chain.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionBroadcasted {
/// The number of peers the transaction was broadcasted to.
#[serde(with = "as_string")]
pub num_peers: usize,
}
/// The transaction was included in a block of the chain.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionBlock<Hash> {
/// The hash of the block the transaction was included into.
pub hash: Hash,
/// The index (zero-based) of the transaction within the body of the block.
#[serde(with = "as_string")]
pub index: usize,
}
/// The transaction could not be processed due to an error.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionError {
/// Reason of the error.
pub error: String,
}
/// The transaction was dropped because of exceeding limits.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionDropped {
/// True if the transaction was broadcasted to other peers and
/// may still be included in the block.
pub broadcasted: bool,
/// Reason of the event.
pub error: String,
}
/// Possible transaction status events.
///
/// The status events can be grouped based on their kinds as:
///
/// 1. Runtime validated the transaction:
/// - `Validated`
///
/// 2. Inside the `Ready` queue:
/// - `Broadcast`
///
/// 3. Leaving the pool:
/// - `BestChainBlockIncluded`
/// - `Invalid`
///
/// 4. Block finalized:
/// - `Finalized`
///
/// 5. At any time:
/// - `Dropped`
/// - `Error`
///
/// The subscription's stream is considered finished whenever the following events are
/// received: `Finalized`, `Error`, `Invalid` or `Dropped`. However, the user is allowed
/// to unsubscribe at any moment.
#[derive(Debug, Clone, PartialEq, Deserialize)]
// We need to manually specify the trait bounds for the `Hash` trait to ensure `into` and
// `from` still work.
#[serde(bound(deserialize = "Hash: Deserialize<'de> + Clone"))]
#[serde(from = "TransactionEventIR<Hash>")]
pub enum TransactionEvent<Hash> {
/// The transaction was validated by the runtime.
Validated,
/// The transaction was broadcasted to a number of peers.
Broadcasted(TransactionBroadcasted),
/// The transaction was included in a best block of the chain.
///
/// # Note
///
/// This may contain `None` if the block is no longer a best
/// block of the chain.
BestChainBlockIncluded(Option<TransactionBlock<Hash>>),
/// The transaction was included in a finalized block.
Finalized(TransactionBlock<Hash>),
/// The transaction could not be processed due to an error.
Error(TransactionError),
/// The transaction is marked as invalid.
Invalid(TransactionError),
/// The client was not capable of keeping track of this transaction.
Dropped(TransactionDropped),
}
/// Intermediate representation (IR) for the transaction events
/// that handles block events only.
///
/// The block events require a JSON compatible interpretation similar to:
///
/// ```json
/// { event: "EVENT", block: { hash: "0xFF", index: 0 } }
/// ```
///
/// This IR is introduced to circumvent that the block events need to
/// be serialized/deserialized with "tag" and "content", while other
/// events only require "tag".
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event", content = "block")]
enum TransactionEventBlockIR<Hash> {
/// The transaction was included in the best block of the chain.
BestChainBlockIncluded(Option<TransactionBlock<Hash>>),
/// The transaction was included in a finalized block of the chain.
Finalized(TransactionBlock<Hash>),
}
/// Intermediate representation (IR) for the transaction events
/// that handles non-block events only.
///
/// The non-block events require a JSON compatible interpretation similar to:
///
/// ```json
/// { event: "EVENT", num_peers: 0 }
/// ```
///
/// This IR is introduced to circumvent that the block events need to
/// be serialized/deserialized with "tag" and "content", while other
/// events only require "tag".
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
enum TransactionEventNonBlockIR {
Validated,
Broadcasted(TransactionBroadcasted),
Error(TransactionError),
Invalid(TransactionError),
Dropped(TransactionDropped),
}
/// Intermediate representation (IR) used for serialization/deserialization of the
/// [`TransactionEvent`] in a JSON compatible format.
///
/// Serde cannot mix `#[serde(tag = "event")]` with `#[serde(tag = "event", content = "block")]`
/// for specific enum variants. Therefore, this IR is introduced to circumvent this
/// restriction, while exposing a simplified [`TransactionEvent`] for users of the
/// rust ecosystem.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(bound(deserialize = "Hash: Deserialize<'de>"))]
#[serde(rename_all = "camelCase")]
#[serde(untagged)]
enum TransactionEventIR<Hash> {
Block(TransactionEventBlockIR<Hash>),
NonBlock(TransactionEventNonBlockIR),
}
impl<Hash> From<TransactionEvent<Hash>> for TransactionEventIR<Hash> {
fn from(value: TransactionEvent<Hash>) -> Self {
match value {
TransactionEvent::Validated => {
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Validated)
}
TransactionEvent::Broadcasted(event) => {
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Broadcasted(
event,
))
}
TransactionEvent::BestChainBlockIncluded(event) => {
TransactionEventIR::Block(
TransactionEventBlockIR::BestChainBlockIncluded(event),
)
}
TransactionEvent::Finalized(event) => {
TransactionEventIR::Block(TransactionEventBlockIR::Finalized(event))
}
TransactionEvent::Error(event) => {
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Error(event))
}
TransactionEvent::Invalid(event) => {
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Invalid(event))
}
TransactionEvent::Dropped(event) => {
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Dropped(event))
}
}
}
}
impl<Hash> From<TransactionEventIR<Hash>> for TransactionEvent<Hash> {
fn from(value: TransactionEventIR<Hash>) -> Self {
match value {
TransactionEventIR::NonBlock(status) => {
match status {
TransactionEventNonBlockIR::Validated => TransactionEvent::Validated,
TransactionEventNonBlockIR::Broadcasted(event) => {
TransactionEvent::Broadcasted(event)
}
TransactionEventNonBlockIR::Error(event) => {
TransactionEvent::Error(event)
}
TransactionEventNonBlockIR::Invalid(event) => {
TransactionEvent::Invalid(event)
}
TransactionEventNonBlockIR::Dropped(event) => {
TransactionEvent::Dropped(event)
}
}
}
TransactionEventIR::Block(block) => {
match block {
TransactionEventBlockIR::Finalized(event) => {
TransactionEvent::Finalized(event)
}
TransactionEventBlockIR::BestChainBlockIncluded(event) => {
TransactionEvent::BestChainBlockIncluded(event)
}
}
}
}
}
}
/// Serialize and deserialize helper as string.
mod as_string {
use super::*;
use serde::Deserializer;
pub fn deserialize<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<usize, D::Error> {
String::deserialize(deserializer)?
.parse()
.map_err(|e| serde::de::Error::custom(format!("Parsing failed: {}", e)))
}
}
#[cfg(test)]
mod test {
use super::*;
+185 -1
View File
@@ -11,9 +11,11 @@ use crate::{
wait_for_blocks,
},
};
use assert_matches::assert_matches;
use codec::{
Compact,
Decode,
Encode,
};
use frame_metadata::RuntimeMetadataPrefixed;
use sp_core::{
@@ -24,7 +26,15 @@ use sp_core::{
use sp_keyring::AccountKeyring;
use subxt::{
error::DispatchError,
rpc::types::DryRunError,
rpc::types::{
ChainHeadEvent,
DryRunError,
FollowEvent,
Initialized,
RuntimeEvent,
RuntimeVersionEvent,
},
utils::AccountId32,
};
#[tokio::test]
@@ -281,3 +291,177 @@ async fn rpc_state_call() {
let metadata = metadata.runtime_metadata();
assert_eq!(&metadata_call, metadata);
}
#[tokio::test]
async fn chainhead_unstable_follow() {
let ctx = test_context().await;
let api = ctx.client();
// Check subscription with runtime updates set on false.
let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
// The initialized event should contain the finalized block hash.
let finalized_block_hash = api.rpc().finalized_head().await.unwrap();
assert_eq!(
event,
FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_runtime: None,
})
);
// Expect subscription to produce runtime versions.
let mut blocks = api.rpc().chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
// The initialized event should contain the finalized block hash.
let finalized_block_hash = api.rpc().finalized_head().await.unwrap();
let runtime_version = ctx.client().runtime_version();
assert_matches!(
event,
FollowEvent::Initialized(init) => {
assert_eq!(init.finalized_block_hash, finalized_block_hash);
assert_eq!(init.finalized_block_runtime, Some(RuntimeEvent::Valid(RuntimeVersionEvent {
spec: runtime_version,
})));
}
);
}
#[tokio::test]
async fn chainhead_unstable_body() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap().clone();
// Subscribe to fetch the block's body.
let mut sub = api
.rpc()
.chainhead_unstable_body(sub_id, hash)
.await
.unwrap();
let event = sub.next().await.unwrap().unwrap();
// Expected block's extrinsics scale encoded and hex encoded.
let body = api.rpc().block(Some(hash)).await.unwrap().unwrap();
let extrinsics: Vec<Vec<u8>> =
body.block.extrinsics.into_iter().map(|ext| ext.0).collect();
let expected = format!("0x{}", hex::encode(extrinsics.encode()));
assert_matches!(event,
ChainHeadEvent::Done(done) if done.result == expected
);
}
#[tokio::test]
async fn chainhead_unstable_header() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap().clone();
let header = api.rpc().header(Some(hash)).await.unwrap().unwrap();
let expected = format!("0x{}", hex::encode(header.encode()));
let header = api
.rpc()
.chainhead_unstable_header(sub_id, hash)
.await
.unwrap()
.unwrap();
assert_eq!(header, expected);
}
#[tokio::test]
async fn chainhead_unstable_storage() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap().clone();
let alice: AccountId32 = AccountKeyring::Alice.to_account_id().into();
let addr = node_runtime::storage().system().account(alice).to_bytes();
let mut sub = api
.rpc()
.chainhead_unstable_storage(sub_id, hash, &addr, None)
.await
.unwrap();
let event = sub.next().await.unwrap().unwrap();
assert_matches!(event, ChainHeadEvent::<Option<String>>::Done(done) if done.result.is_some());
}
#[tokio::test]
async fn chainhead_unstable_call() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap().clone();
let alice_id = AccountKeyring::Alice.to_account_id();
let mut sub = api
.rpc()
.chainhead_unstable_call(
sub_id,
hash,
"AccountNonceApi_account_nonce".into(),
&alice_id.encode(),
)
.await
.unwrap();
let event = sub.next().await.unwrap().unwrap();
assert_matches!(event, ChainHeadEvent::<String>::Done(_));
}
#[tokio::test]
async fn chainhead_unstable_unpin() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap().clone();
assert!(api
.rpc()
.chainhead_unstable_unpin(sub_id.clone(), hash)
.await
.is_ok());
// The block was already unpinned.
assert!(api
.rpc()
.chainhead_unstable_unpin(sub_id, hash)
.await
.is_err());
}