diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 29d0eef3e5..30cccc214e 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -1106,7 +1106,6 @@ impl CallRuntimeAt for Client where } } - impl consensus::BlockImport for Client where B: backend::Backend, E: CallExecutor + Clone + Send + Sync, @@ -1179,19 +1178,6 @@ impl consensus::BlockImport for Client ); result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into()) } - - /// Import a block justification and finalize the block. The justification - /// isn't interpreted by the client and is assumed to have been validated - /// previously. The block is finalized unconditionally. - fn import_justification( - &self, - hash: Block::Hash, - _number: NumberFor, - justification: Justification, - ) -> Result<(), Self::Error> { - self.finalize_block(BlockId::Hash(hash), Some(justification), true) - .map_err(|_| ConsensusErrorKind::InvalidJustification.into()) - } } impl consensus::Authorities for Client where diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 8aed003c51..40e0852276 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -63,9 +63,9 @@ use std::{sync::{Arc, mpsc}, time::Duration, thread}; use codec::Encode; use consensus_common::{ - Authorities, BlockImport, Environment, Error as ConsensusError, Proposer, ForkChoiceStrategy + Authorities, BlockImport, Environment, Proposer, ForkChoiceStrategy }; -use consensus_common::import_queue::{Verifier, BasicQueue}; +use consensus_common::import_queue::{Verifier, BasicQueue, SharedBlockImport, SharedJustificationImport}; use client::ChainHead; use client::block_builder::api::BlockBuilder as BlockBuilderApi; use consensus_common::{ImportBlock, BlockOrigin}; @@ -542,7 +542,7 @@ impl ExtraVerification for NothingExtra { } impl Verifier for AuraVerifier where - C: Authorities + BlockImport + ProvideRuntimeApi + Send + Sync, + C: Authorities + ProvideRuntimeApi + Send + Sync, C::Api: BlockBuilderApi, DigestItemFor: CompatibleDigestItem + DigestItem, E: ExtraVerification, @@ -681,12 +681,14 @@ fn register_aura_inherent_data_provider( /// Start an import queue for the Aura consensus algorithm. pub fn import_queue( slot_duration: SlotDuration, + block_import: SharedBlockImport, + justification_import: Option>, client: Arc, extra: E, inherent_data_providers: InherentDataProviders, ) -> Result, consensus_common::Error> where B: Block, - C: Authorities + BlockImport + ProvideRuntimeApi + Send + Sync, + C: Authorities + ProvideRuntimeApi + Send + Sync, C::Api: BlockBuilderApi, DigestItemFor: CompatibleDigestItem + DigestItem, E: ExtraVerification, @@ -696,7 +698,7 @@ pub fn import_queue( let verifier = Arc::new( AuraVerifier { client: client.clone(), extra, inherent_data_providers } ); - Ok(BasicQueue::new(verifier, client)) + Ok(BasicQueue::new(verifier, block_import, justification_import)) } #[cfg(test)] diff --git a/substrate/core/consensus/common/src/block_import.rs b/substrate/core/consensus/common/src/block_import.rs index 4170662721..7eac5adc92 100644 --- a/substrate/core/consensus/common/src/block_import.rs +++ b/substrate/core/consensus/common/src/block_import.rs @@ -144,15 +144,20 @@ impl ImportBlock { pub trait BlockImport { type Error: ::std::error::Error + Send + 'static; - /// Called by the import queue when it is started. - fn on_start(&self, _link: &::import_queue::Link) { } - /// Import a Block alongside the new authorities valid from this block forward fn import_block( &self, block: ImportBlock, new_authorities: Option>>, ) -> Result; +} + +/// Justification import trait +pub trait JustificationImport { + type Error: ::std::error::Error + Send + 'static; + + /// Called by the import queue when it is started. + fn on_start(&self, _link: &::import_queue::Link) { } /// Import a Block justification and finalize the given block. fn import_justification( diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 3660a59b03..eb601a8b56 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -24,7 +24,7 @@ //! The `BasicQueue` and `BasicVerifier` traits allow serial queues to be //! instantiated simply. -use block_import::{ImportBlock, BlockImport, ImportResult, BlockOrigin}; +use block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin}; use std::collections::{HashSet, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -38,6 +38,9 @@ use error::Error as ConsensusError; /// Shared block import struct used by the queue. pub type SharedBlockImport = Arc + Send + Sync>; +/// Shared justification import struct used by the queue. +pub type SharedJustificationImport = Arc + Send + Sync>; + /// Maps to the Origin used by the network. pub type Origin = usize; @@ -111,6 +114,7 @@ pub struct BasicQueue> { data: Arc>, verifier: Arc, block_import: SharedBlockImport, + justification_import: Option>, } /// Locks order: queue, queue_blocks, best_importing_number @@ -123,13 +127,14 @@ pub struct AsyncImportQueueData { } impl> BasicQueue { - /// Instantiate a new basic queue, with given verifier. - pub fn new(verifier: Arc, block_import: SharedBlockImport) -> Self { + /// Instantiate a new basic queue, with given verifier and justification import. + pub fn new(verifier: Arc, block_import: SharedBlockImport, justification_import: Option>) -> Self { Self { handle: Mutex::new(None), data: Arc::new(AsyncImportQueueData::new()), verifier, block_import, + justification_import, } } } @@ -162,8 +167,11 @@ impl> ImportQueue for BasicQueue { let qdata = self.data.clone(); let verifier = self.verifier.clone(); let block_import = self.block_import.clone(); + let justification_import = self.justification_import.clone(); *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { - block_import.on_start(&link); + if let Some(justification_import) = justification_import.as_ref() { + justification_import.on_start(&link); + } import_thread(block_import, link, qdata, verifier) })?); Ok(()) @@ -223,7 +231,9 @@ impl> ImportQueue for BasicQueue { } fn import_justification(&self, hash: B::Hash, number: NumberFor, justification: Justification) -> bool { - self.block_import.import_justification(hash, number, justification).is_ok() + self.justification_import.as_ref().map(|justification_import| { + justification_import.import_justification(hash, number, justification).is_ok() + }).unwrap_or(false) } } diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 861eaf9554..def4d2d962 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -60,7 +60,7 @@ pub mod evaluation; const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; pub use self::error::{Error, ErrorKind}; -pub use block_import::{BlockImport, ImportBlock, BlockOrigin, ImportResult, ForkChoiceStrategy}; +pub use block_import::{BlockImport, JustificationImport, ImportBlock, BlockOrigin, ImportResult, ForkChoiceStrategy}; /// Trait for getting the authorities at a given block. pub trait Authorities { diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index ba7888e0d7..2eb6bc3023 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -91,7 +91,7 @@ use client::{ }; use client::blockchain::HeaderBackend; use codec::{Encode, Decode}; -use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities}; +use consensus_common::{BlockImport, JustificationImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult}; use runtime_primitives::Justification; use runtime_primitives::traits::{ NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, @@ -992,7 +992,7 @@ pub struct GrandpaBlockImport, RA, PRA> { api: Arc, } -impl, RA, PRA> BlockImport +impl, RA, PRA> JustificationImport for GrandpaBlockImport where NumberFor: grandpa::BlockNumberOps, B: Backend + 'static, @@ -1032,6 +1032,29 @@ impl, RA, PRA> BlockImport } } + fn import_justification( + &self, + hash: Block::Hash, + number: NumberFor, + justification: Justification, + ) -> Result<(), Self::Error> { + self.import_justification(hash, number, justification, false) + } +} + +impl, RA, PRA> BlockImport + for GrandpaBlockImport where + NumberFor: grandpa::BlockNumberOps, + B: Backend + 'static, + E: CallExecutor + 'static + Clone + Send + Sync, + DigestFor: Encode, + DigestItemFor: DigestItem, + RA: Send + Sync, + PRA: ProvideRuntimeApi, + PRA::Api: GrandpaApi, +{ + type Error = ConsensusError; + fn import_block(&self, mut block: ImportBlock, new_authorities: Option>) -> Result { @@ -1160,15 +1183,6 @@ impl, RA, PRA> BlockImport Ok(import_result) } - - fn import_justification( - &self, - hash: Block::Hash, - number: NumberFor, - justification: Justification, - ) -> Result<(), Self::Error> { - self.import_justification(hash, number, justification, false) - } } impl, RA, PRA> @@ -1275,32 +1289,6 @@ fn canonical_at_height, RA>( Ok(Some(current.hash())) } -impl, RA, PRA> Authorities for GrandpaBlockImport -where - B: Backend + 'static, - E: CallExecutor + 'static + Clone + Send + Sync, - DigestItemFor: DigestItem, -{ - - type Error = as Authorities>::Error; - fn authorities(&self, at: &BlockId) -> Result, Self::Error> { - self.inner.authorities_at(at) - } -} - -impl, RA, PRA> ProvideRuntimeApi for GrandpaBlockImport -where - B: Backend + 'static, - E: CallExecutor + 'static + Clone + Send + Sync, - PRA: ProvideRuntimeApi, -{ - type Api = PRA::Api; - - fn runtime_api<'a>(&'a self) -> ::runtime_primitives::traits::ApiRef<'a, Self::Api> { - self.api.runtime_api() - } -} - /// Half of a link between a block-import worker and a the background voter. // This should remain non-clone. pub struct LinkHalf, RA> { diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index b3f1d1e11c..96059eb46b 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -30,7 +30,8 @@ use client::{ }; use test_client::{self, runtime::BlockNumber}; use codec::Decode; -use consensus_common::{BlockOrigin, Error as ConsensusError}; +use consensus_common::BlockOrigin; +use consensus_common::import_queue::{SharedBlockImport, SharedJustificationImport}; use std::{collections::HashSet, result}; use runtime_primitives::traits::{ApiRef, ProvideRuntimeApi, RuntimeApiInfo}; use runtime_primitives::generic::BlockId; @@ -100,13 +101,14 @@ impl TestNetFactory for GrandpaTestNet { } fn make_block_import(&self, client: Arc) - -> (Arc + Send + Sync>, PeerData) + -> (SharedBlockImport, Option>, PeerData) { let (import, link) = block_import( client, Arc::new(self.test_config.clone()) ).expect("Could not create block import for fresh peer."); - (Arc::new(import), Mutex::new(Some(link))) + let shared_import = Arc::new(import); + (shared_import.clone(), Some(shared_import), Mutex::new(Some(link))) } fn peer(&self, i: usize) -> &GrandpaPeer { diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index 6104be87f0..8ab883124c 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -164,7 +164,7 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = Arc::new(PassThroughVerifier(true)); - let queue = BasicQueue::new(verifier, Arc::new(test_client::new())); + let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None); queue.start(TestLink::new()).unwrap(); drop(queue); } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 13fc86d1a5..72671d7240 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use parking_lot::RwLock; use client; use client::block_builder::BlockBuilder; -use primitives::Ed25519AuthorityId; +use primitives::{H256, Ed25519AuthorityId}; use runtime_primitives::Justification; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor, Zero}; @@ -38,10 +38,9 @@ use service::{NetworkLink, TransactionPool}; use network_libp2p::{NodeIndex, PeerId, Severity}; use keyring::Keyring; use codec::Encode; -use consensus::{BlockImport, BlockOrigin, ImportBlock, ForkChoiceStrategy}; -use consensus::Error as ConsensusError; +use consensus::{BlockOrigin, ImportBlock, JustificationImport, ForkChoiceStrategy, Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock}; -use consensus::import_queue::{Link, SharedBlockImport, Verifier}; +use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier}; use specialization::NetworkSpecialization; use consensus_gossip::ConsensusGossip; use service::ExecuteInContext; @@ -121,17 +120,19 @@ pub struct SyncImportQueue> { verifier: Arc, link: ImportCB, block_import: SharedBlockImport, + justification_import: Option>, } #[cfg(any(test, feature = "test-helpers"))] impl> SyncImportQueue { /// Create a new SyncImportQueue wrapping the given Verifier and block import /// handle. - pub fn new(verifier: Arc, block_import: SharedBlockImport) -> Self { + pub fn new(verifier: Arc, block_import: SharedBlockImport, justification_import: Option>) -> Self { let queue = SyncImportQueue { verifier, link: ImportCB::new(), block_import, + justification_import, }; let v = queue.verifier.clone(); @@ -197,7 +198,9 @@ impl> ImportQueue for SyncImpor number: NumberFor, justification: Justification, ) -> bool { - self.block_import.import_justification(hash, number, justification).is_ok() + self.justification_import.as_ref().map(|justification_import| { + justification_import.import_justification(hash, number, justification).is_ok() + }).unwrap_or(false) } } @@ -503,9 +506,9 @@ pub trait TestNetFactory: Sized { /// Get custom block import handle for fresh client, along with peer data. fn make_block_import(&self, client: Arc) - -> (Arc + Send + Sync>, Self::PeerData) + -> (SharedBlockImport, Option>, Self::PeerData) { - (client, Default::default()) + (client, None, Default::default()) } fn default_config() -> ProtocolConfig { @@ -528,9 +531,9 @@ pub trait TestNetFactory: Sized { let client = Arc::new(test_client::new()); let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(client.clone(), config); - let (block_import, data) = self.make_block_import(client.clone()); + let (block_import, justification_import, data) = self.make_block_import(client.clone()); - let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import)); + let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import, justification_import)); let specialization = DummySpecialization { }; let sync = Protocol::new( config.clone(), @@ -707,3 +710,62 @@ impl TestNetFactory for TestNet { self.started = new; } } + +pub struct ForceFinalized(Arc); + +impl JustificationImport for ForceFinalized { + type Error = ConsensusError; + + fn import_justification( + &self, + hash: H256, + _number: NumberFor, + justification: Justification, + ) -> Result<(), Self::Error> { + self.0.finalize_block(BlockId::Hash(hash), Some(justification), true) + .map_err(|_| ConsensusErrorKind::InvalidJustification.into()) + } +} + +pub struct JustificationTestNet(TestNet); + +impl TestNetFactory for JustificationTestNet { + type Verifier = PassThroughVerifier; + type PeerData = (); + + fn from_config(config: &ProtocolConfig) -> Self { + JustificationTestNet(TestNet::from_config(config)) + } + + fn make_verifier(&self, client: Arc, config: &ProtocolConfig) + -> Arc + { + self.0.make_verifier(client, config) + } + + fn peer(&self, i: usize) -> &Peer { + self.0.peer(i) + } + + fn peers(&self) -> &Vec>> { + self.0.peers() + } + + fn mut_peers>>)>(&mut self, closure: F ) { + self.0.mut_peers(closure) + } + + fn started(&self) -> bool { + self.0.started() + } + + fn set_started(&mut self, new: bool) { + self.0.set_started(new) + } + + fn make_block_import(&self, client: Arc) + -> (SharedBlockImport, Option>, Self::PeerData) + { + (client.clone(), Some(Arc::new(ForceFinalized(client))), Default::default()) + } +} diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 72d105a499..54cf157eef 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -68,7 +68,7 @@ fn sync_no_common_longer_chain_fails() { #[test] fn sync_justifications() { ::env_logger::init().ok(); - let mut net = TestNet::new(3); + let mut net = JustificationTestNet::new(3); net.peer(0).push_blocks(20, false); net.sync(); diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index dd2b14e74e..eb0e5a4f80 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -120,23 +120,25 @@ construct_service_factory! { { |config, executor| >::new(config, executor) }, FullImportQueue = AuraImportQueue< Self::Block, - grandpa::BlockImportForService, + FullClient, NothingExtra, > - { |config: &mut FactoryFullConfiguration , client: Arc>| { + { |config: &mut FactoryFullConfiguration, client: Arc>| { let slot_duration = SlotDuration::get_or_compute(&*client)?; - let (block_import, link_half) = grandpa::block_import::<_, _, _, RuntimeApi, FullClient>( - client.clone(), client + client.clone(), client.clone() )?; let block_import = Arc::new(block_import); + let justification_import = block_import.clone(); config.custom.grandpa_import_setup = Some((block_import.clone(), link_half)); import_queue( slot_duration, block_import, + Some(justification_import), + client, NothingExtra, config.custom.inherent_data_providers.clone(), ).map_err(Into::into) @@ -149,6 +151,8 @@ construct_service_factory! { { |config: &FactoryFullConfiguration, client: Arc>| { import_queue( SlotDuration::get_or_compute(&*client)?, + client.clone(), + None, client, NothingExtra, config.custom.inherent_data_providers.clone(),