mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 21:11:07 +00:00
fix/chain_head: Ensure correct events for finalized branch (#13632)
* chain_head/follow: Ensure correct events for finalized branch Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Reenable tests Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Do some clean ups and add some more docs * Fix gramatic * Update client/rpc-spec-v2/src/chain_head/chain_head_follow.rs Co-authored-by: Sebastian Kunert <skunert49@gmail.com> * rpc/chain_head: Introduce error for absent headers Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Bastian Köcher <info@kchr.de> Co-authored-by: Bastian Köcher <git@kchr.de> Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
@@ -40,10 +40,7 @@ use sp_api::CallApiAt;
|
||||
use sp_blockchain::{
|
||||
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
|
||||
};
|
||||
use sp_runtime::{
|
||||
traits::{Block as BlockT, Header as HeaderT, One},
|
||||
Saturating,
|
||||
};
|
||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
/// Generates the events of the `chainHead_follow` method.
|
||||
@@ -116,7 +113,7 @@ struct StartupPoint<Block: BlockT> {
|
||||
/// The head of the finalized chain.
|
||||
pub finalized_hash: Block::Hash,
|
||||
/// Last finalized block number.
|
||||
pub finalized_number: <<Block as BlockT>::Header as HeaderT>::Number,
|
||||
pub finalized_number: NumberFor<Block>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
|
||||
@@ -318,10 +315,7 @@ where
|
||||
}
|
||||
|
||||
// Ensure we are only reporting blocks after the starting point.
|
||||
let Some(block_number) = self.client.number(notification.hash)? else {
|
||||
return Err(SubscriptionManagementError::BlockNumberAbsent)
|
||||
};
|
||||
if block_number < startup_point.finalized_number {
|
||||
if *notification.header.number() < startup_point.finalized_number {
|
||||
return Ok(Default::default())
|
||||
}
|
||||
|
||||
@@ -349,59 +343,48 @@ where
|
||||
return Ok(Default::default())
|
||||
};
|
||||
|
||||
// Find the parent hash.
|
||||
let Some(first_number) = self.client.number(*first_hash)? else {
|
||||
return Err(SubscriptionManagementError::BlockNumberAbsent)
|
||||
};
|
||||
let Some(parent) = self.client.hash(first_number.saturating_sub(One::one()))? else {
|
||||
return Err(SubscriptionManagementError::BlockHashAbsent)
|
||||
// Find the parent header.
|
||||
let Some(first_header) = self.client.header(*first_hash)? else {
|
||||
return Err(SubscriptionManagementError::BlockHeaderAbsent)
|
||||
};
|
||||
|
||||
let last_finalized = finalized_block_hashes
|
||||
.last()
|
||||
.expect("At least one finalized hash inserted; qed");
|
||||
let parents = std::iter::once(&parent).chain(finalized_block_hashes.iter());
|
||||
for (hash, parent) in finalized_block_hashes.iter().zip(parents) {
|
||||
// This block is already reported by the import notification.
|
||||
let parents =
|
||||
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
|
||||
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
|
||||
// Check if the block was already reported and thus, is already pinned.
|
||||
if !self.sub_handle.pin_block(*hash)? {
|
||||
continue
|
||||
}
|
||||
|
||||
// Generate only the `NewBlock` event for this block.
|
||||
if hash != last_finalized {
|
||||
// Generate `NewBlock` events for all blocks beside the last block in the list
|
||||
if i + 1 != finalized_block_hashes.len() {
|
||||
// Generate only the `NewBlock` event for this block.
|
||||
events.extend(self.generate_import_events(*hash, *parent, false));
|
||||
continue
|
||||
}
|
||||
|
||||
match self.best_block_cache {
|
||||
Some(best_block_hash) => {
|
||||
// If the best reported block is a children of the last finalized,
|
||||
// then we had a gap in notification.
|
||||
} else {
|
||||
// If we end up here and the `best_block` is a descendent of the finalized block
|
||||
// (last block in the list), it means that there were skipped notifications.
|
||||
// Otherwise `pin_block` would had returned `true`.
|
||||
//
|
||||
// When the node falls out of sync and then syncs up to the tip of the chain, it can
|
||||
// happen that we skip notifications. Then it is better to terminate the connection
|
||||
// instead of trying to send notifications for all missed blocks.
|
||||
if let Some(best_block_hash) = self.best_block_cache {
|
||||
let ancestor = sp_blockchain::lowest_common_ancestor(
|
||||
&*self.client,
|
||||
*last_finalized,
|
||||
*hash,
|
||||
best_block_hash,
|
||||
)?;
|
||||
|
||||
// A descendent of the finalized block was already reported
|
||||
// before the `NewBlock` event containing the finalized block
|
||||
// is reported.
|
||||
if ancestor.hash == *last_finalized {
|
||||
if ancestor.hash == *hash {
|
||||
return Err(SubscriptionManagementError::Custom(
|
||||
"A descendent of the finalized block was already reported".into(),
|
||||
))
|
||||
}
|
||||
self.best_block_cache = Some(*hash);
|
||||
},
|
||||
// This is the first best block event that we generate.
|
||||
None => {
|
||||
self.best_block_cache = Some(*hash);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// This is the first time we see this block. Generate the `NewBlock` event; if this is
|
||||
// the last block, also generate the `BestBlock` event.
|
||||
events.extend(self.generate_import_events(*hash, *parent, true))
|
||||
// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
|
||||
events.extend(self.generate_import_events(*hash, *parent, true))
|
||||
}
|
||||
}
|
||||
|
||||
Ok(events)
|
||||
@@ -448,17 +431,13 @@ where
|
||||
let last_finalized = notification.hash;
|
||||
|
||||
// Ensure we are only reporting blocks after the starting point.
|
||||
let Some(block_number) = self.client.number(last_finalized)? else {
|
||||
return Err(SubscriptionManagementError::BlockNumberAbsent)
|
||||
};
|
||||
if block_number < startup_point.finalized_number {
|
||||
if *notification.header.number() < startup_point.finalized_number {
|
||||
return Ok(Default::default())
|
||||
}
|
||||
|
||||
// The tree route contains the exclusive path from the last finalized block to the block
|
||||
// reported by the notification. Ensure the finalized block is also reported.
|
||||
let mut finalized_block_hashes =
|
||||
notification.tree_route.iter().cloned().collect::<Vec<_>>();
|
||||
let mut finalized_block_hashes = notification.tree_route.to_vec();
|
||||
finalized_block_hashes.push(last_finalized);
|
||||
|
||||
// If the finalized hashes were not reported yet, generate the `NewBlock` events.
|
||||
@@ -476,9 +455,8 @@ where
|
||||
|
||||
match self.best_block_cache {
|
||||
Some(block_cache) => {
|
||||
// Check if the current best block is also reported as pruned.
|
||||
let reported_pruned = pruned_block_hashes.iter().find(|&&hash| hash == block_cache);
|
||||
if reported_pruned.is_none() {
|
||||
// If the best block wasn't pruned, we are done here.
|
||||
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
|
||||
events.push(finalized_event);
|
||||
return Ok(events)
|
||||
}
|
||||
@@ -499,20 +477,6 @@ where
|
||||
events.push(finalized_event);
|
||||
Ok(events)
|
||||
} else {
|
||||
let ancestor = sp_blockchain::lowest_common_ancestor(
|
||||
&*self.client,
|
||||
last_finalized,
|
||||
best_block_hash,
|
||||
)?;
|
||||
|
||||
// The client's best block must be a descendent of the last finalized block.
|
||||
// In other words, the lowest common ancestor must be the last finalized block.
|
||||
if ancestor.hash != last_finalized {
|
||||
return Err(SubscriptionManagementError::Custom(
|
||||
"The finalized block is not an ancestor of the best block".into(),
|
||||
))
|
||||
}
|
||||
|
||||
// The RPC needs to also submit a new best block changed before the
|
||||
// finalized event.
|
||||
self.best_block_cache = Some(best_block_hash);
|
||||
|
||||
@@ -36,10 +36,8 @@ pub enum SubscriptionManagementError {
|
||||
ExceededLimits,
|
||||
/// Error originated from the blockchain (client or backend).
|
||||
Blockchain(Error),
|
||||
/// The database does not contain a block number.
|
||||
BlockNumberAbsent,
|
||||
/// The database does not contain a block hash.
|
||||
BlockHashAbsent,
|
||||
/// The database does not contain a block header.
|
||||
BlockHeaderAbsent,
|
||||
/// Custom error.
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
@@ -1024,9 +1024,6 @@ async fn follow_prune_best_block() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(disable_flaky)]
|
||||
#[allow(dead_code)]
|
||||
// FIXME: https://github.com/paritytech/substrate/issues/11321
|
||||
async fn follow_forks_pruned_block() {
|
||||
let builder = TestClientBuilder::new();
|
||||
let backend = builder.backend();
|
||||
@@ -1140,9 +1137,6 @@ async fn follow_forks_pruned_block() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(disable_flaky)]
|
||||
#[allow(dead_code)]
|
||||
// FIXME: https://github.com/paritytech/substrate/issues/11321
|
||||
async fn follow_report_multiple_pruned_block() {
|
||||
let builder = TestClientBuilder::new();
|
||||
let backend = builder.backend();
|
||||
|
||||
Reference in New Issue
Block a user