Split BlockImport and JustificationImport (#1521)

* Split BlockImport and JustificationImport

* Remove unused trait impl

* Fix compile

* Fix grandpa tests

* Fix network tests
This commit is contained in:
Wei Tang
2019-01-23 16:22:08 +01:00
committed by Robert Habermeier
parent 14e8be794f
commit dd88dc6cd6
11 changed files with 143 additions and 84 deletions
-14
View File
@@ -1106,7 +1106,6 @@ impl<B, E, Block, RA> CallRuntimeAt<Block> for Client<B, E, Block, RA> where
}
}
impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA> where
B: backend::Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync,
@@ -1179,19 +1178,6 @@ impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA>
);
result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into())
}
/// Import a block justification and finalize the block. The justification
/// isn't interpreted by the client and is assumed to have been validated
/// previously. The block is finalized unconditionally.
fn import_justification(
&self,
hash: Block::Hash,
_number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.finalize_block(BlockId::Hash(hash), Some(justification), true)
.map_err(|_| ConsensusErrorKind::InvalidJustification.into())
}
}
impl<B, E, Block, RA> consensus::Authorities<Block> for Client<B, E, Block, RA> where
+7 -5
View File
@@ -63,9 +63,9 @@ use std::{sync::{Arc, mpsc}, time::Duration, thread};
use codec::Encode;
use consensus_common::{
Authorities, BlockImport, Environment, Error as ConsensusError, Proposer, ForkChoiceStrategy
Authorities, BlockImport, Environment, Proposer, ForkChoiceStrategy
};
use consensus_common::import_queue::{Verifier, BasicQueue};
use consensus_common::import_queue::{Verifier, BasicQueue, SharedBlockImport, SharedJustificationImport};
use client::ChainHead;
use client::block_builder::api::BlockBuilder as BlockBuilderApi;
use consensus_common::{ImportBlock, BlockOrigin};
@@ -542,7 +542,7 @@ impl<B: Block> ExtraVerification<B> for NothingExtra {
}
impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
C: Authorities<B> + BlockImport<B> + ProvideRuntimeApi + Send + Sync,
C: Authorities<B> + ProvideRuntimeApi + Send + Sync,
C::Api: BlockBuilderApi<B>,
DigestItemFor<B>: CompatibleDigestItem + DigestItem<AuthorityId=Ed25519AuthorityId>,
E: ExtraVerification<B>,
@@ -681,12 +681,14 @@ fn register_aura_inherent_data_provider(
/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<B, C, E>(
slot_duration: SlotDuration,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
client: Arc<C>,
extra: E,
inherent_data_providers: InherentDataProviders,
) -> Result<AuraImportQueue<B, C, E>, consensus_common::Error> where
B: Block,
C: Authorities<B> + BlockImport<B, Error=ConsensusError> + ProvideRuntimeApi + Send + Sync,
C: Authorities<B> + ProvideRuntimeApi + Send + Sync,
C::Api: BlockBuilderApi<B>,
DigestItemFor<B>: CompatibleDigestItem + DigestItem<AuthorityId=Ed25519AuthorityId>,
E: ExtraVerification<B>,
@@ -696,7 +698,7 @@ pub fn import_queue<B, C, E>(
let verifier = Arc::new(
AuraVerifier { client: client.clone(), extra, inherent_data_providers }
);
Ok(BasicQueue::new(verifier, client))
Ok(BasicQueue::new(verifier, block_import, justification_import))
}
#[cfg(test)]
@@ -144,15 +144,20 @@ impl<Block: BlockT> ImportBlock<Block> {
pub trait BlockImport<B: BlockT> {
type Error: ::std::error::Error + Send + 'static;
/// Called by the import queue when it is started.
fn on_start(&self, _link: &::import_queue::Link<B>) { }
/// Import a Block alongside the new authorities valid from this block forward
fn import_block(
&self,
block: ImportBlock<B>,
new_authorities: Option<Vec<AuthorityIdFor<B>>>,
) -> Result<ImportResult, Self::Error>;
}
/// Justification import trait
pub trait JustificationImport<B: BlockT> {
type Error: ::std::error::Error + Send + 'static;
/// Called by the import queue when it is started.
fn on_start(&self, _link: &::import_queue::Link<B>) { }
/// Import a Block justification and finalize the given block.
fn import_justification(
@@ -24,7 +24,7 @@
//! The `BasicQueue` and `BasicVerifier` traits allow serial queues to be
//! instantiated simply.
use block_import::{ImportBlock, BlockImport, ImportResult, BlockOrigin};
use block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin};
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -38,6 +38,9 @@ use error::Error as ConsensusError;
/// Shared block import struct used by the queue.
pub type SharedBlockImport<B> = Arc<dyn BlockImport<B, Error=ConsensusError> + Send + Sync>;
/// Shared justification import struct used by the queue.
pub type SharedJustificationImport<B> = Arc<dyn JustificationImport<B, Error=ConsensusError> + Send + Sync>;
/// Maps to the Origin used by the network.
pub type Origin = usize;
@@ -111,6 +114,7 @@ pub struct BasicQueue<B: BlockT, V: 'static + Verifier<B>> {
data: Arc<AsyncImportQueueData<B>>,
verifier: Arc<V>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
}
/// Locks order: queue, queue_blocks, best_importing_number
@@ -123,13 +127,14 @@ pub struct AsyncImportQueueData<B: BlockT> {
}
impl<B: BlockT, V: Verifier<B>> BasicQueue<B, V> {
/// Instantiate a new basic queue, with given verifier.
pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>) -> Self {
/// Instantiate a new basic queue, with given verifier and justification import.
pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>, justification_import: Option<SharedJustificationImport<B>>) -> Self {
Self {
handle: Mutex::new(None),
data: Arc::new(AsyncImportQueueData::new()),
verifier,
block_import,
justification_import,
}
}
}
@@ -162,8 +167,11 @@ impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
let qdata = self.data.clone();
let verifier = self.verifier.clone();
let block_import = self.block_import.clone();
let justification_import = self.justification_import.clone();
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
block_import.on_start(&link);
if let Some(justification_import) = justification_import.as_ref() {
justification_import.on_start(&link);
}
import_thread(block_import, link, qdata, verifier)
})?);
Ok(())
@@ -223,7 +231,9 @@ impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
}
fn import_justification(&self, hash: B::Hash, number: NumberFor<B>, justification: Justification) -> bool {
self.block_import.import_justification(hash, number, justification).is_ok()
self.justification_import.as_ref().map(|justification_import| {
justification_import.import_justification(hash, number, justification).is_ok()
}).unwrap_or(false)
}
}
+1 -1
View File
@@ -60,7 +60,7 @@ pub mod evaluation;
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
pub use self::error::{Error, ErrorKind};
pub use block_import::{BlockImport, ImportBlock, BlockOrigin, ImportResult, ForkChoiceStrategy};
pub use block_import::{BlockImport, JustificationImport, ImportBlock, BlockOrigin, ImportResult, ForkChoiceStrategy};
/// Trait for getting the authorities at a given block.
pub trait Authorities<B: Block> {
+25 -37
View File
@@ -91,7 +91,7 @@ use client::{
};
use client::blockchain::HeaderBackend;
use codec::{Encode, Decode};
use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities};
use consensus_common::{BlockImport, JustificationImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult};
use runtime_primitives::Justification;
use runtime_primitives::traits::{
NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
@@ -992,7 +992,7 @@ pub struct GrandpaBlockImport<B, E, Block: BlockT<Hash=H256>, RA, PRA> {
api: Arc<PRA>,
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> JustificationImport<Block>
for GrandpaBlockImport<B, E, Block, RA, PRA> where
NumberFor<Block>: grandpa::BlockNumberOps,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -1032,6 +1032,29 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
}
}
fn import_justification(
&self,
hash: Block::Hash,
number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.import_justification(hash, number, justification, false)
}
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
for GrandpaBlockImport<B, E, Block, RA, PRA> where
NumberFor<Block>: grandpa::BlockNumberOps,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
DigestFor<Block>: Encode,
DigestItemFor<Block>: DigestItem<AuthorityId=Ed25519AuthorityId>,
RA: Send + Sync,
PRA: ProvideRuntimeApi,
PRA::Api: GrandpaApi<Block>,
{
type Error = ConsensusError;
fn import_block(&self, mut block: ImportBlock<Block>, new_authorities: Option<Vec<Ed25519AuthorityId>>)
-> Result<ImportResult, Self::Error>
{
@@ -1160,15 +1183,6 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
Ok(import_result)
}
fn import_justification(
&self,
hash: Block::Hash,
number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.import_justification(hash, number, justification, false)
}
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA>
@@ -1275,32 +1289,6 @@ fn canonical_at_height<B, E, Block: BlockT<Hash=H256>, RA>(
Ok(Some(current.hash()))
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> Authorities<Block> for GrandpaBlockImport<B, E, Block, RA, PRA>
where
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
DigestItemFor<Block>: DigestItem<AuthorityId=Ed25519AuthorityId>,
{
type Error = <Client<B, E, Block, RA> as Authorities<Block>>::Error;
fn authorities(&self, at: &BlockId<Block>) -> Result<Vec<Ed25519AuthorityId>, Self::Error> {
self.inner.authorities_at(at)
}
}
impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> ProvideRuntimeApi for GrandpaBlockImport<B, E, Block, RA, PRA>
where
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
PRA: ProvideRuntimeApi,
{
type Api = PRA::Api;
fn runtime_api<'a>(&'a self) -> ::runtime_primitives::traits::ApiRef<'a, Self::Api> {
self.api.runtime_api()
}
}
/// Half of a link between a block-import worker and a the background voter.
// This should remain non-clone.
pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
+5 -3
View File
@@ -30,7 +30,8 @@ use client::{
};
use test_client::{self, runtime::BlockNumber};
use codec::Decode;
use consensus_common::{BlockOrigin, Error as ConsensusError};
use consensus_common::BlockOrigin;
use consensus_common::import_queue::{SharedBlockImport, SharedJustificationImport};
use std::{collections::HashSet, result};
use runtime_primitives::traits::{ApiRef, ProvideRuntimeApi, RuntimeApiInfo};
use runtime_primitives::generic::BlockId;
@@ -100,13 +101,14 @@ impl TestNetFactory for GrandpaTestNet {
}
fn make_block_import(&self, client: Arc<PeersClient>)
-> (Arc<BlockImport<Block,Error=ConsensusError> + Send + Sync>, PeerData)
-> (SharedBlockImport<Block>, Option<SharedJustificationImport<Block>>, PeerData)
{
let (import, link) = block_import(
client,
Arc::new(self.test_config.clone())
).expect("Could not create block import for fresh peer.");
(Arc::new(import), Mutex::new(Some(link)))
let shared_import = Arc::new(import);
(shared_import.clone(), Some(shared_import), Mutex::new(Some(link)))
}
fn peer(&self, i: usize) -> &GrandpaPeer {
@@ -164,7 +164,7 @@ fn async_import_queue_drops() {
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = Arc::new(PassThroughVerifier(true));
let queue = BasicQueue::new(verifier, Arc::new(test_client::new()));
let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None);
queue.start(TestLink::new()).unwrap();
drop(queue);
}
+72 -10
View File
@@ -27,7 +27,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use client;
use client::block_builder::BlockBuilder;
use primitives::Ed25519AuthorityId;
use primitives::{H256, Ed25519AuthorityId};
use runtime_primitives::Justification;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor, Zero};
@@ -38,10 +38,9 @@ use service::{NetworkLink, TransactionPool};
use network_libp2p::{NodeIndex, PeerId, Severity};
use keyring::Keyring;
use codec::Encode;
use consensus::{BlockImport, BlockOrigin, ImportBlock, ForkChoiceStrategy};
use consensus::Error as ConsensusError;
use consensus::{BlockOrigin, ImportBlock, JustificationImport, ForkChoiceStrategy, Error as ConsensusError, ErrorKind as ConsensusErrorKind};
use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock};
use consensus::import_queue::{Link, SharedBlockImport, Verifier};
use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
use specialization::NetworkSpecialization;
use consensus_gossip::ConsensusGossip;
use service::ExecuteInContext;
@@ -121,17 +120,19 @@ pub struct SyncImportQueue<B: BlockT, V: Verifier<B>> {
verifier: Arc<V>,
link: ImportCB<B>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
}
#[cfg(any(test, feature = "test-helpers"))]
impl<B: 'static + BlockT, V: 'static + Verifier<B>> SyncImportQueue<B, V> {
/// Create a new SyncImportQueue wrapping the given Verifier and block import
/// handle.
pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>) -> Self {
pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>, justification_import: Option<SharedJustificationImport<B>>) -> Self {
let queue = SyncImportQueue {
verifier,
link: ImportCB::new(),
block_import,
justification_import,
};
let v = queue.verifier.clone();
@@ -197,7 +198,9 @@ impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImpor
number: NumberFor<B>,
justification: Justification,
) -> bool {
self.block_import.import_justification(hash, number, justification).is_ok()
self.justification_import.as_ref().map(|justification_import| {
justification_import.import_justification(hash, number, justification).is_ok()
}).unwrap_or(false)
}
}
@@ -503,9 +506,9 @@ pub trait TestNetFactory: Sized {
/// Get custom block import handle for fresh client, along with peer data.
fn make_block_import(&self, client: Arc<PeersClient>)
-> (Arc<BlockImport<Block,Error=ConsensusError> + Send + Sync>, Self::PeerData)
-> (SharedBlockImport<Block>, Option<SharedJustificationImport<Block>>, Self::PeerData)
{
(client, Default::default())
(client, None, Default::default())
}
fn default_config() -> ProtocolConfig {
@@ -528,9 +531,9 @@ pub trait TestNetFactory: Sized {
let client = Arc::new(test_client::new());
let tx_pool = Arc::new(EmptyTransactionPool);
let verifier = self.make_verifier(client.clone(), config);
let (block_import, data) = self.make_block_import(client.clone());
let (block_import, justification_import, data) = self.make_block_import(client.clone());
let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import));
let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import, justification_import));
let specialization = DummySpecialization { };
let sync = Protocol::new(
config.clone(),
@@ -707,3 +710,62 @@ impl TestNetFactory for TestNet {
self.started = new;
}
}
pub struct ForceFinalized(Arc<PeersClient>);
impl JustificationImport<Block> for ForceFinalized {
type Error = ConsensusError;
fn import_justification(
&self,
hash: H256,
_number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.0.finalize_block(BlockId::Hash(hash), Some(justification), true)
.map_err(|_| ConsensusErrorKind::InvalidJustification.into())
}
}
pub struct JustificationTestNet(TestNet);
impl TestNetFactory for JustificationTestNet {
type Verifier = PassThroughVerifier;
type PeerData = ();
fn from_config(config: &ProtocolConfig) -> Self {
JustificationTestNet(TestNet::from_config(config))
}
fn make_verifier(&self, client: Arc<PeersClient>, config: &ProtocolConfig)
-> Arc<Self::Verifier>
{
self.0.make_verifier(client, config)
}
fn peer(&self, i: usize) -> &Peer<Self::Verifier, ()> {
self.0.peer(i)
}
fn peers(&self) -> &Vec<Arc<Peer<Self::Verifier, ()>>> {
self.0.peers()
}
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::Verifier, ()>>>)>(&mut self, closure: F ) {
self.0.mut_peers(closure)
}
fn started(&self) -> bool {
self.0.started()
}
fn set_started(&mut self, new: bool) {
self.0.set_started(new)
}
fn make_block_import(&self, client: Arc<PeersClient>)
-> (SharedBlockImport<Block>, Option<SharedJustificationImport<Block>>, Self::PeerData)
{
(client.clone(), Some(Arc::new(ForceFinalized(client))), Default::default())
}
}
+1 -1
View File
@@ -68,7 +68,7 @@ fn sync_no_common_longer_chain_fails() {
#[test]
fn sync_justifications() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
let mut net = JustificationTestNet::new(3);
net.peer(0).push_blocks(20, false);
net.sync();
+8 -4
View File
@@ -120,23 +120,25 @@ construct_service_factory! {
{ |config, executor| <LightComponents<Factory>>::new(config, executor) },
FullImportQueue = AuraImportQueue<
Self::Block,
grandpa::BlockImportForService<Self>,
FullClient<Self>,
NothingExtra,
>
{ |config: &mut FactoryFullConfiguration<Self> , client: Arc<FullClient<Self>>| {
{ |config: &mut FactoryFullConfiguration<Self>, client: Arc<FullClient<Self>>| {
let slot_duration = SlotDuration::get_or_compute(&*client)?;
let (block_import, link_half) =
grandpa::block_import::<_, _, _, RuntimeApi, FullClient<Self>>(
client.clone(), client
client.clone(), client.clone()
)?;
let block_import = Arc::new(block_import);
let justification_import = block_import.clone();
config.custom.grandpa_import_setup = Some((block_import.clone(), link_half));
import_queue(
slot_duration,
block_import,
Some(justification_import),
client,
NothingExtra,
config.custom.inherent_data_providers.clone(),
).map_err(Into::into)
@@ -149,6 +151,8 @@ construct_service_factory! {
{ |config: &FactoryFullConfiguration<Self>, client: Arc<LightClient<Self>>| {
import_queue(
SlotDuration::get_or_compute(&*client)?,
client.clone(),
None,
client,
NothingExtra,
config.custom.inherent_data_providers.clone(),