rpc: Use the blocks pinning API for chainHead methods (#13233)

* rpc/chain_head: Add backend to subscription management

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Pin blocks internally and adjust testing

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client/in_mem: Reference for the number of pinned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tests: Check in-memory references to pinned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Remove unused comment

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Place subscription handle under `Arc` and unpin blocks on drop

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tests: Check all pinned blocks are unpinned on drop

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* rpc/tests: Retry fetching the pinned references for CI correctness

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client/service: Use 512 as maximum number of pinned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Fix merging conflicts

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Adjust subscriptions to use pinning API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head/tests: Test subscription management

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Adjust chain_head follow to the new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Adjust chain_head.rs to the new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head/tests: Adjust test.rs to the new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client/builder: Use new chainHead API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Fix documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client/in_mem: ChainHead no longer uses `in_mem::children`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* chain_head: Add block state machine

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Address feedback

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Use new_native_or_wasm_executor

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Remove 'static on Backend

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Add documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Lock blocks before async blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head_follower: Remove static on backend

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/service/src/builder.rs

Co-authored-by: Davide Galassi <davxy@datawok.net>

* Update client/service/src/builder.rs

Co-authored-by: Davide Galassi <davxy@datawok.net>

* chain_head: Add BlockHeaderAbsent to the PartialEq impl

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Add better documentation around pinning constants

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Move subscription to dedicated module

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subscription: Rename global pin / unpin functions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: parity-processbot <>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
Co-authored-by: Davide Galassi <davxy@datawok.net>
This commit is contained in:
Alexandru Vasile
2023-05-02 18:00:34 +03:00
committed by GitHub
parent bedb497d55
commit 0e28645153
11 changed files with 1428 additions and 364 deletions
+1
View File
@@ -9441,6 +9441,7 @@ dependencies = [
"sc-block-builder",
"sc-chain-spec",
"sc-client-api",
"sc-service",
"sc-transaction-pool-api",
"sc-utils",
"serde",
+23 -2
View File
@@ -618,6 +618,7 @@ where
states: RwLock<HashMap<Block::Hash, InMemoryBackend<HashFor<Block>>>>,
blockchain: Blockchain<Block>,
import_lock: RwLock<()>,
pinned_blocks: RwLock<HashMap<Block::Hash, i64>>,
}
impl<Block: BlockT> Backend<Block>
@@ -625,13 +626,28 @@ where
Block::Hash: Ord,
{
/// Create a new instance of in-mem backend.
///
/// # Warning
///
/// For testing purposes only!
pub fn new() -> Self {
Backend {
states: RwLock::new(HashMap::new()),
blockchain: Blockchain::new(),
import_lock: Default::default(),
pinned_blocks: Default::default(),
}
}
/// Return the number of references active for a pinned block.
///
/// # Warning
///
/// For testing purposes only!
pub fn pin_refs(&self, hash: &<Block as BlockT>::Hash) -> Option<i64> {
let blocks = self.pinned_blocks.read();
blocks.get(hash).map(|value| *value)
}
}
impl<Block: BlockT> backend::AuxStore for Backend<Block>
@@ -781,11 +797,16 @@ where
false
}
fn pin_block(&self, _: <Block as BlockT>::Hash) -> blockchain::Result<()> {
fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
let mut blocks = self.pinned_blocks.write();
*blocks.entry(hash).or_default() += 1;
Ok(())
}
fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
let mut blocks = self.pinned_blocks.write();
blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1);
}
}
impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}
+1
View File
@@ -43,5 +43,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime"
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
assert_matches = "1.3.0"
@@ -24,7 +24,7 @@ use crate::{
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
subscription::SubscriptionManagement,
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
};
@@ -44,12 +44,12 @@ use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
/// An API for chain head RPC calls.
pub struct ChainHead<BE, Block: BlockT, Client> {
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Backend of the chain.
@@ -57,16 +57,14 @@ pub struct ChainHead<BE, Block: BlockT, Client> {
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// Keep track of the pinned blocks for each subscription.
subscriptions: Arc<SubscriptionManagement<Block>>,
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of pinned blocks allowed per connection.
max_pinned_blocks: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
/// Create a new [`ChainHead`].
pub fn new<GenesisHash: AsRef<[u8]>>(
client: Arc<Client>,
@@ -74,16 +72,20 @@ impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));
Self {
client,
backend,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new()),
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
backend,
)),
genesis_hash,
max_pinned_blocks,
_phantom: PhantomData,
}
}
@@ -159,9 +161,8 @@ where
return Err(err)
},
};
// Keep track of the subscription.
let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else {
let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates) else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
@@ -177,7 +178,7 @@ where
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
sub_handle,
subscriptions.clone(),
runtime_updates,
sub_id.clone(),
);
@@ -202,19 +203,28 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let fut = async move {
let _block_guard = block_guard;
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
@@ -226,10 +236,10 @@ where
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
follow_subscription,
&follow_subscription,
hash
);
handle.stop();
subscriptions.remove_subscription(&follow_subscription);
ChainHeadEvent::<String>::Disjoint
},
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
@@ -246,16 +256,19 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(None)
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
}
self.client
.header(hash)
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
@@ -286,19 +299,28 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let fut = async move {
let _block_guard = block_guard;
// The child key is provided, use the key to query the child trie.
if let Some(child_key) = child_key {
// The child key must not be prefixed with ":child_storage:" nor
@@ -367,21 +389,29 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let fut = async move {
// Reject subscription if runtime_updates is false.
if !handle.has_runtime_updates() {
if !block_guard.has_runtime_updates() {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".into(),
));
@@ -417,15 +447,17 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<()> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(())
};
if !handle.unpin_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
match self.subscriptions.unpin_block(&follow_subscription, hash) {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}
Ok(())
}
}
@@ -24,7 +24,7 @@ use crate::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{SubscriptionHandle, SubscriptionManagementError},
subscription::{SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
@@ -44,13 +44,13 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::HashSet, sync::Arc};
/// Generates the events of the `chainHead_follow` method.
pub struct ChainHeadFollower<BE, Block: BlockT, Client> {
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Subscription handle.
sub_handle: SubscriptionHandle<Block>,
/// Subscriptions handle.
sub_handle: Arc<SubscriptionManagement<Block, BE>>,
/// Subscription was started with the runtime updates flag.
runtime_updates: bool,
/// Subscription ID.
@@ -59,12 +59,12 @@ pub struct ChainHeadFollower<BE, Block: BlockT, Client> {
best_block_cache: Option<Block::Hash>,
}
impl<BE, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
/// Create a new [`ChainHeadFollower`].
pub fn new(
client: Arc<Client>,
backend: Arc<BE>,
sub_handle: SubscriptionHandle<Block>,
sub_handle: Arc<SubscriptionManagement<Block, BE>>,
runtime_updates: bool,
sub_id: String,
) -> Self {
@@ -221,7 +221,7 @@ where
// The initialized event is the first one sent.
let finalized_block_hash = startup_point.finalized_hash;
self.sub_handle.pin_block(finalized_block_hash)?;
self.sub_handle.pin_block(&self.sub_id, finalized_block_hash)?;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
@@ -235,7 +235,7 @@ where
finalized_block_descendants.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
self.sub_handle.pin_block(child)?;
self.sub_handle.pin_block(&self.sub_id, child)?;
let new_runtime = self.generate_runtime_event(child, Some(parent));
@@ -310,7 +310,7 @@ where
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
// The block was already pinned by the initial block events or by the finalized event.
if !self.sub_handle.pin_block(notification.hash)? {
if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
return Ok(Default::default())
}
@@ -352,7 +352,7 @@ where
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
// Check if the block was already reported and thus, is already pinned.
if !self.sub_handle.pin_block(*hash)? {
if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
continue
}
@@ -564,6 +564,10 @@ where
stream_item = stream.next();
stop_event = next_stop_event;
}
// If we got here either the substrate streams have closed
// or the `Stop` receiver was triggered.
let _ = sink.send(&FollowEvent::<String>::Stop);
}
/// Generate the block events for the `chainHead_follow` method.
@@ -1,285 +0,0 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Subscription management for tracking subscription IDs to pinned blocks.
use futures::channel::oneshot;
use parking_lot::RwLock;
use sp_blockchain::Error;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
};
/// Subscription management error.
#[derive(Debug)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
ExceededLimits,
/// Error originated from the blockchain (client or backend).
Blockchain(Error),
/// The database does not contain a block header.
BlockHeaderAbsent,
/// Custom error.
Custom(String),
}
impl From<Error> for SubscriptionManagementError {
fn from(err: Error) -> Self {
SubscriptionManagementError::Blockchain(err)
}
}
/// Inner subscription data structure.
struct SubscriptionInner<Block: BlockT> {
/// The `runtime_updates` parameter flag of the subscription.
runtime_updates: bool,
/// Signals the "Stop" event.
tx_stop: Option<oneshot::Sender<()>>,
/// The blocks pinned.
blocks: HashSet<Block::Hash>,
/// The maximum number of pinned blocks allowed per subscription.
max_pinned_blocks: usize,
}
/// Manage the blocks of a specific subscription ID.
#[derive(Clone)]
pub struct SubscriptionHandle<Block: BlockT> {
inner: Arc<RwLock<SubscriptionInner<Block>>>,
}
impl<Block: BlockT> SubscriptionHandle<Block> {
/// Construct a new [`SubscriptionHandle`].
fn new(runtime_updates: bool, tx_stop: oneshot::Sender<()>, max_pinned_blocks: usize) -> Self {
SubscriptionHandle {
inner: Arc::new(RwLock::new(SubscriptionInner {
runtime_updates,
tx_stop: Some(tx_stop),
blocks: HashSet::new(),
max_pinned_blocks,
})),
}
}
/// Trigger the stop event for the current subscription.
///
/// This can happen on internal failure (ie, the pruning deleted the block from memory)
/// or if the user exceeded the amount of available pinned blocks.
pub fn stop(&self) {
let mut inner = self.inner.write();
if let Some(tx_stop) = inner.tx_stop.take() {
let _ = tx_stop.send(());
}
}
/// Pin a new block for the current subscription ID.
///
/// Returns whether the value was newly inserted if the block can be pinned.
/// Otherwise, returns an error if the maximum number of blocks has been exceeded.
pub fn pin_block(&self, hash: Block::Hash) -> Result<bool, SubscriptionManagementError> {
let mut inner = self.inner.write();
if inner.blocks.len() == inner.max_pinned_blocks {
// We have reached the limit. However, the block can be already inserted.
if inner.blocks.contains(&hash) {
return Ok(false)
} else {
return Err(SubscriptionManagementError::ExceededLimits)
}
}
Ok(inner.blocks.insert(hash))
}
/// Unpin a new block for the current subscription ID.
///
/// Returns whether the value was present in the set.
pub fn unpin_block(&self, hash: &Block::Hash) -> bool {
let mut inner = self.inner.write();
inner.blocks.remove(hash)
}
/// Check if the block hash is present for the provided subscription ID.
///
/// Returns `true` if the set contains the block.
pub fn contains_block(&self, hash: &Block::Hash) -> bool {
let inner = self.inner.read();
inner.blocks.contains(hash)
}
/// Get the `runtime_updates` flag of this subscription.
pub fn has_runtime_updates(&self) -> bool {
let inner = self.inner.read();
inner.runtime_updates
}
}
/// Manage block pinning / unpinning for subscription IDs.
pub struct SubscriptionManagement<Block: BlockT> {
/// Manage subscription by mapping the subscription ID
/// to a set of block hashes.
inner: RwLock<HashMap<String, SubscriptionHandle<Block>>>,
}
impl<Block: BlockT> SubscriptionManagement<Block> {
/// Construct a new [`SubscriptionManagement`].
pub fn new() -> Self {
SubscriptionManagement { inner: RwLock::new(HashMap::new()) }
}
/// Insert a new subscription ID.
///
/// If the subscription was not previously inserted, the method returns a tuple of
/// the receiver that is triggered upon the "Stop" event and the subscription
/// handle. Otherwise, when the subscription ID was already inserted returns none.
pub fn insert_subscription(
&self,
subscription_id: String,
runtime_updates: bool,
max_pinned_blocks: usize,
) -> Option<(oneshot::Receiver<()>, SubscriptionHandle<Block>)> {
let mut subs = self.inner.write();
if let Entry::Vacant(entry) = subs.entry(subscription_id) {
let (tx_stop, rx_stop) = oneshot::channel();
let handle =
SubscriptionHandle::<Block>::new(runtime_updates, tx_stop, max_pinned_blocks);
entry.insert(handle.clone());
Some((rx_stop, handle))
} else {
None
}
}
/// Remove the subscription ID with associated pinned blocks.
pub fn remove_subscription(&self, subscription_id: &String) {
let mut subs = self.inner.write();
subs.remove(subscription_id);
}
/// Obtain the specific subscription handle.
pub fn get_subscription(&self, subscription_id: &String) -> Option<SubscriptionHandle<Block>> {
let subs = self.inner.write();
subs.get(subscription_id).and_then(|handle| Some(handle.clone()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_core::H256;
use substrate_test_runtime_client::runtime::Block;
#[test]
fn subscription_check_id() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let hash = H256::random();
let handle = subs.get_subscription(&id);
assert!(handle.is_none());
let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
assert!(!handle.contains_block(&hash));
subs.remove_subscription(&id);
let handle = subs.get_subscription(&id);
assert!(handle.is_none());
}
#[test]
fn subscription_check_block() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let hash = H256::random();
// Check with subscription.
let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
assert!(!handle.contains_block(&hash));
assert!(!handle.unpin_block(&hash));
handle.pin_block(hash).unwrap();
assert!(handle.contains_block(&hash));
// Unpin an invalid block.
assert!(!handle.unpin_block(&H256::random()));
// Unpin the valid block.
assert!(handle.unpin_block(&hash));
assert!(!handle.contains_block(&hash));
}
#[test]
fn subscription_check_stop_event() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
// Check with subscription.
let (mut rx_stop, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
// Check the stop signal was not received.
let res = rx_stop.try_recv().unwrap();
assert!(res.is_none());
// Inserting a second time returns None.
let res = subs.insert_subscription(id.clone(), false, 10);
assert!(res.is_none());
handle.stop();
// Check the signal was received.
let res = rx_stop.try_recv().unwrap();
assert!(res.is_some());
}
#[test]
fn subscription_check_data() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
assert!(!handle.has_runtime_updates());
let id2 = "abcd".to_string();
let (_, handle) = subs.insert_subscription(id2.clone(), true, 10).unwrap();
assert!(handle.has_runtime_updates());
}
#[test]
fn subscription_check_max_pinned() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let hash = H256::random();
let hash_2 = H256::random();
let (_, handle) = subs.insert_subscription(id.clone(), false, 1).unwrap();
handle.pin_block(hash).unwrap();
// The same block can be pinned multiple times.
handle.pin_block(hash).unwrap();
// Exceeded number of pinned blocks.
handle.pin_block(hash_2).unwrap_err();
}
}
@@ -0,0 +1,66 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sp_blockchain::Error;
/// Subscription management error.
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
#[error("Exceeded pinning limits")]
ExceededLimits,
/// Error originated from the blockchain (client or backend).
#[error("Blockchain error {0}")]
Blockchain(Error),
/// The database does not contain a block hash.
#[error("Block hash is absent")]
BlockHashAbsent,
/// The database does not contain a block header.
#[error("Block header is absent")]
BlockHeaderAbsent,
/// The specified subscription ID is not present.
#[error("Subscription is absent")]
SubscriptionAbsent,
/// Custom error.
#[error("Subscription error {0}")]
Custom(String),
}
// Blockchain error does not implement `PartialEq` needed for testing.
impl PartialEq for SubscriptionManagementError {
fn eq(&self, other: &SubscriptionManagementError) -> bool {
match (self, other) {
(Self::ExceededLimits, Self::ExceededLimits) |
// Not needed for testing.
(Self::Blockchain(_), Self::Blockchain(_)) |
(Self::BlockHashAbsent, Self::BlockHashAbsent) |
(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true,
(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
_ => false,
}
}
}
impl From<Error> for SubscriptionManagementError {
fn from(err: Error) -> Self {
SubscriptionManagementError::Blockchain(err)
}
}
@@ -0,0 +1,940 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::channel::oneshot;
use sc_client_api::Backend;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
time::{Duration, Instant},
};
use crate::chain_head::subscription::SubscriptionManagementError;
/// The state machine of a block of a single subscription ID.
///
/// # Motivation
///
/// Each block is registered twice: once from the `BestBlock` event
/// and once from the `Finalized` event.
///
/// The state of a block must be tracked until both events register the
/// block and the user calls `unpin`.
///
/// Otherwise, the following race might happen:
/// T0. BestBlock event: hash is tracked and pinned in backend.
/// T1. User calls unpin: hash is untracked and unpinned in backend.
/// T2. Finalized event: hash is tracked (no previous history) and pinned again.
///
/// # State Machine Transition
///
/// ```ignore
/// (register)
/// [ REGISTERED ] ---------------> [ FULLY REGISTERED ]
/// | |
/// | (unpin) | (unpin)
/// | |
/// V (register) V
/// [ UNPINNED ] -----------------> [ FULLY UNPINNED ]
/// ```
#[derive(Debug, Clone, PartialEq)]
enum BlockStateMachine {
/// The block was registered by one event (either `Finalized` or `BestBlock` event).
///
/// Unpin was not called.
Registered,
/// The block was registered by both events (`Finalized` and `BestBlock` events).
///
/// Unpin was not called.
FullyRegistered,
/// The block was registered by one event (either `Finalized` or `BestBlock` event),
///
/// Unpin __was__ called.
Unpinned,
/// The block was registered by both events (`Finalized` and `BestBlock` events).
///
/// Unpin __was__ called.
FullyUnpinned,
}
impl BlockStateMachine {
fn new() -> Self {
BlockStateMachine::Registered
}
fn advance_register(&mut self) {
match self {
BlockStateMachine::Registered => *self = BlockStateMachine::FullyRegistered,
BlockStateMachine::Unpinned => *self = BlockStateMachine::FullyUnpinned,
_ => (),
}
}
fn advance_unpin(&mut self) {
match self {
BlockStateMachine::Registered => *self = BlockStateMachine::Unpinned,
BlockStateMachine::FullyRegistered => *self = BlockStateMachine::FullyUnpinned,
_ => (),
}
}
fn was_unpinned(&self) -> bool {
match self {
BlockStateMachine::Unpinned => true,
BlockStateMachine::FullyUnpinned => true,
_ => false,
}
}
}
struct BlockState {
/// The state machine of this block.
state_machine: BlockStateMachine,
/// The timestamp when the block was inserted.
timestamp: Instant,
}
/// The state of a single subscription ID.
struct SubscriptionState<Block: BlockT> {
/// The `runtime_updates` parameter flag of the subscription.
runtime_updates: bool,
/// Signals the "Stop" event.
tx_stop: Option<oneshot::Sender<()>>,
/// Track the block hashes available for this subscription.
///
/// This implementation assumes:
/// - most of the time subscriptions keep a few blocks of the chain's head pinned
/// - iteration through the blocks happens only when the hard limit is exceeded.
///
/// Considering the assumption, iterating (in the unlike case) the hashmap O(N) is
/// more time efficient and code friendly than paying for:
/// - extra space: an extra BTreeMap<Instant, Hash> to older hashes by oldest insertion
/// - extra time: O(log(N)) for insert/remove/find each `pin` block time per subscriptions
blocks: HashMap<Block::Hash, BlockState>,
}
impl<Block: BlockT> SubscriptionState<Block> {
/// Trigger the stop event for the current subscription.
///
/// This can happen on internal failure (ie, the pruning deleted the block from memory)
/// or if the subscription exceeded the available pinned blocks.
fn stop(&mut self) {
if let Some(tx_stop) = self.tx_stop.take() {
let _ = tx_stop.send(());
}
}
/// Keep track of the given block hash for this subscription.
///
/// This does not handle pinning in the backend.
///
/// Returns:
/// - true if this is the first time that the block is registered
/// - false if the block was already registered
fn register_block(&mut self, hash: Block::Hash) -> bool {
match self.blocks.entry(hash) {
Entry::Occupied(mut occupied) => {
let block_state = occupied.get_mut();
block_state.state_machine.advance_register();
// Block was registered twice and unpin was called.
if block_state.state_machine == BlockStateMachine::FullyUnpinned {
occupied.remove();
}
// Second time we register this block.
false
},
Entry::Vacant(vacant) => {
vacant.insert(BlockState {
state_machine: BlockStateMachine::new(),
timestamp: Instant::now(),
});
// First time we register this block.
true
},
}
}
/// A block is unregistered when the user calls `unpin`.
///
/// Returns:
/// - true if the block can be unpinned.
/// - false if the subscription does not contain the block or it was unpinned.
fn unregister_block(&mut self, hash: Block::Hash) -> bool {
match self.blocks.entry(hash) {
Entry::Occupied(mut occupied) => {
let block_state = occupied.get_mut();
// Cannot unpin a block twice.
if block_state.state_machine.was_unpinned() {
return false
}
block_state.state_machine.advance_unpin();
// Block was registered twice and unpin was called.
if block_state.state_machine == BlockStateMachine::FullyUnpinned {
occupied.remove();
}
true
},
// Block was not tracked.
Entry::Vacant(_) => false,
}
}
/// A subscription contains a block when the block was
/// registered (`pin` was called) and the block was not `unpinned` yet.
///
/// Returns `true` if the subscription contains the block.
fn contains_block(&self, hash: Block::Hash) -> bool {
let Some(state) = self.blocks.get(&hash) else {
// Block was not tracked.
return false
};
// Subscription no longer contains the block if `unpin` was called.
!state.state_machine.was_unpinned()
}
/// Get the timestamp of the oldest inserted block.
///
/// # Note
///
/// This iterates over all the blocks of the subscription.
fn find_oldest_block_timestamp(&self) -> Instant {
let mut timestamp = Instant::now();
for (_, state) in self.blocks.iter() {
timestamp = std::cmp::min(timestamp, state.timestamp);
}
timestamp
}
}
/// Keeps a specific block pinned while the handle is alive.
/// This object ensures that the block is not unpinned while
/// executing an RPC method call.
pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
hash: Block::Hash,
runtime_updates: bool,
backend: Arc<BE>,
}
// Custom implementation of Debug to avoid bounds on `backend: Debug` for `unwrap_err()` needed for
// testing.
impl<Block: BlockT, BE: Backend<Block>> std::fmt::Debug for BlockGuard<Block, BE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BlockGuard hash {:?} runtime_updates {:?}", self.hash, self.runtime_updates)
}
}
impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
/// Construct a new [`BlockGuard`] .
fn new(
hash: Block::Hash,
runtime_updates: bool,
backend: Arc<BE>,
) -> Result<Self, SubscriptionManagementError> {
backend
.pin_block(hash)
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
Ok(Self { hash, runtime_updates, backend })
}
/// The `runtime_updates` flag of the subscription.
pub fn has_runtime_updates(&self) -> bool {
self.runtime_updates
}
}
impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
fn drop(&mut self) {
self.backend.unpin_block(self.hash);
}
}
pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
/// Reference count the block hashes across all subscriptions.
///
/// The pinned blocks cannot exceed the [`Self::global_limit`] limit.
/// When the limit is exceeded subscriptions are stopped via the `Stop` event.
global_blocks: HashMap<Block::Hash, usize>,
/// The maximum number of pinned blocks across all subscriptions.
global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
local_max_pin_duration: Duration,
/// Map the subscription ID to internal details of the subscription.
subs: HashMap<String, SubscriptionState<Block>>,
/// Backend pinning / unpinning blocks.
///
/// The `Arc` is handled one level-above, but substrate exposes the backend as Arc<T>.
backend: Arc<BE>,
}
impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
/// Construct a new [`SubscriptionsInner`] from the specified limits.
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
backend: Arc<BE>,
) -> Self {
SubscriptionsInner {
global_blocks: Default::default(),
global_max_pinned_blocks,
local_max_pin_duration,
subs: Default::default(),
backend,
}
}
/// Insert a new subscription ID.
pub fn insert_subscription(
&mut self,
sub_id: String,
runtime_updates: bool,
) -> Option<oneshot::Receiver<()>> {
if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
let (tx_stop, rx_stop) = oneshot::channel();
let state = SubscriptionState::<Block> {
runtime_updates,
tx_stop: Some(tx_stop),
blocks: Default::default(),
};
entry.insert(state);
Some(rx_stop)
} else {
None
}
}
/// Remove the subscription ID with associated pinned blocks.
pub fn remove_subscription(&mut self, sub_id: &str) {
let Some(mut sub) = self.subs.remove(sub_id) else {
return
};
// The `Stop` event can be generated only once.
sub.stop();
for (hash, state) in sub.blocks.iter() {
if !state.state_machine.was_unpinned() {
self.global_unregister_block(*hash);
}
}
}
/// Ensure that a new block could be pinned.
///
/// If the global number of blocks has been reached this method
/// will remove all subscriptions that have blocks older than the
/// specified pin duration.
///
/// If after removing all subscriptions that exceed the pin duration
/// there is no space for pinning a new block, then all subscriptions
/// are terminated.
///
/// Returns true if the given subscription is also terminated.
fn ensure_block_space(&mut self, request_sub_id: &str) -> bool {
if self.global_blocks.len() < self.global_max_pinned_blocks {
return false
}
// Terminate all subscriptions that have blocks older than
// the specified pin duration.
let now = Instant::now();
let to_remove: Vec<_> = self
.subs
.iter_mut()
.filter_map(|(sub_id, sub)| {
let sub_time = sub.find_oldest_block_timestamp();
// Subscriptions older than the specified pin duration should be removed.
let should_remove = match now.checked_duration_since(sub_time) {
Some(duration) => duration > self.local_max_pin_duration,
None => true,
};
should_remove.then(|| sub_id.clone())
})
.collect();
let mut is_terminated = false;
for sub_id in to_remove {
if sub_id == request_sub_id {
is_terminated = true;
}
self.remove_subscription(&sub_id);
}
// Make sure we have enough space after first pass of terminating subscriptions.
if self.global_blocks.len() < self.global_max_pinned_blocks {
return is_terminated
}
// Sanity check: cannot uphold `chainHead` guarantees anymore. We have not
// found any subscriptions that have older pinned blocks to terminate.
let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
for sub_id in to_remove {
if sub_id == request_sub_id {
is_terminated = true;
}
self.remove_subscription(&sub_id);
}
return is_terminated
}
pub fn pin_block(
&mut self,
sub_id: &str,
hash: Block::Hash,
) -> Result<bool, SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
// Block was already registered for this subscription and therefore
// globally tracked.
if !sub.register_block(hash) {
return Ok(false)
}
// Ensure we have enough space only if the hash is not globally registered.
if !self.global_blocks.contains_key(&hash) {
// Subscription ID was terminated while ensuring enough space.
if self.ensure_block_space(sub_id) {
return Err(SubscriptionManagementError::ExceededLimits)
}
}
self.global_register_block(hash)?;
Ok(true)
}
/// Register the block internally.
///
/// If the block is present the reference counter is increased.
/// If this is a new block, the block is pinned in the backend.
fn global_register_block(
&mut self,
hash: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
match self.global_blocks.entry(hash) {
Entry::Occupied(mut occupied) => {
*occupied.get_mut() += 1;
},
Entry::Vacant(vacant) => {
self.backend
.pin_block(hash)
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
vacant.insert(1);
},
};
Ok(())
}
/// Unregister the block internally.
///
/// If the block is present the reference counter is decreased.
/// If this is the last reference of the block, the block
/// is unpinned from the backend and removed from internal tracking.
fn global_unregister_block(&mut self, hash: Block::Hash) {
if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) {
let counter = occupied.get_mut();
if *counter == 1 {
// Unpin the block from the backend.
self.backend.unpin_block(hash);
occupied.remove();
} else {
*counter -= 1;
}
}
}
pub fn unpin_block(
&mut self,
sub_id: &str,
hash: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
// Check that unpin was not called before and the block was pinned
// for this subscription.
if !sub.unregister_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
}
self.global_unregister_block(hash);
Ok(())
}
pub fn lock_block(
&mut self,
sub_id: &str,
hash: Block::Hash,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let Some(sub) = self.subs.get(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
if !sub.contains_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
}
BlockGuard::new(hash, sub.runtime_updates, self.backend.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use sc_block_builder::BlockBuilderProvider;
use sc_service::client::new_in_mem;
use sp_consensus::BlockOrigin;
use sp_core::{testing::TaskExecutor, H256};
use substrate_test_runtime_client::{
prelude::*,
runtime::{Block, RuntimeApi},
Client, ClientBlockImportExt, GenesisInit,
};
fn init_backend() -> (
Arc<sc_client_api::in_mem::Backend<Block>>,
Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
) {
let backend = Arc::new(sc_client_api::in_mem::Backend::new());
let executor = substrate_test_runtime_client::new_native_or_wasm_executor();
let client_config = sc_service::ClientConfig::default();
let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
&substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
!client_config.no_genesis,
backend.clone(),
executor.clone(),
)
.unwrap();
let client = Arc::new(
new_in_mem::<_, Block, _, RuntimeApi>(
backend.clone(),
executor,
genesis_block_builder,
None,
None,
None,
Box::new(TaskExecutor::new()),
client_config,
)
.unwrap(),
);
(backend, client)
}
#[test]
fn block_state_machine_register_unpin() {
let mut state = BlockStateMachine::new();
// Starts in `Registered` state.
assert_eq!(state, BlockStateMachine::Registered);
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyRegistered);
// Can call register multiple times.
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyRegistered);
assert!(!state.was_unpinned());
state.advance_unpin();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
// Can call unpin multiple times.
state.advance_unpin();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
// Nothing to advance.
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
}
#[test]
fn block_state_machine_unpin_register() {
let mut state = BlockStateMachine::new();
// Starts in `Registered` state.
assert_eq!(state, BlockStateMachine::Registered);
assert!(!state.was_unpinned());
state.advance_unpin();
assert_eq!(state, BlockStateMachine::Unpinned);
assert!(state.was_unpinned());
// Can call unpin multiple times.
state.advance_unpin();
assert_eq!(state, BlockStateMachine::Unpinned);
assert!(state.was_unpinned());
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
// Nothing to advance.
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
// Nothing to unpin.
state.advance_unpin();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
}
#[test]
fn sub_state_register_twice() {
let mut sub_state = SubscriptionState::<Block> {
runtime_updates: false,
tx_stop: None,
blocks: Default::default(),
};
let hash = H256::random();
assert_eq!(sub_state.register_block(hash), true);
let block_state = sub_state.blocks.get(&hash).unwrap();
// Did not call `register_block` twice.
assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
assert_eq!(sub_state.register_block(hash), false);
let block_state = sub_state.blocks.get(&hash).unwrap();
assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered);
// Block is no longer tracked when: `register_block` is called twice and
// `unregister_block` is called once.
assert_eq!(sub_state.unregister_block(hash), true);
let block_state = sub_state.blocks.get(&hash);
assert!(block_state.is_none());
}
#[test]
fn sub_state_register_unregister() {
let mut sub_state = SubscriptionState::<Block> {
runtime_updates: false,
tx_stop: None,
blocks: Default::default(),
};
let hash = H256::random();
// Block was not registered before.
assert_eq!(sub_state.unregister_block(hash), false);
assert_eq!(sub_state.register_block(hash), true);
let block_state = sub_state.blocks.get(&hash).unwrap();
// Did not call `register_block` twice.
assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
// Unregister block before the second `register_block`.
assert_eq!(sub_state.unregister_block(hash), true);
let block_state = sub_state.blocks.get(&hash).unwrap();
assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned);
assert_eq!(sub_state.register_block(hash), false);
let block_state = sub_state.blocks.get(&hash);
assert!(block_state.is_none());
// Block is no longer tracked when: `register_block` is called twice and
// `unregister_block` is called once.
assert_eq!(sub_state.unregister_block(hash), false);
let block_state = sub_state.blocks.get(&hash);
assert!(block_state.is_none());
}
#[test]
fn subscription_lock_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let id = "abc".to_string();
let hash = H256::random();
// Subscription not inserted.
let err = subs.lock_block(&id, hash).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
// Cannot insert the same subscription ID twice.
assert!(subs.insert_subscription(id.clone(), true).is_none());
// No block hash.
let err = subs.lock_block(&id, hash).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
subs.remove_subscription(&id);
// No subscription.
let err = subs.lock_block(&id, hash).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
}
#[test]
fn subscription_check_block() {
let (backend, mut client) = init_backend();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let id = "abc".to_string();
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
// First time we are pinning the block.
assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
let block = subs.lock_block(&id, hash).unwrap();
// Subscription started with runtime updates
assert_eq!(block.has_runtime_updates(), true);
let invalid_id = "abc-invalid".to_string();
let err = subs.unpin_block(&invalid_id, hash).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
// Unpin the block.
subs.unpin_block(&id, hash).unwrap();
let err = subs.lock_block(&id, hash).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}
#[test]
fn subscription_ref_count() {
let (backend, mut client) = init_backend();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let id = "abc".to_string();
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
// Check the global ref count.
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// Ensure the block propagated to the subscription.
subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap();
// Insert the block for the same subscription again (simulate NewBlock + Finalized pinning)
assert_eq!(subs.pin_block(&id, hash).unwrap(), false);
// Check the global ref count should not get incremented.
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// Ensure the hash propagates for the second subscription.
let id_second = "abcd".to_string();
let _stop = subs.insert_subscription(id_second.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true);
// Check the global ref count.
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2);
// Ensure the block propagated to the subscription.
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
subs.unpin_block(&id, hash).unwrap();
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// Cannot unpin a block twice for the same subscription.
let err = subs.unpin_block(&id, hash).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
subs.unpin_block(&id_second, hash).unwrap();
// Block unregistered from the memory.
assert!(subs.global_blocks.get(&hash).is_none());
}
#[test]
fn subscription_remove_subscription() {
let (backend, mut client) = init_backend();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
// Pin all blocks for the first subscription.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
// Pin only block 2 for the second subscription.
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
// Check reference count.
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
subs.remove_subscription(&id_1);
assert!(subs.global_blocks.get(&hash_1).is_none());
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
assert!(subs.global_blocks.get(&hash_3).is_none());
subs.remove_subscription(&id_2);
assert!(subs.global_blocks.get(&hash_2).is_none());
assert_eq!(subs.global_blocks.len(), 0);
}
#[test]
fn subscription_check_limits() {
let (backend, mut client) = init_backend();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
// Maximum number of pinned blocks is 2.
let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
// Both subscriptions can pin the maximum limit.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
// Check reference count.
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
// Block 3 pinning will exceed the limit and both subscriptions
// are terminated because no subscription with older blocks than 10
// seconds are present.
let err = subs.pin_block(&id_1, hash_3).unwrap_err();
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
// Ensure both subscriptions are removed.
let err = subs.lock_block(&id_1, hash_1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let err = subs.lock_block(&id_2, hash_1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
assert!(subs.global_blocks.get(&hash_1).is_none());
assert!(subs.global_blocks.get(&hash_2).is_none());
assert!(subs.global_blocks.get(&hash_3).is_none());
assert_eq!(subs.global_blocks.len(), 0);
}
#[test]
fn subscription_check_limits_with_duration() {
let (backend, mut client) = init_backend();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
// Maximum pin duration is 5 second, sleep 5 seconds to ensure we clean up
// the first subscription.
std::thread::sleep(std::time::Duration::from_secs(5));
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
// Check reference count.
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
// Second subscription has only 1 block pinned. Only the first subscription is terminated.
let err = subs.pin_block(&id_1, hash_3).unwrap_err();
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
// Ensure both subscriptions are removed.
let err = subs.lock_block(&id_1, hash_1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let _block_guard = subs.lock_block(&id_2, hash_1).unwrap();
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert!(subs.global_blocks.get(&hash_2).is_none());
assert!(subs.global_blocks.get(&hash_3).is_none());
assert_eq!(subs.global_blocks.len(), 1);
// Force second subscription to get terminated.
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
let err = subs.pin_block(&id_2, hash_3).unwrap_err();
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
assert!(subs.global_blocks.get(&hash_1).is_none());
assert!(subs.global_blocks.get(&hash_2).is_none());
assert!(subs.global_blocks.get(&hash_3).is_none());
assert_eq!(subs.global_blocks.len(), 0);
}
#[test]
fn subscription_check_stop_event() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let id = "abc".to_string();
let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap();
// Check the stop signal was not received.
let res = rx_stop.try_recv().unwrap();
assert!(res.is_none());
let sub = subs.subs.get_mut(&id).unwrap();
sub.stop();
// Check the signal was received.
let res = rx_stop.try_recv().unwrap();
assert!(res.is_some());
}
}
@@ -0,0 +1,125 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::channel::oneshot;
use parking_lot::RwLock;
use sc_client_api::Backend;
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};
mod error;
mod inner;
pub use error::SubscriptionManagementError;
pub use inner::BlockGuard;
use inner::SubscriptionsInner;
/// Manage block pinning / unpinning for subscription IDs.
pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
/// Manage subscription by mapping the subscription ID
/// to a set of block hashes.
inner: RwLock<SubscriptionsInner<Block, BE>>,
}
impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
/// Construct a new [`SubscriptionManagement`].
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
backend: Arc<BE>,
) -> Self {
SubscriptionManagement {
inner: RwLock::new(SubscriptionsInner::new(
global_max_pinned_blocks,
local_max_pin_duration,
backend,
)),
}
}
/// Insert a new subscription ID.
///
/// If the subscription was not previously inserted, returns the receiver that is
/// triggered upon the "Stop" event. Otherwise, if the subscription ID was already
/// inserted returns none.
pub fn insert_subscription(
&self,
sub_id: String,
runtime_updates: bool,
) -> Option<oneshot::Receiver<()>> {
let mut inner = self.inner.write();
inner.insert_subscription(sub_id, runtime_updates)
}
/// Remove the subscription ID with associated pinned blocks.
pub fn remove_subscription(&self, sub_id: &str) {
let mut inner = self.inner.write();
inner.remove_subscription(sub_id)
}
/// The block is pinned in the backend only once when the block's hash is first encountered.
///
/// Each subscription is expected to call this method twice:
/// - once from the `NewBlock` import
/// - once from the `Finalized` import
///
/// Returns
/// - Ok(true) if the subscription did not previously contain this block
/// - Ok(false) if the subscription already contained this this
/// - Error if the backend failed to pin the block or the subscription ID is invalid
pub fn pin_block(
&self,
sub_id: &str,
hash: Block::Hash,
) -> Result<bool, SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.pin_block(sub_id, hash)
}
/// Unpin the block from the subscription.
///
/// The last subscription that unpins the block is also unpinning the block
/// from the backend.
///
/// This method is called only once per subscription.
///
/// Returns an error if the block is not pinned for the subscription or
/// the subscription ID is invalid.
pub fn unpin_block(
&self,
sub_id: &str,
hash: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.unpin_block(sub_id, hash)
}
/// Ensure the block remains pinned until the return object is dropped.
///
/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner.
/// Returns an error if the block hash is not pinned for the subscription or
/// the subscription ID is invalid.
pub fn lock_block(
&self,
sub_id: &str,
hash: Block::Hash,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.lock_block(sub_id, hash)
}
}
@@ -11,6 +11,8 @@ use jsonrpsee::{
};
use sc_block_builder::BlockBuilderProvider;
use sc_client_api::ChildInfo;
use sc_service::client::new_in_mem;
use sp_api::BlockT;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_core::{
@@ -19,15 +21,17 @@ use sp_core::{
testing::TaskExecutor,
};
use sp_version::RuntimeVersion;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use substrate_test_runtime::Transfer;
use substrate_test_runtime_client::{
prelude::*, runtime, Backend, BlockBuilderExt, Client, ClientBlockImportExt,
prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client,
ClientBlockImportExt, GenesisInit,
};
type Header = substrate_test_runtime_client::runtime::Header;
type Block = substrate_test_runtime_client::runtime::Block;
const MAX_PINNED_BLOCKS: usize = 32;
const MAX_PINNED_SECS: u64 = 60;
const CHAIN_GENESIS: [u8; 32] = [0; 32];
const INVALID_HASH: [u8; 32] = [1; 32];
const KEY: &[u8] = b":mock";
@@ -72,6 +76,7 @@ async fn setup_api() -> (
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -111,6 +116,7 @@ async fn follow_subscription_produces_blocks() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -168,6 +174,7 @@ async fn follow_with_runtime() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -273,6 +280,7 @@ async fn get_genesis() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -457,6 +465,7 @@ async fn call_runtime_without_flag() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -631,6 +640,7 @@ async fn follow_generates_initial_blocks() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -758,6 +768,7 @@ async fn follow_exceeding_pinned_blocks() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
2,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -808,6 +819,7 @@ async fn follow_with_unpin() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
2,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -888,6 +900,7 @@ async fn follow_prune_best_block() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -1044,6 +1057,7 @@ async fn follow_forks_pruned_block() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -1157,6 +1171,7 @@ async fn follow_report_multiple_pruned_block() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
@@ -1327,6 +1342,137 @@ async fn follow_report_multiple_pruned_block() {
assert_eq!(event, expected);
}
#[tokio::test]
async fn pin_block_references() {
// Manually construct an in-memory backend and client.
let backend = Arc::new(sc_client_api::in_mem::Backend::new());
let executor = substrate_test_runtime_client::new_native_or_wasm_executor();
let client_config = sc_service::ClientConfig::default();
let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
&substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
!client_config.no_genesis,
backend.clone(),
executor.clone(),
)
.unwrap();
let mut client = Arc::new(
new_in_mem::<_, Block, _, RuntimeApi>(
backend.clone(),
executor,
genesis_block_builder,
None,
None,
None,
Box::new(TaskExecutor::new()),
client_config,
)
.unwrap(),
);
let api = ChainHead::new(
client.clone(),
backend.clone(),
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
3,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
async fn wait_pinned_references<Block: BlockT>(
backend: &Arc<sc_client_api::in_mem::Backend<Block>>,
hash: &Block::Hash,
target: i64,
) {
// Retry for at most 2 minutes.
let mut retries = 120;
while backend.pin_refs(hash).unwrap() != target {
if retries == 0 {
panic!("Expected target={} pinned references for hash={:?}", target, hash);
}
retries -= 1;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash = block.header.hash();
let block_hash = format!("{:?}", hash);
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Ensure the imported block is propagated for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
// We need to wait a bit for:
// 1. `NewBlock` and `BestBlockChanged` notifications to propagate to the chainHead
// subscription. (pin_refs == 2)
// 2. The chainHead to call `pin_blocks` only once for the `NewBlock`
// notification (pin_refs == 3)
// 3. Both notifications to go out of scope (pin_refs == 1 (total 3 - dropped 2)).
wait_pinned_references(&backend, &hash, 1).await;
// To not exceed the number of pinned blocks, we need to unpin before the next import.
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
// Make sure unpin clears out the reference.
let refs = backend.pin_refs(&hash).unwrap();
assert_eq!(refs, 0);
// Add another 2 blocks and make sure we drop the subscription with the blocks pinned.
let mut hashes = Vec::new();
for _ in 0..2 {
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let hash = block.header.hash();
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Ensure the imported block is propagated for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
hashes.push(hash);
}
// Make sure the pin was propagated.
for hash in &hashes {
wait_pinned_references(&backend, hash, 1).await;
}
// Drop the subscription and expect the pinned blocks to be released.
drop(sub);
// The `chainHead` detects the subscription was terminated when it tries
// to send another block.
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
for hash in &hashes {
wait_pinned_references(&backend, &hash, 0).await;
}
}
#[tokio::test]
async fn follow_finalized_before_new_block() {
let builder = TestClientBuilder::new();
@@ -1341,6 +1487,7 @@ async fn follow_finalized_before_new_block() {
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
)
.into_rpc();
+17 -5
View File
@@ -70,7 +70,11 @@ use sp_consensus::block_validation::{
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use std::{str::FromStr, sync::Arc, time::SystemTime};
use std::{
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
/// Full client type.
pub type TFullClient<TBl, TRtApi, TExec> =
@@ -667,16 +671,24 @@ where
)
.into_rpc();
// Maximum pinned blocks per connection.
// This number is large enough to consider immediate blocks,
// but it will change to facilitate adequate limits for the pinning API.
const MAX_PINNED_BLOCKS: usize = 4096;
// Maximum pinned blocks across all connections.
// This number is large enough to consider immediate blocks.
// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;
// Any block of any subscription should not be pinned more than
// this constant. When a subscription contains a block older than this,
// the subscription becomes subject to termination.
// Note: This should be enough for immediate blocks.
const MAX_PINNED_SECONDS: u64 = 60;
let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
client.clone(),
backend.clone(),
task_executor.clone(),
client.info().genesis_hash,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECONDS),
)
.into_rpc();