chainHead: Report unique hashes for pruned blocks (#3667)

This PR ensures that the reported pruned blocks are unique.

While at it, ensure that the best block event is properly generated when
the last best block is a fork that will be pruned in the future.

To achieve this, the chainHead keeps a LRU set of reported pruned blocks
to ensure the following are not reported twice:

```bash
	 finalized -> block 1 -> block 2 -> block 3
	
	                      -> block 2 -> block 4 -> block 5
	
	           -> block 1 -> block 2_f -> block 6 -> block 7 -> block 8
```

When block 7 is finalized the branch [block 2; block 3] is reported as
pruned.
When block 8 is finalized the branch [block 2; block 4; block 5] should
be reported as pruned, however block 2 was already reported as pruned at
the previous step.

This is a side-effect of the pruned blocks being reported at level N -
1. For example, if all pruned forks would be reported with the first
encounter (when block 6 is finalized we know that block 3 and block 5
are stale), we would not need the LRU cache.

cc @paritytech/subxt-team  

Closes https://github.com/paritytech/polkadot-sdk/issues/3658

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
Alexandru Vasile
2024-04-17 18:29:29 +03:00
committed by GitHub
parent ca7c01c8d8
commit bfbf7f5d6f
5 changed files with 272 additions and 51 deletions
@@ -75,7 +75,7 @@ pub struct ChainHeadConfig {
/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;
/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
@@ -19,7 +19,7 @@
//! Implementation of the `chainHead_follow` method.
use crate::chain_head::{
chain_head::LOG_TARGET,
chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
event::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
@@ -37,6 +37,7 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
@@ -68,7 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Subscription ID.
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
current_best_block: Option<Block::Hash>,
/// LRU cache of pruned blocks.
pruned_blocks: LruMap<Block::Hash, ()>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
@@ -90,7 +93,10 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
current_best_block: None,
pruned_blocks: LruMap::new(ByLength::new(
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
max_lagging_distance,
}
}
@@ -303,18 +309,20 @@ where
/// Generate the initial events reported by the RPC `follow` method.
///
/// Returns the initial events that should be reported directly, together with pruned
/// block hashes that should be ignored by the `Finalized` event.
/// Returns the initial events that should be reported directly.
fn generate_init_events(
&mut self,
startup_point: &StartupPoint<Block>,
) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
{
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
// The initialized event is the first one sent.
let initial_blocks = init.finalized_block_descendants;
let finalized_block_hashes = init.finalized_block_hashes;
// These are the pruned blocks that we should not report again.
for pruned in init.pruned_forks {
self.pruned_blocks.insert(pruned, ());
}
let finalized_block_hash = startup_point.finalized_hash;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
@@ -345,11 +353,11 @@ where
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.best_block_cache = Some(best_block_hash);
self.current_best_block = Some(best_block_hash);
finalized_block_descendants.push(best_block);
};
Ok((finalized_block_descendants, init.pruned_forks))
Ok(finalized_block_descendants)
}
/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the
@@ -377,19 +385,19 @@ where
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
match self.best_block_cache {
match self.current_best_block {
Some(block_cache) => {
// The RPC layer has not reported this block as best before.
// Note: This handles the race with the finalized branch.
if block_cache != block_hash {
self.best_block_cache = Some(block_hash);
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
} else {
vec![new_block]
}
},
None => {
self.best_block_cache = Some(block_hash);
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
},
}
@@ -458,7 +466,7 @@ where
// When the node falls out of sync and then syncs up to the tip of the chain, it can
// happen that we skip notifications. Then it is better to terminate the connection
// instead of trying to send notifications for all missed blocks.
if let Some(best_block_hash) = self.best_block_cache {
if let Some(best_block_hash) = self.current_best_block {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
*hash,
@@ -481,13 +489,10 @@ where
}
/// Get all pruned block hashes from the provided stale heads.
///
/// The result does not include hashes from `to_ignore`.
fn get_pruned_hashes(
&self,
&mut self,
stale_heads: &[Block::Hash],
last_finalized: Block::Hash,
to_ignore: &mut HashSet<Block::Hash>,
) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let mut pruned = Vec::new();
@@ -497,11 +502,13 @@ where
// Collect only blocks that are not part of the canonical chain.
pruned.extend(tree_route.enacted().iter().filter_map(|block| {
if !to_ignore.remove(&block.hash) {
Some(block.hash)
} else {
None
if self.pruned_blocks.get(&block.hash).is_some() {
// The block was already reported as pruned.
return None
}
self.pruned_blocks.insert(block.hash, ());
Some(block.hash)
}))
}
@@ -515,7 +522,6 @@ where
fn handle_finalized_blocks(
&mut self,
notification: FinalityNotification<Block>,
to_ignore: &mut HashSet<Block::Hash>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let last_finalized = notification.hash;
@@ -536,25 +542,32 @@ where
// Report all pruned blocks from the notification that
// were not already reported as pruned.
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized, to_ignore)?;
self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;
let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});
match self.best_block_cache {
Some(block_cache) => {
// If the best block wasn't pruned, we are done here.
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
events.push(finalized_event);
return Ok(events)
}
if let Some(current_best_block) = self.current_best_block {
// The best reported block is in the pruned list. Report a new best block.
let is_in_pruned_list =
pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
// The block is not the last finalized block.
//
// It can be either:
// - a descendant of the last finalized block
// - a block on a fork that will be pruned in the future.
//
// In those cases, we emit a new best block.
let is_not_last_finalized = current_best_block != last_finalized;
// The best block is reported as pruned. Therefore, we need to signal a new
// best block event before submitting the finalized event.
if is_in_pruned_list || is_not_last_finalized {
// We need to generate a best block event.
let best_block_hash = self.client.info().best_hash;
if best_block_hash == block_cache {
// Defensive check against state mismatch.
if best_block_hash == current_best_block {
// The client does not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
// step of the finalization and it should be up to date.
@@ -564,23 +577,18 @@ where
"[follow][id={:?}] Client does not contain different best block",
self.sub_id,
);
events.push(finalized_event);
Ok(events)
} else {
// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
self.current_best_block = Some(best_block_hash);
events
.push(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }));
}
},
None => {
events.push(finalized_event);
Ok(events)
},
}
}
events.push(finalized_event);
Ok(events)
}
/// Submit the events from the provided stream to the RPC client
@@ -589,7 +597,6 @@ where
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
@@ -612,7 +619,7 @@ where
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};
@@ -682,7 +689,7 @@ where
.map(|response| NotificationType::MethodResponse(response));
let startup_point = StartupPoint::from(self.client.info());
let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) {
let initial_events = match self.generate_init_events(&startup_point) {
Ok(blocks) => blocks,
Err(err) => {
debug!(
@@ -702,7 +709,6 @@ where
let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
let stream = stream::once(futures::future::ready(initial)).chain(merged);
self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await
self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
}
}
@@ -187,6 +187,62 @@ async fn setup_api() -> (
(client, api, sub, sub_id, block)
}
/// Build an empty block on top of the given parent and import it into the
/// client, returning the freshly imported block.
async fn import_block(
	mut client: Arc<Client<Backend>>,
	parent_hash: <Block as BlockT>::Hash,
	parent_number: u64,
) -> Block {
	// First `build()` finalizes the builder configuration, the second one
	// produces the actual block.
	let built = BlockBuilderBuilder::new(&*client)
		.on_parent_block(parent_hash)
		.with_parent_block_number(parent_number)
		.build()
		.unwrap()
		.build()
		.unwrap();
	let block = built.block;
	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
	block
}
/// Build a block containing the provided transfer on top of the given parent
/// and import it as the new best block, returning the imported block.
async fn import_best_block_with_tx(
	mut client: Arc<Client<Backend>>,
	parent_hash: <Block as BlockT>::Hash,
	parent_number: u64,
	tx: Transfer,
) -> Block {
	let mut builder = BlockBuilderBuilder::new(&*client)
		.on_parent_block(parent_hash)
		.with_parent_block_number(parent_number)
		.build()
		.unwrap();
	// The transfer makes this block distinct from an empty sibling, so forks
	// at the same height get different hashes.
	builder.push_transfer(tx).unwrap();
	let block = builder.build().unwrap().block;
	client.import_as_best(BlockOrigin::Own, block.clone()).await.unwrap();
	block
}
/// Check the subscription produces a new block and a best block event.
///
/// The macro is used instead of a fn to preserve the lines of code in case of panics.
macro_rules! check_new_and_best_block_events {
	($sub:expr, $block_hash:expr, $parent_hash:expr) => {
		// First the block must be announced as new...
		let new_block_event: FollowEvent<String> = get_next_event($sub).await;
		assert_eq!(
			new_block_event,
			FollowEvent::NewBlock(NewBlock {
				block_hash: format!("{:?}", $block_hash),
				parent_block_hash: format!("{:?}", $parent_hash),
				new_runtime: None,
				with_runtime: false,
			})
		);

		// ...and immediately after reported as the best block.
		let best_block_event: FollowEvent<String> = get_next_event($sub).await;
		assert_eq!(
			best_block_event,
			FollowEvent::BestBlockChanged(BestBlockChanged {
				best_block_hash: format!("{:?}", $block_hash),
			})
		);
	};
}
#[tokio::test]
async fn follow_subscription_produces_blocks() {
let builder = TestClientBuilder::new();
@@ -3644,3 +3700,160 @@ async fn chain_head_limit_reached() {
// Initialized must always be reported first.
let _event: FollowEvent<String> = get_next_event(&mut sub).await;
}
/// Ensure a block hash is reported as pruned at most once, even when it is
/// part of several pruned forks discovered at different finalization steps.
#[tokio::test]
async fn follow_unique_pruned_blocks() {
	let builder = TestClientBuilder::new();
	let backend = builder.backend();
	let client = Arc::new(builder.build());

	let api = ChainHead::new(
		client.clone(),
		backend,
		Arc::new(TaskExecutor::default()),
		ChainHeadConfig {
			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
			subscription_max_ongoing_operations: MAX_OPERATIONS,
			operation_max_storage_items: MAX_PAGINATION_LIMIT,
			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
			max_lagging_distance: MAX_LAGGING_DISTANCE,
		},
	)
	.into_rpc();

	let finalized_hash = client.info().finalized_hash;
	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();

	// Initialized must always be reported first.
	let event: FollowEvent<String> = get_next_event(&mut sub).await;
	let expected = FollowEvent::Initialized(Initialized {
		finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
		finalized_block_runtime: None,
		with_runtime: false,
	});
	assert_eq!(event, expected);

	// Block tree:
	//
	// finalized -> block 1 -> block 2 -> block 3
	//
	//                      -> block 2 -> block 4 -> block 5
	//
	//           -> block 1 -> block 2_f -> block 6
	//                                        ^^^ finalized
	//                                          -> block 7
	//                                               ^^^ finalized
	//                                                 -> block 8
	//                                                      ^^^ finalized
	//
	// The chainHead will see block 5 as the best block. However, the
	// client will finalize the block 6, which is on another fork.
	//
	// When the block 6 is finalized, block 2, block 3, block 4 and block 5 are placed on an
	// invalid fork. However, pruning of blocks happens on level N - 1.
	// Therefore, no pruned blocks are reported yet.
	//
	// When the block 7 is finalized, block 3 is detected as stale. At this step, block 2 and 3
	// are reported as pruned.
	//
	// When the block 8 is finalized, block 5, block 4 and block 2 are detected as stale. However,
	// only blocks 5 and 4 are reported as pruned. This is because the block 2 was previously
	// reported.

	// Initial setup steps:
	let block_1_hash =
		import_block(client.clone(), client.chain_info().genesis_hash, 0).await.hash();
	let block_2_f_hash = import_block(client.clone(), block_1_hash, 1).await.hash();
	let block_6_hash = import_block(client.clone(), block_2_f_hash, 2).await.hash();

	// Import block 2 as best on the fork.
	let mut tx_alice_ferdie = Transfer {
		from: AccountKeyring::Alice.into(),
		to: AccountKeyring::Ferdie.into(),
		amount: 41,
		nonce: 0,
	};
	let block_2_hash =
		import_best_block_with_tx(client.clone(), block_1_hash, 1, tx_alice_ferdie.clone())
			.await
			.hash();

	let block_3_hash = import_block(client.clone(), block_2_hash, 2).await.hash();

	// Fork block 4.
	// A different nonce makes block 4's hash distinct from block 3's.
	tx_alice_ferdie.nonce = 1;
	let block_4_hash = import_best_block_with_tx(client.clone(), block_2_hash, 2, tx_alice_ferdie)
		.await
		.hash();

	let block_5_hash = import_block(client.clone(), block_4_hash, 3).await.hash();

	// Check expected events generated by the setup.
	{
		// Check block 1 -> block 2f -> block 6.
		check_new_and_best_block_events!(&mut sub, block_1_hash, finalized_hash);
		check_new_and_best_block_events!(&mut sub, block_2_f_hash, block_1_hash);
		check_new_and_best_block_events!(&mut sub, block_6_hash, block_2_f_hash);

		// Check (block 1 ->) block 2 -> block 3.
		check_new_and_best_block_events!(&mut sub, block_2_hash, block_1_hash);
		check_new_and_best_block_events!(&mut sub, block_3_hash, block_2_hash);

		// Check (block 1 -> block 2 ->) block 4 -> block 5.
		check_new_and_best_block_events!(&mut sub, block_4_hash, block_2_hash);
		check_new_and_best_block_events!(&mut sub, block_5_hash, block_4_hash);
	}

	// Finalize the block 6 from the fork.
	client.finalize_block(block_6_hash, None).unwrap();

	// Expect to report the best block changed before the finalized event.
	let event: FollowEvent<String> = get_next_event(&mut sub).await;
	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
		best_block_hash: format!("{:?}", block_6_hash),
	});
	assert_eq!(event, expected);

	// Block 2 must be reported as pruned, even if it was the previous best.
	// Note: pruning happens at level N - 1, so no hashes are pruned yet here.
	let event: FollowEvent<String> = get_next_event(&mut sub).await;
	let expected = FollowEvent::Finalized(Finalized {
		finalized_block_hashes: vec![
			format!("{:?}", block_1_hash),
			format!("{:?}", block_2_f_hash),
			format!("{:?}", block_6_hash),
		],
		pruned_block_hashes: vec![],
	});
	assert_eq!(event, expected);

	// Pruned hash can be unpinned.
	let sub_id = sub.subscription_id();
	let sub_id = serde_json::to_string(&sub_id).unwrap();
	let hash = format!("{:?}", block_2_hash);
	let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap();

	// Import block 7 and check it.
	let block_7_hash = import_block(client.clone(), block_6_hash, 3).await.hash();
	check_new_and_best_block_events!(&mut sub, block_7_hash, block_6_hash);

	// Finalize the block 7.
	// Block 3 becomes stale, so block 2 and block 3 are reported as pruned.
	client.finalize_block(block_7_hash, None).unwrap();
	let event: FollowEvent<String> = get_next_event(&mut sub).await;
	let expected = FollowEvent::Finalized(Finalized {
		finalized_block_hashes: vec![format!("{:?}", block_7_hash)],
		pruned_block_hashes: vec![format!("{:?}", block_2_hash), format!("{:?}", block_3_hash)],
	});
	assert_eq!(event, expected);

	// Check block 8.
	let block_8_hash = import_block(client.clone(), block_7_hash, 4).await.hash();
	check_new_and_best_block_events!(&mut sub, block_8_hash, block_7_hash);

	// Finalize the block 8.
	// Block 2 is stale again, but it must NOT be reported twice: only block 4
	// and block 5 appear in the pruned list.
	client.finalize_block(block_8_hash, None).unwrap();
	let event: FollowEvent<String> = get_next_event(&mut sub).await;
	let expected = FollowEvent::Finalized(Finalized {
		finalized_block_hashes: vec![format!("{:?}", block_8_hash)],
		pruned_block_hashes: vec![format!("{:?}", block_4_hash), format!("{:?}", block_5_hash)],
	});
	assert_eq!(event, expected);
}