mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-18 13:01:01 +00:00
chainHead/follow: Provide multiple block hashes to the initialized event (#3445)
This PR extends the Initialized event of the chainHead_follow subscription. Now, the event provides multiple finalized block hashes. This information allows clients that are disconnected, and that want to reconnect, to not lose information about the state of the chain. At the moment, the spec encourages servers to provide at least 1 minute of finalized blocks (~10 blocks). The users are responsible for unpinning these blocks at a later time. This PR tries to report at least 1 finalized block and at most 16 blocks, if they are available. Closes: https://github.com/paritytech/polkadot-sdk/issues/3432 cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
@@ -42,7 +42,14 @@ use sp_blockchain::{
|
|||||||
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
|
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
|
||||||
};
|
};
|
||||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
||||||
use std::{collections::HashSet, sync::Arc};
|
use std::{
|
||||||
|
collections::{HashSet, VecDeque},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// The maximum number of finalized blocks provided by the
|
||||||
|
/// `Initialized` event.
|
||||||
|
const MAX_FINALIZED_BLOCKS: usize = 16;
|
||||||
|
|
||||||
use super::subscription::InsertedSubscriptionData;
|
use super::subscription::InsertedSubscriptionData;
|
||||||
|
|
||||||
@@ -95,6 +102,8 @@ struct InitialBlocks<Block: BlockT> {
|
|||||||
///
|
///
|
||||||
/// It is a tuple of (block hash, parent hash).
|
/// It is a tuple of (block hash, parent hash).
|
||||||
finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
|
finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
|
||||||
|
/// Hashes of the last finalized blocks
|
||||||
|
finalized_block_hashes: VecDeque<Block::Hash>,
|
||||||
/// Blocks that should not be reported as pruned by the `Finalized` event.
|
/// Blocks that should not be reported as pruned by the `Finalized` event.
|
||||||
///
|
///
|
||||||
/// Substrate database will perform the pruning of height N at
|
/// Substrate database will perform the pruning of height N at
|
||||||
@@ -178,13 +187,14 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the in-memory blocks of the client, starting from the provided finalized hash.
|
/// Get the in-memory blocks of the client, starting from the provided finalized hash.
|
||||||
|
///
|
||||||
|
/// The reported blocks are pinned by this function.
|
||||||
fn get_init_blocks_with_forks(
|
fn get_init_blocks_with_forks(
|
||||||
&self,
|
&self,
|
||||||
startup_point: &StartupPoint<Block>,
|
finalized: Block::Hash,
|
||||||
) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
|
) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
|
||||||
let blockchain = self.backend.blockchain();
|
let blockchain = self.backend.blockchain();
|
||||||
let leaves = blockchain.leaves()?;
|
let leaves = blockchain.leaves()?;
|
||||||
let finalized = startup_point.finalized_hash;
|
|
||||||
let mut pruned_forks = HashSet::new();
|
let mut pruned_forks = HashSet::new();
|
||||||
let mut finalized_block_descendants = Vec::new();
|
let mut finalized_block_descendants = Vec::new();
|
||||||
let mut unique_descendants = HashSet::new();
|
let mut unique_descendants = HashSet::new();
|
||||||
@@ -198,17 +208,47 @@ where
|
|||||||
// Ensure a `NewBlock` event is generated for all children of the
|
// Ensure a `NewBlock` event is generated for all children of the
|
||||||
// finalized block. Describe the tree route as (child_node, parent_node)
|
// finalized block. Describe the tree route as (child_node, parent_node)
|
||||||
// Note: the order of elements matters here.
|
// Note: the order of elements matters here.
|
||||||
let parents = std::iter::once(finalized).chain(blocks.clone());
|
let mut parent = finalized;
|
||||||
|
for child in blocks {
|
||||||
|
let pair = (child, parent);
|
||||||
|
|
||||||
for pair in blocks.zip(parents) {
|
|
||||||
if unique_descendants.insert(pair) {
|
if unique_descendants.insert(pair) {
|
||||||
|
// The finalized block is pinned below.
|
||||||
|
self.sub_handle.pin_block(&self.sub_id, child)?;
|
||||||
finalized_block_descendants.push(pair);
|
finalized_block_descendants.push(pair);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parent = child;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(InitialBlocks { finalized_block_descendants, pruned_forks })
|
let mut current_block = finalized;
|
||||||
|
// The header of the finalized block must not be pruned.
|
||||||
|
let Some(header) = blockchain.header(current_block)? else {
|
||||||
|
return Err(SubscriptionManagementError::BlockHeaderAbsent);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Report at most `MAX_FINALIZED_BLOCKS`. Note: The node might not have that many blocks.
|
||||||
|
let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);
|
||||||
|
|
||||||
|
// Pin the finalized block.
|
||||||
|
self.sub_handle.pin_block(&self.sub_id, current_block)?;
|
||||||
|
finalized_block_hashes.push_front(current_block);
|
||||||
|
current_block = *header.parent_hash();
|
||||||
|
|
||||||
|
for _ in 0..MAX_FINALIZED_BLOCKS - 1 {
|
||||||
|
let Ok(Some(header)) = blockchain.header(current_block) else { break };
|
||||||
|
// Block cannot be reported if pinning fails.
|
||||||
|
if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
|
||||||
|
break
|
||||||
|
};
|
||||||
|
|
||||||
|
finalized_block_hashes.push_front(current_block);
|
||||||
|
current_block = *header.parent_hash();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate the initial events reported by the RPC `follow` method.
|
/// Generate the initial events reported by the RPC `follow` method.
|
||||||
@@ -220,18 +260,17 @@ where
|
|||||||
startup_point: &StartupPoint<Block>,
|
startup_point: &StartupPoint<Block>,
|
||||||
) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
|
) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
|
||||||
{
|
{
|
||||||
let init = self.get_init_blocks_with_forks(startup_point)?;
|
let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
|
||||||
|
|
||||||
let initial_blocks = init.finalized_block_descendants;
|
|
||||||
|
|
||||||
// The initialized event is the first one sent.
|
// The initialized event is the first one sent.
|
||||||
let finalized_block_hash = startup_point.finalized_hash;
|
let initial_blocks = init.finalized_block_descendants;
|
||||||
self.sub_handle.pin_block(&self.sub_id, finalized_block_hash)?;
|
let finalized_block_hashes = init.finalized_block_hashes;
|
||||||
|
|
||||||
|
let finalized_block_hash = startup_point.finalized_hash;
|
||||||
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
|
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
|
||||||
|
|
||||||
let initialized_event = FollowEvent::Initialized(Initialized {
|
let initialized_event = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash,
|
finalized_block_hashes: finalized_block_hashes.into(),
|
||||||
finalized_block_runtime,
|
finalized_block_runtime,
|
||||||
with_runtime: self.with_runtime,
|
with_runtime: self.with_runtime,
|
||||||
});
|
});
|
||||||
@@ -240,8 +279,6 @@ where
|
|||||||
|
|
||||||
finalized_block_descendants.push(initialized_event);
|
finalized_block_descendants.push(initialized_event);
|
||||||
for (child, parent) in initial_blocks.into_iter() {
|
for (child, parent) in initial_blocks.into_iter() {
|
||||||
self.sub_handle.pin_block(&self.sub_id, child)?;
|
|
||||||
|
|
||||||
let new_runtime = self.generate_runtime_event(child, Some(parent));
|
let new_runtime = self.generate_runtime_event(child, Some(parent));
|
||||||
|
|
||||||
let event = FollowEvent::NewBlock(NewBlock {
|
let event = FollowEvent::NewBlock(NewBlock {
|
||||||
|
|||||||
@@ -111,8 +111,8 @@ impl From<ApiError> for RuntimeEvent {
|
|||||||
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct Initialized<Hash> {
|
pub struct Initialized<Hash> {
|
||||||
/// The hash of the latest finalized block.
|
/// The hash of the lastest finalized blocks.
|
||||||
pub finalized_block_hash: Hash,
|
pub finalized_block_hashes: Vec<Hash>,
|
||||||
/// The runtime version of the finalized block.
|
/// The runtime version of the finalized block.
|
||||||
///
|
///
|
||||||
/// # Note
|
/// # Note
|
||||||
@@ -135,12 +135,12 @@ impl<Hash: Serialize> Serialize for Initialized<Hash> {
|
|||||||
{
|
{
|
||||||
if self.with_runtime {
|
if self.with_runtime {
|
||||||
let mut state = serializer.serialize_struct("Initialized", 2)?;
|
let mut state = serializer.serialize_struct("Initialized", 2)?;
|
||||||
state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?;
|
state.serialize_field("finalizedBlockHashes", &self.finalized_block_hashes)?;
|
||||||
state.serialize_field("finalizedBlockRuntime", &self.finalized_block_runtime)?;
|
state.serialize_field("finalizedBlockRuntime", &self.finalized_block_runtime)?;
|
||||||
state.end()
|
state.end()
|
||||||
} else {
|
} else {
|
||||||
let mut state = serializer.serialize_struct("Initialized", 1)?;
|
let mut state = serializer.serialize_struct("Initialized", 1)?;
|
||||||
state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?;
|
state.serialize_field("finalizedBlockHashes", &self.finalized_block_hashes)?;
|
||||||
state.end()
|
state.end()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -348,13 +348,13 @@ mod tests {
|
|||||||
fn follow_initialized_event_no_updates() {
|
fn follow_initialized_event_no_updates() {
|
||||||
// Runtime flag is false.
|
// Runtime flag is false.
|
||||||
let event: FollowEvent<String> = FollowEvent::Initialized(Initialized {
|
let event: FollowEvent<String> = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: "0x1".into(),
|
finalized_block_hashes: vec!["0x1".into()],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
let ser = serde_json::to_string(&event).unwrap();
|
let ser = serde_json::to_string(&event).unwrap();
|
||||||
let exp = r#"{"event":"initialized","finalizedBlockHash":"0x1"}"#;
|
let exp = r#"{"event":"initialized","finalizedBlockHashes":["0x1"]}"#;
|
||||||
assert_eq!(ser, exp);
|
assert_eq!(ser, exp);
|
||||||
|
|
||||||
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
|
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
|
||||||
@@ -373,7 +373,7 @@ mod tests {
|
|||||||
|
|
||||||
let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.into() });
|
let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.into() });
|
||||||
let mut initialized = Initialized {
|
let mut initialized = Initialized {
|
||||||
finalized_block_hash: "0x1".into(),
|
finalized_block_hashes: vec!["0x1".into()],
|
||||||
finalized_block_runtime: Some(runtime_event),
|
finalized_block_runtime: Some(runtime_event),
|
||||||
with_runtime: true,
|
with_runtime: true,
|
||||||
};
|
};
|
||||||
@@ -381,7 +381,7 @@ mod tests {
|
|||||||
|
|
||||||
let ser = serde_json::to_string(&event).unwrap();
|
let ser = serde_json::to_string(&event).unwrap();
|
||||||
let exp = concat!(
|
let exp = concat!(
|
||||||
r#"{"event":"initialized","finalizedBlockHash":"0x1","#,
|
r#"{"event":"initialized","finalizedBlockHashes":["0x1"],"#,
|
||||||
r#""finalizedBlockRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","#,
|
r#""finalizedBlockRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","#,
|
||||||
r#""specVersion":1,"implVersion":0,"apis":{},"transactionVersion":0}}}"#,
|
r#""specVersion":1,"implVersion":0,"apis":{},"transactionVersion":0}}}"#,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ async fn follow_subscription_produces_blocks() {
|
|||||||
// Initialized must always be reported first.
|
// Initialized must always be reported first.
|
||||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
@@ -255,7 +255,7 @@ async fn follow_with_runtime() {
|
|||||||
Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.clone().into() }));
|
Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.clone().into() }));
|
||||||
// Runtime must always be reported with the first event.
|
// Runtime must always be reported with the first event.
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
|
||||||
finalized_block_runtime,
|
finalized_block_runtime,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
@@ -1344,7 +1344,7 @@ async fn follow_generates_initial_blocks() {
|
|||||||
// Initialized must always be reported first.
|
// Initialized must always be reported first.
|
||||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
@@ -1896,7 +1896,7 @@ async fn follow_prune_best_block() {
|
|||||||
// Initialized must always be reported first.
|
// Initialized must always be reported first.
|
||||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
@@ -2081,6 +2081,7 @@ async fn follow_forks_pruned_block() {
|
|||||||
// ^^^ finalized
|
// ^^^ finalized
|
||||||
// -> block 1 -> block 2_f -> block 3_f
|
// -> block 1 -> block 2_f -> block 3_f
|
||||||
//
|
//
|
||||||
|
let finalized_hash = client.info().finalized_hash;
|
||||||
|
|
||||||
let block_1 = BlockBuilderBuilder::new(&*client)
|
let block_1 = BlockBuilderBuilder::new(&*client)
|
||||||
.on_parent_block(client.chain_info().genesis_hash)
|
.on_parent_block(client.chain_info().genesis_hash)
|
||||||
@@ -2090,6 +2091,7 @@ async fn follow_forks_pruned_block() {
|
|||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.block;
|
.block;
|
||||||
|
let block_1_hash = block_1.header.hash();
|
||||||
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
|
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
|
||||||
|
|
||||||
let block_2 = BlockBuilderBuilder::new(&*client)
|
let block_2 = BlockBuilderBuilder::new(&*client)
|
||||||
@@ -2100,6 +2102,7 @@ async fn follow_forks_pruned_block() {
|
|||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.block;
|
.block;
|
||||||
|
let block_2_hash = block_2.header.hash();
|
||||||
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();
|
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();
|
||||||
|
|
||||||
let block_3 = BlockBuilderBuilder::new(&*client)
|
let block_3 = BlockBuilderBuilder::new(&*client)
|
||||||
@@ -2156,7 +2159,12 @@ async fn follow_forks_pruned_block() {
|
|||||||
// Initialized must always be reported first.
|
// Initialized must always be reported first.
|
||||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", block_3_hash),
|
finalized_block_hashes: vec![
|
||||||
|
format!("{:?}", finalized_hash),
|
||||||
|
format!("{:?}", block_1_hash),
|
||||||
|
format!("{:?}", block_2_hash),
|
||||||
|
format!("{:?}", block_3_hash),
|
||||||
|
],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
@@ -2310,7 +2318,7 @@ async fn follow_report_multiple_pruned_block() {
|
|||||||
// Initialized must always be reported first.
|
// Initialized must always be reported first.
|
||||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
@@ -2632,7 +2640,7 @@ async fn follow_finalized_before_new_block() {
|
|||||||
let finalized_hash = client.info().finalized_hash;
|
let finalized_hash = client.info().finalized_hash;
|
||||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||||
let expected = FollowEvent::Initialized(Initialized {
|
let expected = FollowEvent::Initialized(Initialized {
|
||||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
|
||||||
finalized_block_runtime: None,
|
finalized_block_runtime: None,
|
||||||
with_runtime: false,
|
with_runtime: false,
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user