mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 05:27:56 +00:00
Manual seal delayed finalize (#13999)
* up * up * added test * remove unncessary dep * cargo fmt * cargo fmt * up * Update client/consensus/manual-seal/src/lib.rs Co-authored-by: Bastian Köcher <git@kchr.de> * fix test * cargo fmt * added docs * updated doc --------- Co-authored-by: Bastian Köcher <git@kchr.de> Co-authored-by: parity-processbot <>
This commit is contained in:
committed by
GitHub
parent
793f04fc05
commit
57f2468e31
Generated
+1
@@ -8920,6 +8920,7 @@ dependencies = [
|
||||
"assert_matches",
|
||||
"async-trait",
|
||||
"futures",
|
||||
"futures-timer",
|
||||
"jsonrpsee",
|
||||
"log",
|
||||
"parity-scale-codec",
|
||||
|
||||
@@ -18,6 +18,7 @@ assert_matches = "1.3.0"
|
||||
async-trait = "0.1.57"
|
||||
codec = { package = "parity-scale-codec", version = "3.2.2" }
|
||||
futures = "0.3.21"
|
||||
futures-timer = "3.0.1"
|
||||
log = "0.4.17"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
thiserror = "1.0"
|
||||
|
||||
@@ -20,17 +20,22 @@
|
||||
//! This is suitable for a testing environment.
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures_timer::Delay;
|
||||
use prometheus_endpoint::Registry;
|
||||
use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
|
||||
use sc_client_api::{
|
||||
backend::{Backend as ClientBackend, Finalizer},
|
||||
client::BlockchainEvents,
|
||||
};
|
||||
use sc_consensus::{
|
||||
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
|
||||
import_queue::{BasicQueue, BoxBlockImport, Verifier},
|
||||
};
|
||||
use sp_blockchain::HeaderBackend;
|
||||
use sp_consensus::{Environment, Proposer, SelectChain};
|
||||
use sp_core::traits::SpawnNamed;
|
||||
use sp_inherents::CreateInherentDataProviders;
|
||||
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
use std::{marker::PhantomData, sync::Arc, time::Duration};
|
||||
|
||||
mod error;
|
||||
mod finalize_block;
|
||||
@@ -84,7 +89,7 @@ where
|
||||
|
||||
/// Params required to start the instant sealing authorship task.
|
||||
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP, P> {
|
||||
/// Block import instance for well. importing blocks.
|
||||
/// Block import instance.
|
||||
pub block_import: BI,
|
||||
|
||||
/// The environment we are producing blocks for.
|
||||
@@ -136,7 +141,19 @@ pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC,
|
||||
pub create_inherent_data_providers: CIDP,
|
||||
}
|
||||
|
||||
/// Creates the background authorship task for the manual seal engine.
|
||||
/// Params required to start the delayed finalization task.
|
||||
pub struct DelayedFinalizeParams<C, S> {
|
||||
/// Block import instance.
|
||||
pub client: Arc<C>,
|
||||
|
||||
/// Handle for spawning delayed finalization tasks.
|
||||
pub spawn_handle: S,
|
||||
|
||||
/// The delay in seconds before a block is finalized.
|
||||
pub delay_sec: u64,
|
||||
}
|
||||
|
||||
/// Creates the background authorship task for the manually seal engine.
|
||||
pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
|
||||
ManualSealParams {
|
||||
mut block_import,
|
||||
@@ -303,6 +320,44 @@ pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP, P>(
|
||||
.await
|
||||
}
|
||||
|
||||
/// Creates a future for delayed finalization of manual sealed blocks.
|
||||
///
|
||||
/// The future needs to be spawned in the background alongside the
|
||||
/// [`run_manual_seal`]/[`run_instant_seal`] future. It is required that
|
||||
/// [`EngineCommand::SealNewBlock`] is send with `finalize = false` to not finalize blocks directly
|
||||
/// after building them. This also means that delayed finality can not be used with
|
||||
/// [`run_instant_seal_and_finalize`].
|
||||
pub async fn run_delayed_finalize<B, CB, C, S>(
|
||||
DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams<C, S>,
|
||||
) where
|
||||
B: BlockT + 'static,
|
||||
CB: ClientBackend<B> + 'static,
|
||||
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
|
||||
S: SpawnNamed,
|
||||
{
|
||||
let mut block_import_stream = client.import_notification_stream();
|
||||
|
||||
while let Some(notification) = block_import_stream.next().await {
|
||||
let delay = Delay::new(Duration::from_secs(delay_sec));
|
||||
let cloned_client = client.clone();
|
||||
spawn_handle.spawn(
|
||||
"delayed-finalize",
|
||||
None,
|
||||
Box::pin(async move {
|
||||
delay.await;
|
||||
finalize_block(FinalizeBlockParams {
|
||||
hash: notification.hash,
|
||||
sender: None,
|
||||
justification: None,
|
||||
finalizer: cloned_client,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
.await
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -428,6 +483,101 @@ mod tests {
|
||||
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn instant_seal_delayed_finalize() {
|
||||
let builder = TestClientBuilder::new();
|
||||
let (client, select_chain) = builder.build_with_longest_chain();
|
||||
let client = Arc::new(client);
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
let genesis_hash = client.info().genesis_hash;
|
||||
let pool = Arc::new(BasicPool::with_revalidation_type(
|
||||
Options::default(),
|
||||
true.into(),
|
||||
api(),
|
||||
None,
|
||||
RevalidationType::Full,
|
||||
spawner.clone(),
|
||||
0,
|
||||
genesis_hash,
|
||||
genesis_hash,
|
||||
));
|
||||
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
|
||||
// this test checks that blocks are created as soon as transactions are imported into the
|
||||
// pool.
|
||||
let (sender, receiver) = futures::channel::oneshot::channel();
|
||||
let mut sender = Arc::new(Some(sender));
|
||||
let commands_stream =
|
||||
pool.pool().validated_pool().import_notification_stream().map(move |_| {
|
||||
// we're only going to submit one tx so this fn will only be called once.
|
||||
let mut_sender = Arc::get_mut(&mut sender).unwrap();
|
||||
let sender = std::mem::take(mut_sender);
|
||||
EngineCommand::SealNewBlock {
|
||||
create_empty: false,
|
||||
// set to `false`, expecting to be finalized by delayed finalize
|
||||
finalize: false,
|
||||
parent_hash: None,
|
||||
sender,
|
||||
}
|
||||
});
|
||||
|
||||
let future_instant_seal = run_manual_seal(ManualSealParams {
|
||||
block_import: client.clone(),
|
||||
commands_stream,
|
||||
env,
|
||||
client: client.clone(),
|
||||
pool: pool.clone(),
|
||||
select_chain,
|
||||
create_inherent_data_providers: |_, _| async { Ok(()) },
|
||||
consensus_data_provider: None,
|
||||
});
|
||||
std::thread::spawn(|| {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
// spawn the background authorship task
|
||||
rt.block_on(future_instant_seal);
|
||||
});
|
||||
|
||||
let delay_sec = 5;
|
||||
let future_delayed_finalize = run_delayed_finalize(DelayedFinalizeParams {
|
||||
client: client.clone(),
|
||||
delay_sec,
|
||||
spawn_handle: spawner,
|
||||
});
|
||||
std::thread::spawn(|| {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
// spawn the background authorship task
|
||||
rt.block_on(future_delayed_finalize);
|
||||
});
|
||||
|
||||
let mut finality_stream = client.finality_notification_stream();
|
||||
// submit a transaction to pool.
|
||||
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
|
||||
// assert that it was successfully imported
|
||||
assert!(result.is_ok());
|
||||
// assert that the background task returns ok
|
||||
let created_block = receiver.await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
created_block,
|
||||
CreatedBlock {
|
||||
hash: created_block.hash,
|
||||
aux: ImportedAux {
|
||||
header_only: false,
|
||||
clear_justification_requests: false,
|
||||
needs_justification: false,
|
||||
bad_justification: false,
|
||||
is_new_best: true,
|
||||
}
|
||||
}
|
||||
);
|
||||
// assert that there's a new block in the db.
|
||||
assert!(client.header(created_block.hash).unwrap().is_some());
|
||||
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1);
|
||||
|
||||
assert_eq!(client.info().finalized_hash, client.info().genesis_hash);
|
||||
|
||||
let finalized = finality_stream.select_next_some().await;
|
||||
assert_eq!(finalized.hash, created_block.hash);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn manual_seal_and_finalization() {
|
||||
let builder = TestClientBuilder::new();
|
||||
|
||||
@@ -160,10 +160,11 @@ pub fn send_result<T: std::fmt::Debug>(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// instant seal doesn't report errors over rpc, simply log them.
|
||||
// Sealing/Finalization with no RPC sender such as instant seal or delayed finalize doesn't
|
||||
// report errors over rpc, simply log them.
|
||||
match result {
|
||||
Ok(r) => log::info!("Instant Seal success: {:?}", r),
|
||||
Err(e) => log::error!("Instant Seal encountered an error: {}", e),
|
||||
Ok(r) => log::info!("Consensus with no RPC sender success: {:?}", r),
|
||||
Err(e) => log::error!("Consensus with no RPC sender encountered an error: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user