chainHead: Error on duplicate unpin hashes (#3313)

This PR addresses an issue where calling chainHead_unpin with duplicate
hashes could lead to unintended side effects.

This backports:
https://github.com/paritytech/json-rpc-interface-spec/pull/135

While at it, have added a test to check that the global reference count
is decremented only once on unpin.

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Davide Galassi <davxy@datawok.net>
This commit is contained in:
Alexandru Vasile
2024-02-14 16:41:07 +02:00
committed by GitHub
parent 5a50b8b8a3
commit 7ec692d11c
5 changed files with 203 additions and 2 deletions
@@ -424,6 +424,8 @@ where
// Block is not part of the subscription.
Err(ChainHeadRpcError::InvalidBlock)
},
Err(SubscriptionManagementError::DuplicateHashes) =>
Err(ChainHeadRpcError::InvalidDuplicateHashes),
Err(_) => Err(ChainHeadRpcError::InvalidBlock),
}
}
@@ -32,6 +32,9 @@ pub enum Error {
/// Wait-for-continue event not generated.
#[error("Wait for continue event was not generated for the subscription")]
InvalidContinue,
/// Received duplicate hashes for the `chainHead_unpin` method.
#[error("Received duplicate hashes for the `chainHead_unpin` method")]
InvalidDuplicateHashes,
/// Invalid parameter provided to the RPC method.
#[error("Invalid parameter: {0}")]
InvalidParam(String),
@@ -49,6 +52,8 @@ pub mod rpc_spec_v2 {
pub const INVALID_RUNTIME_CALL: i32 = -32802;
/// Wait-for-continue event not generated.
pub const INVALID_CONTINUE: i32 = -32803;
/// Received duplicate hashes for the `chainHead_unpin` method.
pub const INVALID_DUPLICATE_HASHES: i32 = -32804;
}
/// General purpose errors, as defined in
@@ -71,6 +76,8 @@ impl From<Error> for ErrorObject<'static> {
ErrorObject::owned(rpc_spec_v2::INVALID_RUNTIME_CALL, msg, None::<()>),
Error::InvalidContinue =>
ErrorObject::owned(rpc_spec_v2::INVALID_CONTINUE, msg, None::<()>),
Error::InvalidDuplicateHashes =>
ErrorObject::owned(rpc_spec_v2::INVALID_DUPLICATE_HASHES, msg, None::<()>),
Error::InvalidParam(_) =>
ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>),
Error::InternalError(_) =>
@@ -38,6 +38,9 @@ pub enum SubscriptionManagementError {
/// The specified subscription ID is not present.
#[error("Subscription is absent")]
SubscriptionAbsent,
/// The unpin method was called with duplicate hashes.
#[error("Duplicate hashes")]
DuplicateHashes,
/// Custom error.
#[error("Subscription error {0}")]
Custom(String),
@@ -52,7 +55,8 @@ impl PartialEq for SubscriptionManagementError {
(Self::Blockchain(_), Self::Blockchain(_)) |
(Self::BlockHashAbsent, Self::BlockHashAbsent) |
(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true,
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
(Self::DuplicateHashes, Self::DuplicateHashes) => true,
(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
_ => false,
}
@@ -22,7 +22,7 @@ use sc_client_api::Backend;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
};
@@ -750,11 +750,27 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
}
}
/// Ensure the provided hashes are unique.
fn ensure_hash_uniqueness(
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
) -> Result<(), SubscriptionManagementError> {
let mut set = HashSet::new();
hashes.into_iter().try_for_each(|hash| {
if !set.insert(hash) {
Err(SubscriptionManagementError::DuplicateHashes)
} else {
Ok(())
}
})
}
pub fn unpin_blocks(
&mut self,
sub_id: &str,
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
) -> Result<(), SubscriptionManagementError> {
Self::ensure_hash_uniqueness(hashes.clone())?;
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
@@ -985,6 +1001,76 @@ mod tests {
assert!(block_state.is_none());
}
#[test]
fn unpin_duplicate_hashes() {
let (backend, mut client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_1)
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_2)
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
// Pin all blocks for the first subscription.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
// Pin only block 2 for the second subscription.
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
// Check reference count.
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
// Unpin the same block twice.
let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::DuplicateHashes);
// Check reference count must be unaltered.
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
// Unpin the blocks correctly.
subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
assert_eq!(subs.global_blocks.get(&hash_1), None);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
}
#[test]
fn subscription_lock_block() {
let builder = TestClientBuilder::new();
@@ -1617,6 +1617,108 @@ async fn follow_with_unpin() {
assert!(sub.next::<FollowEvent<String>>().await.is_none());
}
#[tokio::test]
async fn unpin_duplicate_hashes() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());
let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
ChainHeadConfig {
global_max_pinned_blocks: 3,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
},
)
.into_rpc();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_hash = format!("{:?}", block.header.hash());
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
// Try to unpin duplicate hashes.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, vec![&block_hash, &block_hash]],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
);
// Block tree:
// finalized_block -> block -> block2
let block2 = BlockBuilderBuilder::new(&*client)
.on_parent_block(block.hash())
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_hash_2 = format!("{:?}", block2.header.hash());
client.import(BlockOrigin::Own, block2.clone()).await.unwrap();
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
// Try to unpin duplicate hashes.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, vec![&block_hash, &block_hash_2, &block_hash]],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
);
// Can unpin blocks.
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_hash, &block_hash_2]])
.await
.unwrap();
}
#[tokio::test]
async fn follow_with_multiple_unpin_hashes() {
let builder = TestClientBuilder::new();