mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 05:11:09 +00:00
chain_head/tests: Check finalized block event before new block (#13680)
* chain_head/tests: Mock client for custom block notification Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head/tests: Check finalized block event before new block Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Update client/rpc-spec-v2/src/chain_head/test_utils.rs Co-authored-by: Bastian Köcher <git@kchr.de> * Update client/rpc-spec-v2/src/chain_head/test_utils.rs Co-authored-by: Bastian Köcher <git@kchr.de> * Update client/rpc-spec-v2/src/chain_head/test_utils.rs Co-authored-by: Bastian Köcher <git@kchr.de> * chain_head/tests: Run import events with 10min timeout Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head/tests: Add comments about test Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Generated
+2
-1
@@ -9275,6 +9275,7 @@ dependencies = [
|
||||
"sc-chain-spec",
|
||||
"sc-client-api",
|
||||
"sc-transaction-pool-api",
|
||||
"sc-utils",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sp-api",
|
||||
@@ -11954,7 +11955,7 @@ checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"digest 0.10.6",
|
||||
"rand 0.7.3",
|
||||
"rand 0.8.5",
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
|
||||
@@ -43,4 +43,5 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime"
|
||||
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }
|
||||
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
|
||||
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
|
||||
sc-utils = { version = "4.0.0-dev", path = "../utils" }
|
||||
assert_matches = "1.3.0"
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
//!
|
||||
//! Methods are prefixed by `chainHead`.
|
||||
|
||||
#[cfg(test)]
|
||||
mod test_utils;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
|
||||
@@ -0,0 +1,320 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use sc_client_api::{
|
||||
execution_extensions::ExecutionExtensions, BlockBackend, BlockImportNotification,
|
||||
BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, FinalityNotification,
|
||||
FinalityNotifications, FinalizeSummary, ImportNotifications, KeysIter, PairsIter, StorageData,
|
||||
StorageEventStream, StorageKey, StorageProvider,
|
||||
};
|
||||
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
|
||||
use sp_api::{CallApiAt, CallApiAtParams, NumberFor, RuntimeVersion};
|
||||
use sp_blockchain::{BlockStatus, CachedHeaderMetadata, HeaderBackend, HeaderMetadata, Info};
|
||||
use sp_consensus::BlockOrigin;
|
||||
use sp_runtime::{
|
||||
generic::SignedBlock,
|
||||
traits::{Block as BlockT, Header as HeaderT},
|
||||
Justifications,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use substrate_test_runtime::{Block, Hash, Header};
|
||||
|
||||
pub struct ChainHeadMockClient<Client> {
|
||||
client: Arc<Client>,
|
||||
import_sinks: Mutex<Vec<TracingUnboundedSender<BlockImportNotification<Block>>>>,
|
||||
finality_sinks: Mutex<Vec<TracingUnboundedSender<FinalityNotification<Block>>>>,
|
||||
}
|
||||
|
||||
impl<Client> ChainHeadMockClient<Client> {
|
||||
pub fn new(client: Arc<Client>) -> Self {
|
||||
ChainHeadMockClient {
|
||||
client,
|
||||
import_sinks: Default::default(),
|
||||
finality_sinks: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn trigger_import_stream(&self, header: Header) {
|
||||
// Ensure the client called the `import_notification_stream`.
|
||||
while self.import_sinks.lock().is_empty() {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
||||
// Build the notification.
|
||||
let (sink, _stream) = tracing_unbounded("test_sink", 100_000);
|
||||
let notification =
|
||||
BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink);
|
||||
|
||||
for sink in self.import_sinks.lock().iter_mut() {
|
||||
sink.unbounded_send(notification.clone()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn trigger_finality_stream(&self, header: Header) {
|
||||
// Ensure the client called the `finality_notification_stream`.
|
||||
while self.finality_sinks.lock().is_empty() {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
||||
// Build the notification.
|
||||
let (sink, _stream) = tracing_unbounded("test_sink", 100_000);
|
||||
let summary = FinalizeSummary {
|
||||
header: header.clone(),
|
||||
finalized: vec![header.hash()],
|
||||
stale_heads: vec![],
|
||||
};
|
||||
let notification = FinalityNotification::from_summary(summary, sink);
|
||||
|
||||
for sink in self.finality_sinks.lock().iter_mut() {
|
||||
sink.unbounded_send(notification.clone()).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ChainHead calls `import_notification_stream` and `finality_notification_stream` in order to
|
||||
// subscribe to block events.
|
||||
impl<Client> BlockchainEvents<Block> for ChainHeadMockClient<Client> {
|
||||
fn import_notification_stream(&self) -> ImportNotifications<Block> {
|
||||
let (sink, stream) = tracing_unbounded("import_notification_stream", 1024);
|
||||
self.import_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
|
||||
fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
|
||||
let (sink, stream) = tracing_unbounded("finality_notification_stream", 1024);
|
||||
self.finality_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
|
||||
fn storage_changes_notification_stream(
|
||||
&self,
|
||||
_filter_keys: Option<&[StorageKey]>,
|
||||
_child_filter_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
|
||||
) -> sp_blockchain::Result<StorageEventStream<Hash>> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
// The following implementations are imposed by the `chainHead` trait bounds.
|
||||
|
||||
impl<Block: BlockT, E: CallExecutor<Block>, Client: ExecutorProvider<Block, Executor = E>>
|
||||
ExecutorProvider<Block> for ChainHeadMockClient<Client>
|
||||
{
|
||||
type Executor = <Client as ExecutorProvider<Block>>::Executor;
|
||||
|
||||
fn executor(&self) -> &Self::Executor {
|
||||
self.client.executor()
|
||||
}
|
||||
|
||||
fn execution_extensions(&self) -> &ExecutionExtensions<Block> {
|
||||
self.client.execution_extensions()
|
||||
}
|
||||
}
|
||||
|
||||
impl<
|
||||
BE: sc_client_api::backend::Backend<Block> + Send + Sync + 'static,
|
||||
Block: BlockT,
|
||||
Client: StorageProvider<Block, BE>,
|
||||
> StorageProvider<Block, BE> for ChainHeadMockClient<Client>
|
||||
{
|
||||
fn storage(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
key: &StorageKey,
|
||||
) -> sp_blockchain::Result<Option<StorageData>> {
|
||||
self.client.storage(hash, key)
|
||||
}
|
||||
|
||||
fn storage_hash(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
key: &StorageKey,
|
||||
) -> sp_blockchain::Result<Option<Block::Hash>> {
|
||||
self.client.storage_hash(hash, key)
|
||||
}
|
||||
|
||||
fn storage_keys(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
prefix: Option<&StorageKey>,
|
||||
start_key: Option<&StorageKey>,
|
||||
) -> sp_blockchain::Result<KeysIter<BE::State, Block>> {
|
||||
self.client.storage_keys(hash, prefix, start_key)
|
||||
}
|
||||
|
||||
fn storage_pairs(
|
||||
&self,
|
||||
hash: <Block as BlockT>::Hash,
|
||||
prefix: Option<&StorageKey>,
|
||||
start_key: Option<&StorageKey>,
|
||||
) -> sp_blockchain::Result<PairsIter<BE::State, Block>> {
|
||||
self.client.storage_pairs(hash, prefix, start_key)
|
||||
}
|
||||
|
||||
fn child_storage(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
child_info: &ChildInfo,
|
||||
key: &StorageKey,
|
||||
) -> sp_blockchain::Result<Option<StorageData>> {
|
||||
self.client.child_storage(hash, child_info, key)
|
||||
}
|
||||
|
||||
fn child_storage_keys(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
child_info: ChildInfo,
|
||||
prefix: Option<&StorageKey>,
|
||||
start_key: Option<&StorageKey>,
|
||||
) -> sp_blockchain::Result<KeysIter<BE::State, Block>> {
|
||||
self.client.child_storage_keys(hash, child_info, prefix, start_key)
|
||||
}
|
||||
|
||||
fn child_storage_hash(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
child_info: &ChildInfo,
|
||||
key: &StorageKey,
|
||||
) -> sp_blockchain::Result<Option<Block::Hash>> {
|
||||
self.client.child_storage_hash(hash, child_info, key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block: BlockT, Client: CallApiAt<Block>> CallApiAt<Block> for ChainHeadMockClient<Client> {
|
||||
type StateBackend = <Client as CallApiAt<Block>>::StateBackend;
|
||||
|
||||
fn call_api_at(
|
||||
&self,
|
||||
params: CallApiAtParams<Block, <Client as CallApiAt<Block>>::StateBackend>,
|
||||
) -> Result<Vec<u8>, sp_api::ApiError> {
|
||||
self.client.call_api_at(params)
|
||||
}
|
||||
|
||||
fn runtime_version_at(&self, hash: Block::Hash) -> Result<RuntimeVersion, sp_api::ApiError> {
|
||||
self.client.runtime_version_at(hash)
|
||||
}
|
||||
|
||||
fn state_at(&self, at: Block::Hash) -> Result<Self::StateBackend, sp_api::ApiError> {
|
||||
self.client.state_at(at)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block: BlockT, Client: BlockBackend<Block>> BlockBackend<Block>
|
||||
for ChainHeadMockClient<Client>
|
||||
{
|
||||
fn block_body(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
|
||||
self.client.block_body(hash)
|
||||
}
|
||||
|
||||
fn block(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<SignedBlock<Block>>> {
|
||||
self.client.block(hash)
|
||||
}
|
||||
|
||||
fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result<sp_consensus::BlockStatus> {
|
||||
self.client.block_status(hash)
|
||||
}
|
||||
|
||||
fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
|
||||
self.client.justifications(hash)
|
||||
}
|
||||
|
||||
fn block_hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
|
||||
self.client.block_hash(number)
|
||||
}
|
||||
|
||||
fn indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
|
||||
self.client.indexed_transaction(hash)
|
||||
}
|
||||
|
||||
fn has_indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result<bool> {
|
||||
self.client.has_indexed_transaction(hash)
|
||||
}
|
||||
|
||||
fn block_indexed_body(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
|
||||
self.client.block_indexed_body(hash)
|
||||
}
|
||||
fn requires_full_sync(&self) -> bool {
|
||||
self.client.requires_full_sync()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block: BlockT, Client: HeaderMetadata<Block> + Send + Sync> HeaderMetadata<Block>
|
||||
for ChainHeadMockClient<Client>
|
||||
{
|
||||
type Error = <Client as HeaderMetadata<Block>>::Error;
|
||||
|
||||
fn header_metadata(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
|
||||
self.client.header_metadata(hash)
|
||||
}
|
||||
|
||||
fn insert_header_metadata(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
header_metadata: CachedHeaderMetadata<Block>,
|
||||
) {
|
||||
self.client.insert_header_metadata(hash, header_metadata)
|
||||
}
|
||||
|
||||
fn remove_header_metadata(&self, hash: Block::Hash) {
|
||||
self.client.remove_header_metadata(hash)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block: BlockT, Client: HeaderBackend<Block> + Send + Sync> HeaderBackend<Block>
|
||||
for ChainHeadMockClient<Client>
|
||||
{
|
||||
fn header(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
|
||||
self.client.header(hash)
|
||||
}
|
||||
|
||||
fn info(&self) -> Info<Block> {
|
||||
self.client.info()
|
||||
}
|
||||
|
||||
fn status(&self, hash: Block::Hash) -> sc_client_api::blockchain::Result<BlockStatus> {
|
||||
self.client.status(hash)
|
||||
}
|
||||
|
||||
fn number(
|
||||
&self,
|
||||
hash: Block::Hash,
|
||||
) -> sc_client_api::blockchain::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
|
||||
self.client.number(hash)
|
||||
}
|
||||
|
||||
fn hash(
|
||||
&self,
|
||||
number: <<Block as BlockT>::Header as HeaderT>::Number,
|
||||
) -> sp_blockchain::Result<Option<Block::Hash>> {
|
||||
self.client.hash(number)
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
use crate::chain_head::test_utils::ChainHeadMockClient;
|
||||
|
||||
use super::*;
|
||||
use assert_matches::assert_matches;
|
||||
use codec::{Decode, Encode};
|
||||
use futures::Future;
|
||||
use jsonrpsee::{
|
||||
core::{error::Error, server::rpc_module::Subscription as RpcSubscription},
|
||||
types::{error::CallError, EmptyServerParams as EmptyParams},
|
||||
@@ -33,7 +36,7 @@ const CHILD_STORAGE_KEY: &[u8] = b"child";
|
||||
const CHILD_VALUE: &[u8] = b"child value";
|
||||
|
||||
async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscription) -> T {
|
||||
let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next())
|
||||
let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
@@ -41,6 +44,12 @@ async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscriptio
|
||||
event
|
||||
}
|
||||
|
||||
async fn run_with_timeout<F: Future>(future: F) -> <F as Future>::Output {
|
||||
tokio::time::timeout(std::time::Duration::from_secs(60 * 10), future)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn setup_api() -> (
|
||||
Arc<Client<Backend>>,
|
||||
RpcModule<ChainHead<Backend, Block, Client<Backend>>>,
|
||||
@@ -1317,3 +1326,95 @@ async fn follow_report_multiple_pruned_block() {
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn follow_finalized_before_new_block() {
|
||||
let builder = TestClientBuilder::new();
|
||||
let backend = builder.backend();
|
||||
let mut client = Arc::new(builder.build());
|
||||
|
||||
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
|
||||
|
||||
let api = ChainHead::new(
|
||||
client_mock.clone(),
|
||||
backend,
|
||||
Arc::new(TaskExecutor::default()),
|
||||
CHAIN_GENESIS,
|
||||
MAX_PINNED_BLOCKS,
|
||||
)
|
||||
.into_rpc();
|
||||
|
||||
// Make sure the block is imported for it to be pinned.
|
||||
let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block;
|
||||
let block_1_hash = block_1.header.hash();
|
||||
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
|
||||
|
||||
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
|
||||
|
||||
// Trigger the `FinalizedNotification` for block 1 before the `BlockImportNotification`, and
|
||||
// expect for the `chainHead` to generate `NewBlock`, `BestBlock` and `Finalized` events.
|
||||
|
||||
// Trigger the Finalized notification before the NewBlock one.
|
||||
run_with_timeout(client_mock.trigger_finality_stream(block_1.header.clone())).await;
|
||||
|
||||
// Initialized must always be reported first.
|
||||
let finalized_hash = client.info().finalized_hash;
|
||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||
let expected = FollowEvent::Initialized(Initialized {
|
||||
finalized_block_hash: format!("{:?}", finalized_hash),
|
||||
finalized_block_runtime: None,
|
||||
runtime_updates: false,
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
|
||||
// Block 1 must be reported because we triggered the finalized notification.
|
||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||
let expected = FollowEvent::NewBlock(NewBlock {
|
||||
block_hash: format!("{:?}", block_1_hash),
|
||||
parent_block_hash: format!("{:?}", finalized_hash),
|
||||
new_runtime: None,
|
||||
runtime_updates: false,
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
|
||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||
let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
|
||||
best_block_hash: format!("{:?}", block_1_hash),
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
|
||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||
let expected = FollowEvent::Finalized(Finalized {
|
||||
finalized_block_hashes: vec![format!("{:?}", block_1_hash)],
|
||||
pruned_block_hashes: vec![],
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
|
||||
let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block;
|
||||
let block_2_hash = block_2.header.hash();
|
||||
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();
|
||||
|
||||
// Triggering the `BlockImportNotification` notification for block 1 should have no effect
|
||||
// on the notification because the events were handled by the `FinalizedNotification`.
|
||||
// Also trigger the `BlockImportNotification` notification for block 2 to ensure
|
||||
// `NewBlock and `BestBlock` events are generated.
|
||||
|
||||
// Trigger NewBlock notification for block 1 and block 2.
|
||||
run_with_timeout(client_mock.trigger_import_stream(block_1.header)).await;
|
||||
run_with_timeout(client_mock.trigger_import_stream(block_2.header)).await;
|
||||
|
||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||
let expected = FollowEvent::NewBlock(NewBlock {
|
||||
block_hash: format!("{:?}", block_2_hash),
|
||||
parent_block_hash: format!("{:?}", block_1_hash),
|
||||
new_runtime: None,
|
||||
runtime_updates: false,
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
|
||||
let event: FollowEvent<String> = get_next_event(&mut sub).await;
|
||||
let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
|
||||
best_block_hash: format!("{:?}", block_2_hash),
|
||||
});
|
||||
assert_eq!(event, expected);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user