mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 12:17:58 +00:00
chainHead: Support multiple hashes for chainHead_unpin method (#2295)
This PR adds support for multiple hashes being passed to the `chainHeda_unpin` parameters. The `hash` parameter is renamed to `hash_or_hashes` per https://github.com/paritytech/json-rpc-interface-spec/pull/111. While at it, a new integration test is added to check the unpinning of multiple hashes. The API is checked against a hash or a vector of hashes. cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
@@ -21,6 +21,7 @@
|
||||
//! API trait of the chain head.
|
||||
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
|
||||
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
|
||||
use sp_rpc::list::ListOrValue;
|
||||
|
||||
#[rpc(client, server)]
|
||||
pub trait ChainHeadApi<Hash> {
|
||||
@@ -109,16 +110,22 @@ pub trait ChainHeadApi<Hash> {
|
||||
call_parameters: String,
|
||||
) -> RpcResult<MethodResponse>;
|
||||
|
||||
/// Unpin a block reported by the `follow` method.
|
||||
/// Unpin a block or multiple blocks reported by the `follow` method.
|
||||
///
|
||||
/// Ongoing operations that require the provided block
|
||||
/// will continue normally.
|
||||
///
|
||||
/// When this method returns an error, it is guaranteed that no blocks have been unpinned.
|
||||
///
|
||||
/// # Unstable
|
||||
///
|
||||
/// This method is unstable and subject to change in the future.
|
||||
#[method(name = "chainHead_unstable_unpin", blocking)]
|
||||
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
|
||||
fn chain_head_unstable_unpin(
|
||||
&self,
|
||||
follow_subscription: String,
|
||||
hash_or_hashes: ListOrValue<Hash>,
|
||||
) -> RpcResult<()>;
|
||||
|
||||
/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
|
||||
/// `operationWaitingForContinue` event.
|
||||
|
||||
@@ -48,6 +48,7 @@ use sc_client_api::{
|
||||
use sp_api::CallApiAt;
|
||||
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
|
||||
use sp_core::{traits::CallContext, Bytes};
|
||||
use sp_rpc::list::ListOrValue;
|
||||
use sp_runtime::traits::Block as BlockT;
|
||||
use std::{marker::PhantomData, sync::Arc, time::Duration};
|
||||
|
||||
@@ -432,9 +433,16 @@ where
|
||||
fn chain_head_unstable_unpin(
|
||||
&self,
|
||||
follow_subscription: String,
|
||||
hash: Block::Hash,
|
||||
hash_or_hashes: ListOrValue<Block::Hash>,
|
||||
) -> RpcResult<()> {
|
||||
match self.subscriptions.unpin_block(&follow_subscription, hash) {
|
||||
let result = match hash_or_hashes {
|
||||
ListOrValue::Value(hash) =>
|
||||
self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
|
||||
ListOrValue::List(hashes) =>
|
||||
self.subscriptions.unpin_blocks(&follow_subscription, hashes),
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(SubscriptionManagementError::SubscriptionAbsent) => {
|
||||
// Invalid invalid subscription ID.
|
||||
|
||||
@@ -750,22 +750,36 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unpin_block(
|
||||
pub fn unpin_blocks(
|
||||
&mut self,
|
||||
sub_id: &str,
|
||||
hash: Block::Hash,
|
||||
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
|
||||
) -> Result<(), SubscriptionManagementError> {
|
||||
let Some(sub) = self.subs.get_mut(sub_id) else {
|
||||
return Err(SubscriptionManagementError::SubscriptionAbsent)
|
||||
};
|
||||
|
||||
// Check that unpin was not called before and the block was pinned
|
||||
// for this subscription.
|
||||
if !sub.unregister_block(hash) {
|
||||
return Err(SubscriptionManagementError::BlockHashAbsent)
|
||||
// Ensure that all blocks are part of the subscription before removing individual
|
||||
// blocks.
|
||||
for hash in hashes.clone() {
|
||||
if !sub.contains_block(hash) {
|
||||
return Err(SubscriptionManagementError::BlockHashAbsent);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: this needs to be separate from the global mappings to avoid barrow checker
|
||||
// thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from
|
||||
// `self.global_unregister_block`. Although the borrowing is correct, since different
|
||||
// fields of the structure are borrowed, one at a time.
|
||||
for hash in hashes.clone() {
|
||||
sub.unregister_block(hash);
|
||||
}
|
||||
|
||||
// Block have been removed from the subscription. Remove them from the global tracking.
|
||||
for hash in hashes {
|
||||
self.global_unregister_block(hash);
|
||||
}
|
||||
|
||||
self.global_unregister_block(hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1029,11 +1043,11 @@ mod tests {
|
||||
assert_eq!(block.has_runtime(), true);
|
||||
|
||||
let invalid_id = "abc-invalid".to_string();
|
||||
let err = subs.unpin_block(&invalid_id, hash).unwrap_err();
|
||||
let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
|
||||
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
|
||||
|
||||
// Unpin the block.
|
||||
subs.unpin_block(&id, hash).unwrap();
|
||||
subs.unpin_blocks(&id, vec![hash]).unwrap();
|
||||
let err = subs.lock_block(&id, hash, 1).unwrap_err();
|
||||
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
|
||||
}
|
||||
@@ -1077,13 +1091,13 @@ mod tests {
|
||||
// Ensure the block propagated to the subscription.
|
||||
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
|
||||
|
||||
subs.unpin_block(&id, hash).unwrap();
|
||||
subs.unpin_blocks(&id, vec![hash]).unwrap();
|
||||
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
|
||||
// Cannot unpin a block twice for the same subscription.
|
||||
let err = subs.unpin_block(&id, hash).unwrap_err();
|
||||
let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
|
||||
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
|
||||
|
||||
subs.unpin_block(&id_second, hash).unwrap();
|
||||
subs.unpin_blocks(&id_second, vec![hash]).unwrap();
|
||||
// Block unregistered from the memory.
|
||||
assert!(subs.global_blocks.get(&hash).is_none());
|
||||
}
|
||||
|
||||
@@ -94,22 +94,23 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
|
||||
inner.pin_block(sub_id, hash)
|
||||
}
|
||||
|
||||
/// Unpin the block from the subscription.
|
||||
/// Unpin the blocks from the subscription.
|
||||
///
|
||||
/// The last subscription that unpins the block is also unpinning the block
|
||||
/// from the backend.
|
||||
/// Blocks are reference counted and when the last subscription unpins a given block, the block
|
||||
/// is also unpinned from the backend.
|
||||
///
|
||||
/// This method is called only once per subscription.
|
||||
///
|
||||
/// Returns an error if the block is not pinned for the subscription or
|
||||
/// the subscription ID is invalid.
|
||||
pub fn unpin_block(
|
||||
/// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned
|
||||
/// for the subscriptions. When an error is returned, it is guaranteed that no blocks have
|
||||
/// been unpinned.
|
||||
pub fn unpin_blocks(
|
||||
&self,
|
||||
sub_id: &str,
|
||||
hash: Block::Hash,
|
||||
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
|
||||
) -> Result<(), SubscriptionManagementError> {
|
||||
let mut inner = self.inner.write();
|
||||
inner.unpin_block(sub_id, hash)
|
||||
inner.unpin_blocks(sub_id, hashes)
|
||||
}
|
||||
|
||||
/// Ensure the block remains pinned until the return object is dropped.
|
||||
|
||||
@@ -1591,14 +1591,17 @@ async fn follow_with_unpin() {
|
||||
// Unpin an invalid subscription ID must return Ok(()).
|
||||
let invalid_hash = hex_string(&INVALID_HASH);
|
||||
let _res: () = api
|
||||
.call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash])
|
||||
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Valid subscription with invalid block hash.
|
||||
let invalid_hash = hex_string(&INVALID_HASH);
|
||||
let err = api
|
||||
.call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash])
|
||||
.call::<_, serde_json::Value>(
|
||||
"chainHead_unstable_unpin",
|
||||
rpc_params![&sub_id, &invalid_hash],
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err,
|
||||
@@ -1606,7 +1609,10 @@ async fn follow_with_unpin() {
|
||||
);
|
||||
|
||||
// To not exceed the number of pinned blocks, we need to unpin before the next import.
|
||||
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
|
||||
let _res: () = api
|
||||
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Block tree:
|
||||
// finalized_block -> block -> block2
|
||||
@@ -1645,6 +1651,160 @@ async fn follow_with_unpin() {
|
||||
assert!(sub.next::<FollowEvent<String>>().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn follow_with_multiple_unpin_hashes() {
|
||||
let builder = TestClientBuilder::new();
|
||||
let backend = builder.backend();
|
||||
let mut client = Arc::new(builder.build());
|
||||
|
||||
let api = ChainHead::new(
|
||||
client.clone(),
|
||||
backend,
|
||||
Arc::new(TaskExecutor::default()),
|
||||
CHAIN_GENESIS,
|
||||
ChainHeadConfig {
|
||||
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
|
||||
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
|
||||
subscription_max_ongoing_operations: MAX_OPERATIONS,
|
||||
operation_max_storage_items: MAX_PAGINATION_LIMIT,
|
||||
},
|
||||
)
|
||||
.into_rpc();
|
||||
|
||||
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
|
||||
let sub_id = sub.subscription_id();
|
||||
let sub_id = serde_json::to_string(&sub_id).unwrap();
|
||||
|
||||
// Import 3 blocks.
|
||||
let block_1 = BlockBuilderBuilder::new(&*client)
|
||||
.on_parent_block(client.chain_info().genesis_hash)
|
||||
.with_parent_block_number(0)
|
||||
.build()
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block;
|
||||
let block_1_hash = block_1.header.hash();
|
||||
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
|
||||
|
||||
let block_2 = BlockBuilderBuilder::new(&*client)
|
||||
.on_parent_block(block_1.hash())
|
||||
.with_parent_block_number(1)
|
||||
.build()
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block;
|
||||
let block_2_hash = block_2.header.hash();
|
||||
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();
|
||||
|
||||
let block_3 = BlockBuilderBuilder::new(&*client)
|
||||
.on_parent_block(block_2.hash())
|
||||
.with_parent_block_number(2)
|
||||
.build()
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block;
|
||||
let block_3_hash = block_3.header.hash();
|
||||
client.import(BlockOrigin::Own, block_3.clone()).await.unwrap();
|
||||
|
||||
// Ensure the imported block is propagated and pinned for this subscription.
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::Initialized(_)
|
||||
);
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::NewBlock(_)
|
||||
);
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::BestBlockChanged(_)
|
||||
);
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::NewBlock(_)
|
||||
);
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::BestBlockChanged(_)
|
||||
);
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::NewBlock(_)
|
||||
);
|
||||
assert_matches!(
|
||||
get_next_event::<FollowEvent<String>>(&mut sub).await,
|
||||
FollowEvent::BestBlockChanged(_)
|
||||
);
|
||||
|
||||
// Unpin an invalid subscription ID must return Ok(()).
|
||||
let invalid_hash = hex_string(&INVALID_HASH);
|
||||
let _res: () = api
|
||||
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Valid subscription with invalid block hash.
|
||||
let err = api
|
||||
.call::<_, serde_json::Value>(
|
||||
"chainHead_unstable_unpin",
|
||||
rpc_params![&sub_id, &invalid_hash],
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err,
|
||||
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
|
||||
);
|
||||
|
||||
let _res: () = api
|
||||
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_1_hash])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// One block hash is invalid. Block 1 is already unpinned.
|
||||
let err = api
|
||||
.call::<_, serde_json::Value>(
|
||||
"chainHead_unstable_unpin",
|
||||
rpc_params![&sub_id, vec![&block_1_hash, &block_2_hash, &block_3_hash]],
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err,
|
||||
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
|
||||
);
|
||||
|
||||
// Unpin multiple blocks.
|
||||
let _res: () = api
|
||||
.call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_2_hash, &block_3_hash]])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Check block 2 and 3 are unpinned.
|
||||
let err = api
|
||||
.call::<_, serde_json::Value>(
|
||||
"chainHead_unstable_unpin",
|
||||
rpc_params![&sub_id, &block_2_hash],
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err,
|
||||
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
|
||||
);
|
||||
|
||||
let err = api
|
||||
.call::<_, serde_json::Value>(
|
||||
"chainHead_unstable_unpin",
|
||||
rpc_params![&sub_id, &block_3_hash],
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err,
|
||||
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn follow_prune_best_block() {
|
||||
let builder = TestClientBuilder::new();
|
||||
@@ -1828,7 +1988,7 @@ async fn follow_prune_best_block() {
|
||||
let sub_id = sub.subscription_id();
|
||||
let sub_id = serde_json::to_string(&sub_id).unwrap();
|
||||
let hash = format!("{:?}", block_2_hash);
|
||||
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap();
|
||||
let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -2305,7 +2465,10 @@ async fn pin_block_references() {
|
||||
wait_pinned_references(&backend, &hash, 1).await;
|
||||
|
||||
// To not exceed the number of pinned blocks, we need to unpin before the next import.
|
||||
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
|
||||
let _res: () = api
|
||||
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Make sure unpin clears out the reference.
|
||||
let refs = backend.pin_refs(&hash).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user