chainHead: Limit ongoing operations (#14699)

* chainHead/api: Make storage/body/call pure RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Add mpsc channel between RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Extract mpsc::Sender via BlockGuard

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Generate and provide the method operation ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_body` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_call` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_storage` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Propagate responses of methods to chainHead_follow

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_body` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure unique operation IDs across methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Remove old method events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Add limit helper

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscription: Expose limits to `BlockGuard`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust testing to ongoing operations

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Make limits configurable via `ChainHeadConfig`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust testing to `ChainHeadConfig`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure operation limits discards items

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Improve documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Rename `OngoingOperations` -> `LimitOperations`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Rename reserve -> reserve_at_most

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Use duration const instead of u64

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscription: Use tokio::sync::Semaphore for limits

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/chain_head/subscription/inner.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: parity-processbot <>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
Alexandru Vasile
2023-08-15 15:17:41 +03:00
committed by GitHub
parent 0146cb2ffe
commit 744b783a7d
8 changed files with 394 additions and 100 deletions
@@ -53,6 +53,41 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
/// The configuration of [`ChainHead`].
pub struct ChainHeadConfig {
/// The maximum number of pinned blocks across all subscriptions.
pub global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
}
/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;
/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
/// the subscription becomes subject to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
/// The maximum number of ongoing operations per subscription.
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;
impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
}
}
}
/// An API for chain head RPC calls.
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
@@ -76,8 +111,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
config: ChainHeadConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
@@ -85,8 +119,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
backend,
)),
genesis_hash,
@@ -197,12 +232,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
@@ -252,12 +285,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
@@ -306,21 +337,27 @@ where
.transpose()?
.map(ChildInfo::new_default_from_vec);
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();
// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);
let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
};
@@ -329,7 +366,7 @@ where
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id,
discarded_items: Some(0),
discarded_items: Some(discarded),
}))
}
@@ -342,9 +379,10 @@ where
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
@@ -37,7 +37,7 @@ mod chain_head_storage;
mod subscription;
pub use api::ChainHeadApiServer;
pub use chain_head::ChainHead;
pub use chain_head::{ChainHead, ChainHeadConfig};
pub use event::{
BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
@@ -21,10 +21,10 @@ use sp_blockchain::Error;
/// Subscription management error.
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
#[error("Exceeded pinning limits")]
/// The subscription has exceeded the internal limits
/// regarding the number of pinned blocks in memory or
/// the number of ongoing operations.
#[error("Exceeded pinning or operation limits")]
ExceededLimits,
/// Error originated from the blockchain (client or backend).
#[error("Blockchain error {0}")]
@@ -107,6 +107,62 @@ impl BlockStateMachine {
}
}
/// Limit the number of ongoing operations across methods.
struct LimitOperations {
/// Limit the number of ongoing operations for this subscription.
semaphore: Arc<tokio::sync::Semaphore>,
}
impl LimitOperations {
/// Constructs a new [`LimitOperations`].
fn new(max_operations: usize) -> Self {
LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
}
/// Reserves capacity to execute at least one operation and at most the requested items.
///
/// Dropping [`PermitOperations`] without executing an operation will release
/// the reserved capacity.
///
/// Returns nothing if there's no space available, else returns a permit
/// that guarantees that at least one operation can be executed.
fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
if num_ops == 0 {
return None
}
let permits = Arc::clone(&self.semaphore)
.try_acquire_many_owned(num_ops.try_into().ok()?)
.ok()?;
Some(PermitOperations { num_ops, _permit: permits })
}
}
/// Permits a number of operations to be executed.
///
/// [`PermitOperations`] are returned by [`LimitOperations::reserve()`] and are used
/// to guarantee the RPC server can execute the number of operations.
///
/// The number of reserved items are given back to the [`LimitOperations`] on drop.
struct PermitOperations {
/// The number of operations permitted (reserved).
num_ops: usize,
/// The permit for these operations.
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl PermitOperations {
/// Returns the number of reserved elements for this permit.
///
/// This can be smaller than the number of items requested via [`LimitOperations::reserve()`].
fn num_reserved(&self) -> usize {
self.num_ops
}
}
struct BlockState {
/// The state machine of this block.
state_machine: BlockStateMachine,
@@ -124,6 +180,8 @@ struct SubscriptionState<Block: BlockT> {
///
/// This object is cloned between methods.
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
/// Limit the number of ongoing operations.
limits: LimitOperations,
/// The next operation ID.
next_operation_id: usize,
/// Track the block hashes available for this subscription.
@@ -244,6 +302,13 @@ impl<Block: BlockT> SubscriptionState<Block> {
self.next_operation_id = self.next_operation_id.wrapping_add(1);
op_id
}
/// Reserves capacity to execute at least one operation and at most the requested items.
///
/// For more details see [`PermitOperations`].
fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
self.limits.reserve_at_most(to_reserve)
}
}
/// Keeps a specific block pinned while the handle is alive.
@@ -254,6 +319,7 @@ pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
with_runtime: bool,
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: String,
permit_operations: PermitOperations,
backend: Arc<BE>,
}
@@ -272,6 +338,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
with_runtime: bool,
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: usize,
permit_operations: PermitOperations,
backend: Arc<BE>,
) -> Result<Self, SubscriptionManagementError> {
backend
@@ -283,6 +350,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
with_runtime,
response_sender,
operation_id: operation_id.to_string(),
permit_operations,
backend,
})
}
@@ -301,6 +369,13 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
pub fn operation_id(&self) -> String {
self.operation_id.clone()
}
/// Returns the number of reserved elements for this permit.
///
/// This can be smaller than the number of items requested.
pub fn num_reserved(&self) -> usize {
self.permit_operations.num_reserved()
}
}
impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
@@ -328,6 +403,8 @@ pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
local_max_pin_duration: Duration,
/// The maximum number of ongoing operations per subscription.
max_ongoing_operations: usize,
/// Map the subscription ID to internal details of the subscription.
subs: HashMap<String, SubscriptionState<Block>>,
/// Backend pinning / unpinning blocks.
@@ -341,12 +418,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
backend: Arc<BE>,
) -> Self {
SubscriptionsInner {
global_blocks: Default::default(),
global_max_pinned_blocks,
local_max_pin_duration,
max_ongoing_operations,
subs: Default::default(),
backend,
}
@@ -366,6 +445,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
with_runtime,
tx_stop: Some(tx_stop),
response_sender,
limits: LimitOperations::new(self.max_ongoing_operations),
next_operation_id: 0,
blocks: Default::default(),
};
@@ -541,6 +621,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
&mut self,
sub_id: &str,
hash: Block::Hash,
to_reserve: usize,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
@@ -550,12 +631,18 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
return Err(SubscriptionManagementError::BlockHashAbsent)
}
let Some(permit_operations) = sub.reserve_at_most(to_reserve) else {
// Error when the server cannot execute at least one operation.
return Err(SubscriptionManagementError::ExceededLimits)
};
let operation_id = sub.next_operation_id();
BlockGuard::new(
hash,
sub.with_runtime,
sub.response_sender.clone(),
operation_id,
permit_operations,
self.backend.clone(),
)
}
@@ -574,6 +661,9 @@ mod tests {
Client, ClientBlockImportExt, GenesisInit,
};
/// Maximum number of ongoing operations per subscription ID.
const MAX_OPERATIONS_PER_SUB: usize = 16;
fn init_backend() -> (
Arc<sc_client_api::in_mem::Backend<Block>>,
Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
@@ -669,6 +759,7 @@ mod tests {
tx_stop: None,
response_sender,
next_operation_id: 0,
limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB),
blocks: Default::default(),
};
@@ -698,6 +789,7 @@ mod tests {
tx_stop: None,
response_sender,
next_operation_id: 0,
limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB),
blocks: Default::default(),
};
@@ -730,13 +822,14 @@ mod tests {
fn subscription_lock_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let hash = H256::random();
// Subscription not inserted.
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -744,13 +837,13 @@ mod tests {
assert!(subs.insert_subscription(id.clone(), true).is_none());
// No block hash.
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
subs.remove_subscription(&id);
// No subscription.
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
}
@@ -762,7 +855,8 @@ mod tests {
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -770,7 +864,7 @@ mod tests {
// First time we are pinning the block.
assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
let block = subs.lock_block(&id, hash).unwrap();
let block = subs.lock_block(&id, hash, 1).unwrap();
// Subscription started with runtime updates
assert_eq!(block.has_runtime(), true);
@@ -780,7 +874,7 @@ mod tests {
// Unpin the block.
subs.unpin_block(&id, hash).unwrap();
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}
@@ -791,7 +885,8 @@ mod tests {
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -839,7 +934,8 @@ mod tests {
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
@@ -884,7 +980,8 @@ mod tests {
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
// Maximum number of pinned blocks is 2.
let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
@@ -908,10 +1005,10 @@ mod tests {
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
// Ensure both subscriptions are removed.
let err = subs.lock_block(&id_1, hash_1).unwrap_err();
let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let err = subs.lock_block(&id_2, hash_1).unwrap_err();
let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
assert!(subs.global_blocks.get(&hash_1).is_none());
@@ -934,7 +1031,8 @@ mod tests {
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend);
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
@@ -958,10 +1056,10 @@ mod tests {
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
// Ensure both subscriptions are removed.
let err = subs.lock_block(&id_1, hash_1).unwrap_err();
let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let _block_guard = subs.lock_block(&id_2, hash_1).unwrap();
let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert!(subs.global_blocks.get(&hash_2).is_none());
@@ -983,7 +1081,8 @@ mod tests {
fn subscription_check_stop_event() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
@@ -1000,4 +1099,30 @@ mod tests {
let res = sub_data.rx_stop.try_recv().unwrap();
assert!(res.is_some());
}
#[test]
fn ongoing_operations() {
// The object can hold at most 2 operations.
let ops = LimitOperations::new(2);
// One operation is reserved.
let permit_one = ops.reserve_at_most(1).unwrap();
assert_eq!(permit_one.num_reserved(), 1);
// Request 2 operations, however there is capacity only for one.
let permit_two = ops.reserve_at_most(2).unwrap();
// Number of reserved permits is smaller than provided.
assert_eq!(permit_two.num_reserved(), 1);
// Try to reserve operations when there's no space.
let permit = ops.reserve_at_most(1);
assert!(permit.is_none());
// Release capacity.
drop(permit_two);
// Can reserve again
let permit_three = ops.reserve_at_most(1).unwrap();
assert_eq!(permit_three.num_reserved(), 1);
}
}
@@ -40,12 +40,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
backend: Arc<BE>,
) -> Self {
SubscriptionManagement {
inner: RwLock::new(SubscriptionsInner::new(
global_max_pinned_blocks,
local_max_pin_duration,
max_ongoing_operations,
backend,
)),
}
@@ -110,15 +112,18 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
/// Ensure the block remains pinned until the return object is dropped.
///
/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner.
/// Returns an error if the block hash is not pinned for the subscription or
/// the subscription ID is invalid.
/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner
/// and reserves capacity for ogoing operations.
///
/// Returns an error if the block hash is not pinned for the subscription,
/// the subscription ID is invalid or the limit of ongoing operations was exceeded.
pub fn lock_block(
&self,
sub_id: &str,
hash: Block::Hash,
to_reserve: usize,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.lock_block(sub_id, hash)
inner.lock_block(sub_id, hash, to_reserve)
}
}
@@ -36,6 +36,7 @@ type Header = substrate_test_runtime_client::runtime::Header;
type Block = substrate_test_runtime_client::runtime::Block;
const MAX_PINNED_BLOCKS: usize = 32;
const MAX_PINNED_SECS: u64 = 60;
const MAX_OPERATIONS: usize = 16;
const CHAIN_GENESIS: [u8; 32] = [0; 32];
const INVALID_HASH: [u8; 32] = [1; 32];
const KEY: &[u8] = b":mock";
@@ -79,8 +80,11 @@ async fn setup_api() -> (
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -119,8 +123,11 @@ async fn follow_subscription_produces_blocks() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -177,8 +184,11 @@ async fn follow_with_runtime() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -285,8 +295,11 @@ async fn get_genesis() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -491,8 +504,11 @@ async fn call_runtime_without_flag() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1117,8 +1133,11 @@ async fn separate_operation_ids_for_subscriptions() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1194,8 +1213,11 @@ async fn follow_generates_initial_blocks() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1322,8 +1344,11 @@ async fn follow_exceeding_pinned_blocks() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
2,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: 2,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1373,8 +1398,11 @@ async fn follow_with_unpin() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
2,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: 2,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1454,8 +1482,11 @@ async fn follow_prune_best_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1611,8 +1642,11 @@ async fn follow_forks_pruned_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1725,8 +1759,11 @@ async fn follow_report_multiple_pruned_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -1930,8 +1967,11 @@ async fn pin_block_references() {
backend.clone(),
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
3,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: 3,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -2040,8 +2080,11 @@ async fn follow_finalized_before_new_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();
@@ -2119,3 +2162,100 @@ async fn follow_finalized_before_new_block() {
});
assert_eq!(event, expected);
}
#[tokio::test]
async fn ensure_operation_limits_works() {
let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY);
let builder = TestClientBuilder::new().add_extra_child_storage(
&child_info,
KEY.to_vec(),
CHILD_VALUE.to_vec(),
);
let backend = builder.backend();
let mut client = Arc::new(builder.build());
// Configure the chainHead with maximum 1 ongoing operations.
let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: 1,
},
)
.into_rpc();
let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
let block_hash = format!("{:?}", block.header.hash());
let key = hex_string(&KEY);
let items = vec![
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes },
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes },
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues },
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues },
];
let response: MethodResponse = api
.call("chainHead_unstable_storage", rpc_params![&sub_id, &block_hash, items])
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => {
// Check discarded items.
assert_eq!(started.discarded_items.unwrap(), 3);
started.operation_id
},
MethodResponse::LimitReached => panic!("Expected started response"),
};
// No value associated with the provided key.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);
// The storage is finished and capactiy must be released.
let alice_id = AccountKeyring::Alice.to_account_id();
// Hex encoded scale encoded bytes representing the call parameters.
let call_parameters = hex_string(&alice_id.encode());
let response: MethodResponse = api
.call(
"chainHead_unstable_call",
[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
)
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};
// Response propagated to `chainHead_follow`.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
);
}