Make BlockImport and Verifier async (#8472)

* Make grandpa work

* Introduce `SharedData`

* Add test and fix bugs

* Switch to `SharedData`

* Make grandpa tests working

* More Babe work

* Make it async

* Fix fix

* Use `async_trait` in sc-consensus-slots

This makes the code a little bit easier to read and also expresses that
there can always only be one call at a time to `on_slot`.

* Make grandpa tests compile

* More Babe tests work

* Fix network test

* Start fixing service test

* Finish service-test

* Fix sc-consensus-aura

* Fix fix fix

* More fixes

* Make everything compile *yeah*

* Fix build when we have Rust 1.51

* Update client/consensus/common/src/shared_data.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/common/src/shared_data.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/common/src/shared_data.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/common/src/shared_data.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/common/src/shared_data.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/babe/src/tests.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/babe/src/tests.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Fix warning

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2021-03-30 11:19:49 +02:00
committed by GitHub
parent 2998b42311
commit 217c4be226
65 changed files with 1241 additions and 715 deletions
@@ -19,17 +19,17 @@
//! Utilities for dealing with authorities, authority sets, and handoffs.
use fork_tree::ForkTree;
use parking_lot::RwLock;
use parking_lot::MappedMutexGuard;
use finality_grandpa::voter_set::VoterSet;
use parity_scale_codec::{Encode, Decode};
use log::debug;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sp_finality_grandpa::{AuthorityId, AuthorityList};
use sc_consensus::shared_data::{SharedData, SharedDataLocked};
use std::cmp::Ord;
use std::fmt::Debug;
use std::ops::Add;
use std::sync::Arc;
/// Error type returned on operations on the `AuthoritySet`.
#[derive(Debug, derive_more::Display)]
@@ -70,19 +70,30 @@ impl<N, E: std::error::Error> From<E> for Error<N, E> {
/// A shared authority set.
pub struct SharedAuthoritySet<H, N> {
inner: Arc<RwLock<AuthoritySet<H, N>>>,
inner: SharedData<AuthoritySet<H, N>>,
}
impl<H, N> Clone for SharedAuthoritySet<H, N> {
fn clone(&self) -> Self {
SharedAuthoritySet { inner: self.inner.clone() }
SharedAuthoritySet {
inner: self.inner.clone(),
}
}
}
impl<H, N> SharedAuthoritySet<H, N> {
/// Acquire a reference to the inner read-write lock.
pub(crate) fn inner(&self) -> &RwLock<AuthoritySet<H, N>> {
&*self.inner
/// Returns access to the [`AuthoritySet`].
pub(crate) fn inner(&self) -> MappedMutexGuard<AuthoritySet<H, N>> {
self.inner.shared_data()
}
/// Returns access to the [`AuthoritySet`] and locks it.
///
/// For more information see [`SharedDataLocked`].
pub(crate) fn inner_locked(
&self,
) -> SharedDataLocked<AuthoritySet<H, N>> {
self.inner.shared_data_locked()
}
}
@@ -93,17 +104,17 @@ where N: Add<Output=N> + Ord + Clone + Debug,
/// Get the earliest limit-block number that's higher or equal to the given
/// min number, if any.
pub(crate) fn current_limit(&self, min: N) -> Option<N> {
self.inner.read().current_limit(min)
self.inner().current_limit(min)
}
/// Get the current set ID. This is incremented every time the set changes.
pub fn set_id(&self) -> u64 {
self.inner.read().set_id
self.inner().set_id
}
/// Get the current authorities and their weights (for the current set ID).
pub fn current_authorities(&self) -> VoterSet<AuthorityId> {
VoterSet::new(self.inner.read().current_authorities.iter().cloned()).expect(
VoterSet::new(self.inner().current_authorities.iter().cloned()).expect(
"current_authorities is non-empty and weights are non-zero; \
constructor and all mutating operations on `AuthoritySet` ensure this; \
qed.",
@@ -112,18 +123,20 @@ where N: Add<Output=N> + Ord + Clone + Debug,
/// Clone the inner `AuthoritySet`.
pub fn clone_inner(&self) -> AuthoritySet<H, N> {
self.inner.read().clone()
self.inner().clone()
}
/// Clone the inner `AuthoritySetChanges`.
pub fn authority_set_changes(&self) -> AuthoritySetChanges<N> {
self.inner.read().authority_set_changes.clone()
self.inner().authority_set_changes.clone()
}
}
impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> {
fn from(set: AuthoritySet<H, N>) -> Self {
SharedAuthoritySet { inner: Arc::new(RwLock::new(set)) }
SharedAuthoritySet {
inner: SharedData::new(set),
}
}
}
@@ -592,7 +592,7 @@ mod test {
).unwrap();
assert_eq!(
*authority_set.inner().read(),
*authority_set.inner(),
AuthoritySet::new(
authorities.clone(),
set_id,
@@ -616,7 +616,7 @@ mod test {
votes: vec![],
},
set_id,
&*authority_set.inner().read(),
&*authority_set.inner(),
),
current_rounds,
},
@@ -688,7 +688,7 @@ mod test {
).unwrap();
assert_eq!(
*authority_set.inner().read(),
*authority_set.inner(),
AuthoritySet::new(
authorities.clone(),
set_id,
@@ -712,7 +712,7 @@ mod test {
votes: vec![],
},
set_id,
&*authority_set.inner().read(),
&*authority_set.inner(),
),
current_rounds,
},
@@ -781,7 +781,7 @@ mod test {
).unwrap();
assert_eq!(
*authority_set.inner().read(),
*authority_set.inner(),
AuthoritySet::new(
authorities.clone(),
set_id,
@@ -508,7 +508,7 @@ where
.best_chain()
.map_err(|e| Error::Blockchain(e.to_string()))?;
let authority_set = self.authority_set.inner().read();
let authority_set = self.authority_set.inner();
// block hash and number of the next pending authority set change in the
// given best chain.
@@ -1228,7 +1228,7 @@ where
// NOTE: lock must be held through writing to DB to avoid race. this lock
// also implicitly synchronizes the check for last finalized number
// below.
let mut authority_set = authority_set.inner().write();
let mut authority_set = authority_set.inner();
let status = client.info();
+35 -26
View File
@@ -20,13 +20,13 @@ use std::{sync::Arc, collections::HashMap};
use log::debug;
use parity_scale_codec::Encode;
use parking_lot::RwLockWriteGuard;
use sp_blockchain::{BlockStatus, well_known_cache_keys};
use sc_client_api::{backend::Backend, utils::is_descendent_of};
use sc_telemetry::TelemetryHandle;
use sp_utils::mpsc::TracingUnboundedSender;
use sp_api::TransactionFor;
use sc_consensus::shared_data::{SharedDataLockedUpgradable, SharedDataLocked};
use sp_consensus::{
BlockImport, Error as ConsensusError,
@@ -99,7 +99,7 @@ impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
let chain_info = self.inner.info();
// request justifications for all pending changes for which change blocks have already been imported
let authorities = self.authority_set.inner().read();
let authorities = self.authority_set.inner();
for pending_change in authorities.pending_changes() {
if pending_change.delay_kind == DelayKind::Finalized &&
pending_change.effective_number() > chain_info.finalized_number &&
@@ -157,30 +157,30 @@ impl<H, N> AppliedChanges<H, N> {
}
}
struct PendingSetChanges<'a, Block: 'a + BlockT> {
struct PendingSetChanges<Block: BlockT> {
just_in_case: Option<(
AuthoritySet<Block::Hash, NumberFor<Block>>,
RwLockWriteGuard<'a, AuthoritySet<Block::Hash, NumberFor<Block>>>,
SharedDataLockedUpgradable<AuthoritySet<Block::Hash, NumberFor<Block>>>,
)>,
applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
do_pause: bool,
}
impl<'a, Block: 'a + BlockT> PendingSetChanges<'a, Block> {
impl<Block: BlockT> PendingSetChanges<Block> {
// revert the pending set change explicitly.
fn revert(self) { }
fn revert(self) {}
fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
self.just_in_case = None;
let applied_changes = ::std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
let applied_changes = std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
(applied_changes, self.do_pause)
}
}
impl<'a, Block: 'a + BlockT> Drop for PendingSetChanges<'a, Block> {
impl<Block: BlockT> Drop for PendingSetChanges<Block> {
fn drop(&mut self) {
if let Some((old_set, mut authorities)) = self.just_in_case.take() {
*authorities = old_set;
*authorities.upgrade() = old_set;
}
}
}
@@ -269,33 +269,38 @@ where
// when we update the authorities, we need to hold the lock
// until the block is written to prevent a race if we need to restore
// the old authority set on error or panic.
struct InnerGuard<'a, T: 'a> {
old: Option<T>,
guard: Option<RwLockWriteGuard<'a, T>>,
struct InnerGuard<'a, H, N> {
old: Option<AuthoritySet<H, N>>,
guard: Option<SharedDataLocked<'a, AuthoritySet<H, N>>>,
}
impl<'a, T: 'a> InnerGuard<'a, T> {
fn as_mut(&mut self) -> &mut T {
impl<'a, H, N> InnerGuard<'a, H, N> {
fn as_mut(&mut self) -> &mut AuthoritySet<H, N> {
&mut **self.guard.as_mut().expect("only taken on deconstruction; qed")
}
fn set_old(&mut self, old: T) {
fn set_old(&mut self, old: AuthoritySet<H, N>) {
if self.old.is_none() {
// ignore "newer" old changes.
self.old = Some(old);
}
}
fn consume(mut self) -> Option<(T, RwLockWriteGuard<'a, T>)> {
fn consume(
mut self,
) -> Option<(AuthoritySet<H, N>, SharedDataLocked<'a, AuthoritySet<H, N>>)> {
if let Some(old) = self.old.take() {
Some((old, self.guard.take().expect("only taken on deconstruction; qed")))
Some((
old,
self.guard.take().expect("only taken on deconstruction; qed"),
))
} else {
None
}
}
}
impl<'a, T: 'a> Drop for InnerGuard<'a, T> {
impl<'a, H, N> Drop for InnerGuard<'a, H, N> {
fn drop(&mut self) {
if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
*guard = old;
@@ -315,7 +320,7 @@ where
let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));
let mut guard = InnerGuard {
guard: Some(self.authority_set.inner().write()),
guard: Some(self.authority_set.inner_locked()),
old: None,
};
@@ -413,10 +418,13 @@ where
);
}
let just_in_case = just_in_case.map(|(o, i)| (o, i.release_mutex()));
Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
}
}
#[async_trait::async_trait]
impl<BE, Block: BlockT, Client, SC> BlockImport<Block>
for GrandpaBlockImport<BE, Block, Client, SC> where
NumberFor<Block>: finality_grandpa::BlockNumberOps,
@@ -425,11 +433,13 @@ impl<BE, Block: BlockT, Client, SC> BlockImport<Block>
Client: crate::ClientForGrandpa<Block, BE>,
for<'a> &'a Client:
BlockImport<Block, Error = ConsensusError, Transaction = TransactionFor<Client, Block>>,
TransactionFor<Client, Block>: Send + 'static,
SC: Send,
{
type Error = ConsensusError;
type Transaction = TransactionFor<Client, Block>;
fn import_block(
async fn import_block(
&mut self,
mut block: BlockImportParams<Block, Self::Transaction>,
new_cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
@@ -452,7 +462,7 @@ impl<BE, Block: BlockT, Client, SC> BlockImport<Block>
// we don't want to finalize on `inner.import_block`
let mut justifications = block.justifications.take();
let import_result = (&*self.inner).import_block(block, new_cache);
let import_result = (&*self.inner).import_block(block, new_cache).await;
let mut imported_aux = {
match import_result {
@@ -556,11 +566,11 @@ impl<BE, Block: BlockT, Client, SC> BlockImport<Block>
Ok(ImportResult::Imported(imported_aux))
}
fn check_block(
async fn check_block(
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block)
self.inner.check_block(block).await
}
}
@@ -580,8 +590,7 @@ impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Clie
.iter()
.find(|(set_id, _)| *set_id == authority_set.set_id())
{
let mut authority_set = authority_set.inner().write();
authority_set.current_authorities = change.next_authorities.clone();
authority_set.inner().current_authorities = change.next_authorities.clone();
}
// index authority set hard forks by block hash so that they can be used
@@ -596,7 +605,7 @@ impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Clie
// any *pending* standard changes, checking by the block hash at which
// they were announced.
{
let mut authority_set = authority_set.inner().write();
let mut authority_set = authority_set.inner();
authority_set.pending_standard_changes = authority_set
.pending_standard_changes
+1 -1
View File
@@ -1019,7 +1019,7 @@ where
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*self.env.authority_set.inner().read(),
&*self.env.authority_set.inner(),
(new.canon_hash, new.canon_number),
);
@@ -326,7 +326,7 @@ where
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*self.persistent_data.authority_set.inner().read(),
&*self.persistent_data.authority_set.inner(),
(new.canon_hash, new.canon_number),
);
+31 -36
View File
@@ -28,9 +28,9 @@ use sc_network_test::{
use sc_network::config::ProtocolConfig;
use parking_lot::{RwLock, Mutex};
use futures_timer::Delay;
use futures::executor::block_on;
use tokio::runtime::{Runtime, Handle};
use sp_keyring::Ed25519Keyring;
use sc_client_api::backend::TransactionFor;
use sp_blockchain::Result;
use sp_api::{ApiRef, ProvideRuntimeApi};
use substrate_test_runtime_client::runtime::BlockNumber;
@@ -43,7 +43,9 @@ use sp_runtime::{Justifications, traits::{Block as BlockT, Header as HeaderT}};
use sp_runtime::generic::{BlockId, DigestItem};
use sp_core::H256;
use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore};
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, EquivocationProof, GrandpaApi, OpaqueKeyOwnershipProof};
use sp_finality_grandpa::{
GRANDPA_ENGINE_ID, AuthorityList, EquivocationProof, GrandpaApi, OpaqueKeyOwnershipProof,
};
use authorities::AuthoritySet;
use sc_block_builder::BlockBuilderProvider;
@@ -54,7 +56,13 @@ use sp_application_crypto::key_types::GRANDPA;
type TestLinkHalf =
LinkHalf<Block, PeersFullClient, LongestChain<substrate_test_runtime_client::Backend, Block>>;
type PeerData = Mutex<Option<TestLinkHalf>>;
type GrandpaPeer = Peer<PeerData>;
type GrandpaPeer = Peer<PeerData, GrandpaBlockImport>;
type GrandpaBlockImport = crate::GrandpaBlockImport<
substrate_test_runtime_client::Backend,
Block,
PeersFullClient,
LongestChain<substrate_test_runtime_client::Backend, Block>
>;
struct GrandpaTestNet {
peers: Vec<GrandpaPeer>,
@@ -93,6 +101,7 @@ impl GrandpaTestNet {
impl TestNetFactory for GrandpaTestNet {
type Verifier = PassThroughVerifier;
type PeerData = PeerData;
type BlockImport = GrandpaBlockImport;
/// Create new test network with peers and given config.
fn from_config(_config: &ProtocolConfig) -> Self {
@@ -124,9 +133,9 @@ impl TestNetFactory for GrandpaTestNet {
PassThroughVerifier::new(false) // use non-instant finality.
}
fn make_block_import<Transaction>(&self, client: PeersClient)
fn make_block_import(&self, client: PeersClient)
-> (
BlockImportAdapter<Transaction>,
BlockImportAdapter<Self::BlockImport>,
Option<BoxJustificationImport<Block>>,
PeerData,
)
@@ -141,7 +150,7 @@ impl TestNetFactory for GrandpaTestNet {
).expect("Could not create block import for fresh peer.");
let justification_import = Box::new(import.clone());
(
BlockImportAdapter::new_full(import),
BlockImportAdapter::new(import),
Some(justification_import),
Mutex::new(Some(link)),
)
@@ -820,11 +829,7 @@ fn allows_reimporting_change_blocks() {
let mut net = GrandpaTestNet::new(api.clone(), 3, 0);
let client = net.peer(0).client().clone();
let (mut block_import, ..) = net.make_block_import::<
TransactionFor<substrate_test_runtime_client::Backend, Block>
>(
client.clone(),
);
let (mut block_import, ..) = net.make_block_import(client.clone());
let full_client = client.as_full().unwrap();
let builder = full_client.new_block_at(&BlockId::Number(0), Default::default(), false).unwrap();
@@ -844,7 +849,7 @@ fn allows_reimporting_change_blocks() {
};
assert_eq!(
block_import.import_block(block(), HashMap::new()).unwrap(),
block_on(block_import.import_block(block(), HashMap::new())).unwrap(),
ImportResult::Imported(ImportedAux {
needs_justification: true,
clear_justification_requests: false,
@@ -855,7 +860,7 @@ fn allows_reimporting_change_blocks() {
);
assert_eq!(
block_import.import_block(block(), HashMap::new()).unwrap(),
block_on(block_import.import_block(block(), HashMap::new())).unwrap(),
ImportResult::AlreadyInChain
);
}
@@ -869,11 +874,7 @@ fn test_bad_justification() {
let mut net = GrandpaTestNet::new(api.clone(), 3, 0);
let client = net.peer(0).client().clone();
let (mut block_import, ..) = net.make_block_import::<
TransactionFor<substrate_test_runtime_client::Backend, Block>
>(
client.clone(),
);
let (mut block_import, ..) = net.make_block_import(client.clone());
let full_client = client.as_full().expect("only full clients are used in test");
let builder = full_client.new_block_at(&BlockId::Number(0), Default::default(), false).unwrap();
@@ -895,7 +896,7 @@ fn test_bad_justification() {
};
assert_eq!(
block_import.import_block(block(), HashMap::new()).unwrap(),
block_on(block_import.import_block(block(), HashMap::new())).unwrap(),
ImportResult::Imported(ImportedAux {
needs_justification: true,
clear_justification_requests: false,
@@ -906,7 +907,7 @@ fn test_bad_justification() {
);
assert_eq!(
block_import.import_block(block(), HashMap::new()).unwrap(),
block_on(block_import.import_block(block(), HashMap::new())).unwrap(),
ImportResult::AlreadyInChain
);
}
@@ -950,9 +951,7 @@ fn voter_persists_its_votes() {
let set_state = {
let bob_client = net.peer(1).client().clone();
let (_, _, link) = net
.make_block_import::<
TransactionFor<substrate_test_runtime_client::Backend, Block>
>(bob_client);
.make_block_import(bob_client);
let LinkHalf { persistent_data, .. } = link.lock().take().unwrap();
let PersistentData { set_state, .. } = persistent_data;
set_state
@@ -1019,9 +1018,7 @@ fn voter_persists_its_votes() {
let alice_client = net.peer(0).client().clone();
let (_block_import, _, link) = net
.make_block_import::<
TransactionFor<substrate_test_runtime_client::Backend, Block>
>(alice_client);
.make_block_import(alice_client);
let link = link.lock().take().unwrap();
let grandpa_params = GrandpaParams {
@@ -1422,7 +1419,7 @@ fn grandpa_environment_respects_voting_rules() {
// the unrestricted environment should just return the best block
assert_eq!(
futures::executor::block_on(unrestricted_env.best_chain_containing(
block_on(unrestricted_env.best_chain_containing(
peer.client().info().finalized_hash
)).unwrap().unwrap().1,
21,
@@ -1431,14 +1428,14 @@ fn grandpa_environment_respects_voting_rules() {
// both the other environments should return block 16, which is 3/4 of the
// way in the unfinalized chain
assert_eq!(
futures::executor::block_on(three_quarters_env.best_chain_containing(
block_on(three_quarters_env.best_chain_containing(
peer.client().info().finalized_hash
)).unwrap().unwrap().1,
16,
);
assert_eq!(
futures::executor::block_on(default_env.best_chain_containing(
block_on(default_env.best_chain_containing(
peer.client().info().finalized_hash
)).unwrap().unwrap().1,
16,
@@ -1449,7 +1446,7 @@ fn grandpa_environment_respects_voting_rules() {
// the 3/4 environment should propose block 21 for voting
assert_eq!(
futures::executor::block_on(three_quarters_env.best_chain_containing(
block_on(three_quarters_env.best_chain_containing(
peer.client().info().finalized_hash
)).unwrap().unwrap().1,
21,
@@ -1458,7 +1455,7 @@ fn grandpa_environment_respects_voting_rules() {
// while the default environment will always still make sure we don't vote
// on the best block (2 behind)
assert_eq!(
futures::executor::block_on(default_env.best_chain_containing(
block_on(default_env.best_chain_containing(
peer.client().info().finalized_hash
)).unwrap().unwrap().1,
19,
@@ -1471,7 +1468,7 @@ fn grandpa_environment_respects_voting_rules() {
// best block, there's a hard rule that we can't cast any votes lower than
// the given base (#21).
assert_eq!(
futures::executor::block_on(default_env.best_chain_containing(
block_on(default_env.best_chain_containing(
peer.client().info().finalized_hash
)).unwrap().unwrap().1,
21,
@@ -1557,9 +1554,7 @@ fn imports_justification_for_regular_blocks_on_import() {
let mut net = GrandpaTestNet::new(api.clone(), 1, 0);
let client = net.peer(0).client().clone();
let (mut block_import, ..) = net.make_block_import::<
TransactionFor<substrate_test_runtime_client::Backend, Block>
>(client.clone());
let (mut block_import, ..) = net.make_block_import(client.clone());
let full_client = client.as_full().expect("only full clients are used in test");
let builder = full_client.new_block_at(&BlockId::Number(0), Default::default(), false).unwrap();
@@ -1607,7 +1602,7 @@ fn imports_justification_for_regular_blocks_on_import() {
import.fork_choice = Some(ForkChoiceStrategy::LongestChain);
assert_eq!(
block_import.import_block(import, HashMap::new()).unwrap(),
block_on(block_import.import_block(import, HashMap::new())).unwrap(),
ImportResult::Imported(ImportedAux {
needs_justification: false,
clear_justification_requests: false,
@@ -372,7 +372,7 @@ mod tests {
.unwrap()
.block;
client.import(BlockOrigin::Own, block).unwrap();
futures::executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
}
let genesis = client