chainHead: Ensure reasonable distance between leaf and finalized block (#3562)

This PR ensures that the distance between any leaf and the finalized
block stays within a reasonable bound.

For a new subscription, the chainHead has to provide all blocks between
the leaves of the chain and the finalized block.
 When the distance between a leaf and the finalized block is large:
 - The tree route is costly to compute
 - We could deliver an unbounded number of blocks (potentially millions)
(For more details see
https://github.com/paritytech/polkadot-sdk/pull/3445#discussion_r1507210283)

The configuration of the ChainHead is extended with:
- suspend on lagging distance: When the distance between any leaf and
the finalized block is greater than this number, the subscriptions are
suspended for a given duration.
- All active subscriptions are terminated with the `Stop` event, all
blocks are unpinned and data discarded.
- For incoming subscriptions, until the suspended period expires the
subscriptions will immediately receive the `Stop` event.
    - Defaults to 128 blocks
- suspended duration: The amount of time for which subscriptions are
suspended
    - Defaults to 30 seconds
 
 
 cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
Alexandru Vasile
2024-04-03 14:46:08 +03:00
committed by GitHub
parent cdacfb9d33
commit 287b116c3e
7 changed files with 312 additions and 163 deletions
@@ -62,6 +62,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
pub max_lagging_distance: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
@@ -88,6 +91,10 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
/// before pagination is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
const MAX_LAGGING_DISTANCE: usize = 128;
/// The maximum number of `chainHead_follow` subscriptions per connection.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
@@ -97,6 +104,7 @@ impl Default for ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
}
@@ -116,6 +124,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
@@ -140,6 +151,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend,
),
operation_max_storage_items: config.operation_max_storage_items,
max_lagging_distance: config.max_lagging_distance,
_phantom: PhantomData,
}
}
@@ -187,6 +199,7 @@ where
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;
let fut = async move {
// Ensure the current connection ID has enough space to accept a new subscription.
@@ -207,8 +220,8 @@ where
let Some(sub_data) =
reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
@@ -222,9 +235,13 @@ where
subscriptions,
with_runtime,
sub_id.clone(),
max_lagging_distance,
);
chain_head_follow.generate_events(sink, sub_data).await;
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
reserved_subscription.stop_all_subscriptions();
}
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
};
@@ -41,12 +41,14 @@ use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
SaturatedConversion, Saturating,
};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};
/// The maximum number of finalized blocks provided by the
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;
@@ -67,6 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
}
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
@@ -77,8 +82,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle: SubscriptionManagement<Block, BE>,
with_runtime: bool,
sub_id: String,
max_lagging_distance: usize,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
max_lagging_distance,
}
}
}
@@ -186,6 +200,35 @@ where
}
}
/// Check that the distance between the provided blocks does not exceed a
/// reasonable range.
///
/// When the blocks are too far apart (potentially millions of blocks):
/// - Tree route is expensive to calculate.
/// - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
///
/// This edge-case can happen for parachains where the relay chain syncs slower to
/// the head of the chain than the parachain node that is synced already.
///
/// # Errors
///
/// - `BlockHashAbsent` if either hash is unknown to the client.
/// - `BlockDistanceTooLarge` if `block` is more than `max_lagging_distance`
///   blocks ahead of `finalized`.
// NOTE(review): method name has a typo ("distace" -> "distance"); rename it
// together with its call site in a follow-up.
fn distace_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let Some(block_num) = self.client.number(block)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
let Some(finalized_num) = self.client.number(finalized)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
// Saturating: a leaf at or behind the finalized block yields distance 0.
let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
if distance > self.max_lagging_distance {
return Err(SubscriptionManagementError::BlockDistanceTooLarge);
}
Ok(())
}
/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// The reported blocks are pinned by this function.
@@ -198,6 +241,13 @@ where
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
let mut unique_descendants = HashSet::new();
// Ensure all leaves are within a reasonable distance from the finalized block,
// before traversing the tree.
for leaf in &leaves {
self.distace_within_reason(*leaf, finalized)?;
}
for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
@@ -542,7 +592,8 @@ where
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
{
let mut stream_item = stream.next();
@@ -576,7 +627,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};
@@ -591,7 +642,8 @@ where
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
// No need to propagate this error further, the client disconnected.
return Ok(())
}
}
@@ -605,6 +657,7 @@ where
// - the client disconnected.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
Ok(())
}
/// Generate the block events for the `chainHead_follow` method.
@@ -612,7 +665,7 @@ where
&mut self,
sink: SubscriptionSink,
sub_data: InsertedSubscriptionData<Block>,
) {
) -> Result<(), SubscriptionManagementError> {
// Register for the new block and finalized notifications.
let stream_import = self
.client
@@ -640,7 +693,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};
@@ -650,6 +703,6 @@ where
let stream = stream::once(futures::future::ready(initial)).chain(merged);
self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
.await
}
}
@@ -41,6 +41,9 @@ pub enum SubscriptionManagementError {
/// The unpin method was called with duplicate hashes.
#[error("Duplicate hashes")]
DuplicateHashes,
/// The distance between the leaves and the current finalized block is too large.
#[error("Distance too large")]
BlockDistanceTooLarge,
/// Custom error.
#[error("Subscription error {0}")]
Custom(String),
@@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError {
(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
(Self::DuplicateHashes, Self::DuplicateHashes) => true,
(Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true,
(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
_ => false,
}
@@ -560,6 +560,7 @@ pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
max_ongoing_operations: usize,
/// Map the subscription ID to internal details of the subscription.
subs: HashMap<String, SubscriptionState<Block>>,
/// Backend pinning / unpinning blocks.
///
/// The `Arc` is handled one level-above, but substrate exposes the backend as Arc<T>.
@@ -623,6 +624,15 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
}
}
/// All active subscriptions are removed.
///
/// Each removal goes through `remove_subscription`, which also unpins all
/// blocks held by that subscription.
pub fn stop_all_subscriptions(&mut self) {
	// Collect the IDs first: `subs` cannot be mutated while iterating over it.
	// `.cloned()` replaces the clippy `map_clone` pattern `.map(|id| id.clone())`.
	let to_remove: Vec<_> = self.subs.keys().cloned().collect();
	for sub_id in to_remove {
		self.remove_subscription(&sub_id);
	}
}
/// Ensure that a new block could be pinned.
///
/// If the global number of blocks has been reached this method
@@ -878,6 +888,30 @@ mod tests {
(backend, client)
}
/// Build and import `num_blocks` blocks on top of genesis, returning their
/// hashes in ascending height order.
fn produce_blocks(
	mut client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
	num_blocks: usize,
) -> Vec<<Block as BlockT>::Hash> {
	let mut blocks = Vec::with_capacity(num_blocks);
	let mut parent_hash = client.chain_info().genesis_hash;

	for i in 0..num_blocks {
		let block = BlockBuilderBuilder::new(&*client)
			.on_parent_block(parent_hash)
			.with_parent_block_number(i as u64)
			.build()
			.unwrap()
			.build()
			.unwrap()
			.block;
		// Hash the header before handing the block to the importer; this
		// avoids cloning the whole block just to read its hash afterwards.
		parent_hash = block.header.hash();
		futures::executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
		blocks.push(parent_hash);
	}

	blocks
}
#[test]
fn block_state_machine_register_unpin() {
let mut state = BlockStateMachine::new();
@@ -1003,37 +1037,10 @@ mod tests {
#[test]
fn unpin_duplicate_hashes() {
let (backend, mut client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_1)
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_2)
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1102,18 +1109,10 @@ mod tests {
#[test]
fn subscription_check_block() {
let (backend, mut client) = init_backend();
let (backend, client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash = block.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let hashes = produce_blocks(client, 1);
let hash = hashes[0];
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1140,17 +1139,10 @@ mod tests {
#[test]
fn subscription_ref_count() {
let (backend, mut client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 1);
let hash = hashes[0];
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1190,37 +1182,10 @@ mod tests {
#[test]
fn subscription_remove_subscription() {
let (backend, mut client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_1)
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_2)
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1256,37 +1221,10 @@ mod tests {
#[test]
fn subscription_check_limits() {
let (backend, mut client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_1 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_1)
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_2)
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
// Maximum number of pinned blocks is 2.
let mut subs =
@@ -1328,37 +1266,10 @@ mod tests {
#[test]
fn subscription_check_limits_with_duration() {
let (backend, mut client) = init_backend();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_1 = block.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_1)
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_2 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(hash_2)
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
let mut subs =
@@ -1456,6 +1367,39 @@ mod tests {
assert_eq!(permit_three.num_ops, 1);
}
#[test]
fn stop_all_subscriptions() {
	let (backend, client) = init_backend();
	let hashes = produce_blocks(client, 3);
	let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
	let mut subs =
		SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
	let id_1 = "abc".to_string();
	let id_2 = "abcd".to_string();

	// The first subscription pins every produced block.
	let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
	for hash in [hash_1, hash_2, hash_3] {
		assert!(subs.pin_block(&id_1, hash).unwrap());
	}

	// The second subscription pins only the middle block.
	let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
	assert!(subs.pin_block(&id_2, hash_2).unwrap());

	// Global reference counts must reflect the pins above.
	assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
	assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
	assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
	assert_eq!(subs.global_blocks.len(), 3);

	// Stopping every subscription must unpin all blocks.
	subs.stop_all_subscriptions();
	assert!(subs.global_blocks.is_empty());
}
#[test]
fn reserved_subscription_cleans_resources() {
let builder = TestClientBuilder::new();
@@ -233,6 +233,15 @@ impl<Block: BlockT, BE: Backend<Block>> ReservedSubscription<Block, BE> {
},
}
}
/// Stop all active subscriptions.
///
/// Every active subscription has its internal data discarded and its blocks
/// unpinned, and the `Stop` event will be generated for it.
pub fn stop_all_subscriptions(&self) {
	// The write guard is held only for the duration of this call.
	self.inner.write().stop_all_subscriptions()
}
}
impl<Block: BlockT, BE: Backend<Block>> Drop for ReservedSubscription<Block, BE> {
@@ -63,6 +63,7 @@ const MAX_PINNED_BLOCKS: usize = 32;
const MAX_PINNED_SECS: u64 = 60;
const MAX_OPERATIONS: usize = 16;
const MAX_PAGINATION_LIMIT: usize = 5;
const MAX_LAGGING_DISTANCE: usize = 128;
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
const INVALID_HASH: [u8; 32] = [1; 32];
@@ -88,6 +89,7 @@ pub async fn run_server() -> std::net::SocketAddr {
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_follow_subscriptions_per_connection: 1,
max_lagging_distance: MAX_LAGGING_DISTANCE,
},
)
.into_rpc();
@@ -148,6 +150,7 @@ async fn setup_api() -> (
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -199,6 +202,8 @@ async fn follow_subscription_produces_blocks() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -268,6 +273,8 @@ async fn follow_with_runtime() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -581,6 +588,8 @@ async fn call_runtime_without_flag() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1240,6 +1249,8 @@ async fn separate_operation_ids_for_subscriptions() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1329,6 +1340,8 @@ async fn follow_generates_initial_blocks() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1485,6 +1498,8 @@ async fn follow_exceeding_pinned_blocks() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1562,6 +1577,8 @@ async fn follow_with_unpin() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1674,6 +1691,8 @@ async fn unpin_duplicate_hashes() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1777,6 +1796,8 @@ async fn follow_with_multiple_unpin_hashes() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -1931,6 +1952,8 @@ async fn follow_prune_best_block() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -2117,6 +2140,8 @@ async fn follow_forks_pruned_block() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -2277,6 +2302,8 @@ async fn follow_report_multiple_pruned_block() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -2523,6 +2550,8 @@ async fn pin_block_references() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -2661,6 +2690,8 @@ async fn follow_finalized_before_new_block() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -2776,6 +2807,8 @@ async fn ensure_operation_limits_works() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: 1,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -2881,6 +2914,8 @@ async fn check_continue_operation() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: 1,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -3064,6 +3099,8 @@ async fn stop_storage_operation() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: 1,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
@@ -3351,6 +3388,88 @@ async fn storage_closest_merkle_value() {
);
}
#[tokio::test]
async fn chain_head_stop_all_subscriptions() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());
// Configure the chainHead to stop all subscriptions on lagging distance of 5 blocks.
let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
// Small distance so only a handful of imports trigger the lagging check.
max_lagging_distance: 5,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
.into_rpc();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
// The subscription starts with the `Initialized` event (no blocks imported yet).
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
// Import 6 blocks in total to trigger the suspension distance: finalized stays
// at genesis, so the leaf ends up 6 > 5 blocks ahead of it.
let mut parent_hash = client.chain_info().genesis_hash;
for i in 0..6 {
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(parent_hash)
.with_parent_block_number(i)
.build()
.unwrap()
.build()
.unwrap()
.block;
let hash = block.hash();
parent_hash = hash;
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
// Each import is reported to the first subscription as `NewBlock` followed
// by `BestBlockChanged`.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
}
let mut second_sub =
api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
// Lagging detected, the stop event is delivered immediately.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut second_sub).await,
FollowEvent::Stop
);
// Ensure that all subscriptions are stopped.
assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
// A fresh subscription still observes the lagging distance (finalized has not
// advanced), so it receives `Stop` immediately as well.
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
// Should receive the stop event immediately.
assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
// Finalizing the leaf brings the lagging distance back to zero, so the next
// subscription succeeds.
client.finalize_block(parent_hash, None).unwrap();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
}
#[tokio::test]
async fn chain_head_single_connection_context() {
let server_addr = run_server().await;
@@ -3500,12 +3619,14 @@ async fn chain_head_limit_reached() {
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: 1,
},
)
.into_rpc();
let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
// Initialized must always be reported first.
let _event: FollowEvent<String> = get_next_event(&mut sub).await;