diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index d425f1b691..ff4f995a8c 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2099,6 +2099,15 @@ dependencies = [ "parking_lot_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "parking_lot" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lock_api 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "parking_lot_core" version = "0.2.14" @@ -2122,6 +2131,18 @@ dependencies = [ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "parking_lot_core" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -3406,11 +3427,14 @@ version = "0.1.0" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", "sr-version 0.1.0", "substrate-primitives 0.1.0", + "substrate-test-client 0.1.0", "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -4857,8 +4881,10 @@ dependencies = [ "checksum parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" "checksum parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d4d05f1349491390b1730afba60bb20d55761bef489a954546b58b4b34e1e2ac" "checksum parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f0802bff09003b291ba756dc7e79313e51cc31667e94afbe847def490424cde5" +"checksum parking_lot 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9723236a9525c757d9725b993511e3fc941e33f27751942232f0058298297edf" "checksum parking_lot_core 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa" "checksum parking_lot_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ad7f7e6ebdc79edff6fdcb87a55b620174f7a989e3eb31b65231f4af57f00b8c" +"checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9" "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 647e48405a..7f8c81d5ab 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -25,7 +25,7 @@ use runtime_primitives::{ Justification, generic::{BlockId, SignedBlock}, }; -use consensus::{ImportBlock, ImportResult, BlockOrigin}; +use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, BlockOrigin}; use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, Zero, As, NumberFor, CurrentHeight, BlockNumberToHash, ApiRef, ProvideRuntimeApi, Digest, DigestItem, @@ -1019,7 +1019,7 @@ impl consensus::BlockImport for Client E: CallExecutor + Clone + Send + Sync, Block: BlockT, { - type Error = Error; + type Error = ConsensusError; /// Import a checked and validated block. If a justification is provided in /// `ImportBlock` then `finalized` *must* be true. @@ -1044,9 +1044,10 @@ impl consensus::BlockImport for Client let parent_hash = header.parent_hash().clone(); - match self.backend.blockchain().status(BlockId::Hash(parent_hash))? { - blockchain::BlockStatus::InChain => {}, - blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), + match self.backend.blockchain().status(BlockId::Hash(parent_hash)) { + Ok(blockchain::BlockStatus::InChain) => {}, + Ok(blockchain::BlockStatus::Unknown) => return Ok(ImportResult::UnknownParent), + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()) } let import_headers = if post_digests.is_empty() { @@ -1081,7 +1082,7 @@ impl consensus::BlockImport for Client "best" => ?hash, "origin" => ?origin ); - result.map_err(|e| e.into()) + result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into()) } } diff --git a/substrate/core/consensus/aura/Cargo.toml b/substrate/core/consensus/aura/Cargo.toml index 7994afc984..dee59d8f06 100644 --- a/substrate/core/consensus/aura/Cargo.toml +++ b/substrate/core/consensus/aura/Cargo.toml @@ -21,14 +21,11 @@ parking_lot = "0.4" error-chain = "0.12" log = "0.3" substrate-consensus-common = { path = "../common" } -substrate-network = { path = "../../network" } [dev-dependencies] substrate-keyring = { path = "../../keyring" } substrate-executor = { path = "../../executor" } +substrate-network = { path = "../../network", features = ["test-helpers"]} substrate-service = { path = "../../service" } substrate-test-client = { path = "../../test-client" } env_logger = "0.4" - -[target.'cfg(test)'.dependencies] -substrate-network = { path = "../../network", features = ["test-helpers"], optional = true } diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index e6a16df79b..911ae6304a 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -37,7 +37,6 @@ extern crate substrate_consensus_aura_primitives as aura_primitives; extern crate substrate_consensus_common as consensus_common; extern crate tokio; extern crate sr_version as runtime_version; -extern crate substrate_network as network; extern crate parking_lot; #[macro_use] @@ -48,6 +47,8 @@ extern crate futures; #[cfg(test)] extern crate substrate_keyring as keyring; #[cfg(test)] +extern crate substrate_network as network; +#[cfg(test)] extern crate substrate_service as service; #[cfg(test)] extern crate substrate_test_client as test_client; @@ -60,13 +61,13 @@ use std::sync::Arc; use std::time::Duration; use codec::Encode; -use consensus_common::{Authorities, BlockImport, Environment, Proposer}; +use consensus_common::{Authorities, BlockImport, Environment, Error as ConsensusError, Proposer}; +use consensus_common::import_queue::{Verifier, BasicQueue}; use client::ChainHead; use client::block_builder::api::BlockBuilder as BlockBuilderApi; use consensus_common::{ImportBlock, BlockOrigin}; use runtime_primitives::{generic, generic::BlockId, Justification, BasicInherentData}; use runtime_primitives::traits::{Block, Header, Digest, DigestItemFor, ProvideRuntimeApi}; -use network::import_queue::{Verifier, BasicQueue}; use primitives::{AuthorityId, ed25519}; use futures::{Stream, Future, IntoFuture, future::{self, Either}}; @@ -214,7 +215,6 @@ pub fn start_aura( Error: ::std::error::Error + Send + 'static + From<::consensus_common::Error>, { let make_authorship = move || { - use futures::future; let client = client.clone(); let pair = local_key.clone(); @@ -596,7 +596,7 @@ pub fn import_queue( make_inherent: MakeInherent, ) -> AuraImportQueue where B: Block, - C: Authorities + BlockImport + ProvideRuntimeApi + Send + Sync, + C: Authorities + BlockImport + ProvideRuntimeApi + Send + Sync, C::Api: BlockBuilderApi, DigestItemFor: CompatibleDigestItem, E: ExtraVerification, diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index 08689721a0..7f95b39a14 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -5,6 +5,8 @@ authors = ["Parity Technologies "] description = "Common utilities for substrate consensus" [dependencies] +log = "0.4" +parking_lot = "0.7" substrate-primitives = { path= "../../primitives" } error-chain = "0.12" futures = "0.1" @@ -13,3 +15,6 @@ sr-primitives = { path = "../../sr-primitives" } tokio = "0.1.7" parity-codec = "2.1" parity-codec-derive = "2.0" + +[dev-dependencies] +substrate-test-client = { path = "../../test-client" } diff --git a/substrate/core/consensus/common/src/error.rs b/substrate/core/consensus/common/src/error.rs index ccf57adb9f..14a3fb81e1 100644 --- a/substrate/core/consensus/common/src/error.rs +++ b/substrate/core/consensus/common/src/error.rs @@ -84,5 +84,11 @@ error_chain! { description("Other error") display("Other error: {}", e.description()) } + + /// Error from the client while importing + ClientImport(reason: String) { + description("Import failed"), + display("Import failed: {}", reason), + } } } diff --git a/substrate/core/network/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs similarity index 51% rename from substrate/core/network/src/import_queue.rs rename to substrate/core/consensus/common/src/import_queue.rs index 58919ea75a..3a048f781a 100644 --- a/substrate/core/network/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -24,30 +24,38 @@ //! The `BasicQueue` and `BasicVerifier` traits allow serial queues to be //! instantiated simply. +use block_import::{ImportBlock, BlockImport, ImportResult, BlockOrigin}; use std::collections::{HashSet, VecDeque}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use parking_lot::{Condvar, Mutex, RwLock}; -use network_libp2p::{NodeIndex, Severity}; use primitives::AuthorityId; use runtime_primitives::Justification; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}; -pub use blocks::BlockData; -use client::error::Error as ClientError; -use error::{ErrorKind, Error}; -use protocol::Context; -use service::ExecuteInContext; -use sync::ChainSync; - -pub use consensus::{ImportBlock, BlockImport, ImportResult, BlockOrigin}; +use error::Error as ConsensusError; /// Shared block import struct used by the queue. -pub type SharedBlockImport = Arc + Send + Sync>; +pub type SharedBlockImport = Arc + Send + Sync>; -#[cfg(any(test, feature = "test-helpers"))] -use std::cell::RefCell; +/// Maps to the Origin used by the network. +pub type Origin = usize; + +/// Block data used by the queue. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct IncomingBlock { + /// Block header hash. + pub hash: ::Hash, + /// Block header if requested. + pub header: Option<::Header>, + /// Block body if requested. + pub body: Option::Extrinsic>>, + /// Justification if requested. + pub justification: Option, + /// The peer, we received this from + pub origin: Option, +} /// Verify a justification of a block pub trait Verifier: Send + Sync + Sized { @@ -69,7 +77,7 @@ pub trait ImportQueue: Send + Sync { /// /// This is called automatically by the network service when synchronization /// begins. - fn start(&self, _link: L) -> Result<(), Error> where + fn start(&self, _link: L) -> Result<(), std::io::Error> where Self: Sized, L: 'static + Link, { @@ -84,7 +92,7 @@ pub trait ImportQueue: Send + Sync { /// Is block with given hash currently in the queue. fn is_importing(&self, hash: &B::Hash) -> bool; /// Import bunch of blocks. - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); + fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); } /// Import queue status. It isn't completely accurate. @@ -105,9 +113,9 @@ pub struct BasicQueue> { } /// Locks order: queue, queue_blocks, best_importing_number -struct AsyncImportQueueData { +pub struct AsyncImportQueueData { signal: Condvar, - queue: Mutex>)>>, + queue: Mutex>)>>, queue_blocks: RwLock>, best_importing_number: RwLock<<::Header as HeaderT>::Number>, is_stopping: AtomicBool, @@ -126,7 +134,8 @@ impl> BasicQueue { } impl AsyncImportQueueData { - fn new() -> Self { + /// Instantiate a new async import queue data. + pub fn new() -> Self { Self { signal: Default::default(), queue: Mutex::new(VecDeque::new()), @@ -135,13 +144,18 @@ impl AsyncImportQueueData { is_stopping: Default::default(), } } + + // Signals to stop importing new blocks. + pub fn stop(&self) { + self.is_stopping.store(true, Ordering::SeqCst); + } } impl> ImportQueue for BasicQueue { fn start>( &self, link: L, - ) -> Result<(), Error> { + ) -> Result<(), std::io::Error> { debug_assert!(self.handle.lock().is_none()); let qdata = self.data.clone(); @@ -149,7 +163,7 @@ impl> ImportQueue for BasicQueue { let block_import = self.block_import.clone(); *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { import_thread(block_import, link, qdata, verifier) - }).map_err(|err| Error::from(ErrorKind::Io(err)))?); + })?); Ok(()) } @@ -168,7 +182,7 @@ impl> ImportQueue for BasicQueue { { // Perform storing the stop flag and signalling under a single lock. let _queue_lock = self.data.queue.lock(); - self.data.is_stopping.store(true, Ordering::SeqCst); + self.data.stop(); self.data.signal.notify_one(); } @@ -187,7 +201,7 @@ impl> ImportQueue for BasicQueue { self.data.queue_blocks.read().contains(hash) } - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { + fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { return; } @@ -197,8 +211,8 @@ impl> ImportQueue for BasicQueue { let mut queue = self.data.queue.lock(); let mut queue_blocks = self.data.queue_blocks.write(); let mut best_importing_number = self.data.best_importing_number.write(); - let new_best_importing_number = blocks.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero()); - queue_blocks.extend(blocks.iter().map(|b| b.block.hash.clone())); + let new_best_importing_number = blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero()); + queue_blocks.extend(blocks.iter().map(|b| b.hash.clone())); if new_best_importing_number > *best_importing_number { *best_importing_number = new_best_importing_number; } @@ -241,7 +255,7 @@ fn import_thread, V: Verifier>( } }; - let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); + let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.hash.clone()).collect(); if !import_many_blocks( &*block_import, &link, @@ -269,68 +283,16 @@ pub trait Link: Send { /// Maintain sync. fn maintain_sync(&self) { } /// Disconnect from peer. - fn useless_peer(&self, _who: NodeIndex, _reason: &str) { } + fn useless_peer(&self, _who: Origin, _reason: &str) { } /// Disconnect from peer and restart sync. - fn note_useless_and_restart_sync(&self, _who: NodeIndex, _reason: &str) { } + fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) { } /// Restart sync. fn restart(&self) { } } -/// A link implementation that does nothing. -pub struct NoopLink; - -impl Link for NoopLink { } - -/// A link implementation that connects to the network. -pub struct NetworkLink> { - /// The chain-sync handle - pub(crate) sync: Weak>>, - /// Network context. - pub(crate) context: Weak, -} - -impl> NetworkLink { - /// Execute closure with locked ChainSync. - fn with_sync, &mut Context)>(&self, closure: F) { - if let (Some(sync), Some(service)) = (self.sync.upgrade(), self.context.upgrade()) { - service.execute_in_context(move |protocol| { - let mut sync = sync.write(); - closure(&mut *sync, protocol) - }); - } - } -} - -impl> Link for NetworkLink { - fn block_imported(&self, hash: &B::Hash, number: NumberFor) { - self.with_sync(|sync, _| sync.block_imported(&hash, number)) - } - - fn maintain_sync(&self) { - self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) - } - - fn useless_peer(&self, who: NodeIndex, reason: &str) { - trace!(target:"sync", "Useless peer {}, {}", who, reason); - self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason))) - } - - fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { - trace!(target:"sync", "Bad peer {}, {}", who, reason); - self.with_sync(|sync, protocol| { - protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless? - sync.restart(protocol); - }) - } - - fn restart(&self) { - self.with_sync(|sync, protocol| sync.restart(protocol)) - } -} - /// Block import successful result. #[derive(Debug, PartialEq)] -enum BlockImportResult { +pub enum BlockImportResult { /// Imported known block. ImportedKnown(H, N), /// Imported unknown block. @@ -339,13 +301,13 @@ enum BlockImportResult), + IncompleteHeader(Option), /// Block verification failed, can't be imported - VerificationFailed(Option, String), + VerificationFailed(Option, String), /// Block is known to be Bad - BadBlock(Option), + BadBlock(Option), /// Block has an unknown parent UnknownParent, /// Other Error. @@ -353,11 +315,11 @@ enum BlockImportError { } /// Import a bunch of blocks. -fn import_many_blocks<'a, B: BlockT, V: Verifier>( - import_handle: &BlockImport, +pub fn import_many_blocks<'a, B: BlockT, V: Verifier>( + import_handle: &BlockImport, link: &Link, qdata: Option<&AsyncImportQueueData>, - blocks: (BlockOrigin, Vec>), + blocks: (BlockOrigin, Vec>), verifier: Arc ) -> bool { @@ -366,8 +328,8 @@ fn import_many_blocks<'a, B: BlockT, V: Verifier>( let mut imported = 0; let blocks_range = match ( - blocks.first().and_then(|b| b.block.header.as_ref().map(|h| h.number())), - blocks.last().and_then(|b| b.block.header.as_ref().map(|h| h.number())), + blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), ) { (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), (Some(first), Some(_)) => format!(" ({})", first), @@ -401,15 +363,14 @@ fn import_many_blocks<'a, B: BlockT, V: Verifier>( } /// Single block import function. -fn import_single_block>( - import_handle: &BlockImport, +pub fn import_single_block>( + import_handle: &BlockImport, block_origin: BlockOrigin, - block: BlockData, + block: IncomingBlock, verifier: Arc ) -> Result::Header as HeaderT>::Number>, BlockImportError> { let peer = block.origin; - let block = block.block; let (header, justification) = match (block.header, block.justification) { (Some(header), justification) => (header, justification), @@ -465,7 +426,7 @@ fn import_single_block>( } /// Process single block import result. -fn process_import_result( +pub fn process_import_result( link: &Link, result: Result::Header as HeaderT>::Number>, BlockImportError> ) -> usize @@ -503,291 +464,3 @@ fn process_import_result( }, } } - - -#[cfg(any(test, feature = "test-helpers"))] -struct ImportCB(RefCell>) -> bool>>>); - -#[cfg(any(test, feature = "test-helpers"))] -impl ImportCB { - fn new() -> Self { - ImportCB(RefCell::new(None)) - } - fn set(&self, cb: Box) - where F: 'static + Fn(BlockOrigin, Vec>) -> bool - { - *self.0.borrow_mut() = Some(cb); - } - fn call(&self, origin: BlockOrigin, data: Vec>) -> bool { - let b = self.0.borrow(); - b.as_ref().expect("The Callback has been set before. qed.")(origin, data) - } -} - -#[cfg(any(test, feature = "test-helpers"))] -unsafe impl Send for ImportCB {} -#[cfg(any(test, feature = "test-helpers"))] -unsafe impl Sync for ImportCB {} - - -#[cfg(any(test, feature = "test-helpers"))] -/// A Verifier that accepts all blocks and passes them on with the configured -/// finality to be imported. -pub struct PassThroughVerifier(pub bool); - -#[cfg(any(test, feature = "test-helpers"))] -/// This Verifiyer accepts all data as valid -impl Verifier for PassThroughVerifier { - fn verify( - &self, - origin: BlockOrigin, - header: B::Header, - justification: Option, - body: Option> - ) -> Result<(ImportBlock, Option>), String> { - Ok((ImportBlock { - origin, - header, - body, - finalized: self.0, - justification, - post_digests: vec![], - auxiliary: Vec::new(), - }, None)) - } -} - -/// Blocks import queue that is importing blocks in the same thread. -/// The boolean value indicates whether blocks should be imported without instant finality. -#[cfg(any(test, feature = "test-helpers"))] -pub struct SyncImportQueue> { - verifier: Arc, - link: ImportCB, - block_import: SharedBlockImport, -} - -#[cfg(any(test, feature = "test-helpers"))] -impl> SyncImportQueue { - /// Create a new SyncImportQueue wrapping the given Verifier and block import - /// handle. - pub fn new(verifier: Arc, block_import: SharedBlockImport) -> Self { - let queue = SyncImportQueue { - verifier, - link: ImportCB::new(), - block_import, - }; - - let v = queue.verifier.clone(); - let import_handle = queue.block_import.clone(); - queue.link.set(Box::new(move |origin, new_blocks| { - let verifier = v.clone(); - import_many_blocks( - &*import_handle, - &NoopLink, - None, - (origin, new_blocks), - verifier, - ) - })); - - queue - } -} - -#[cfg(any(test, feature = "test-helpers"))] -impl> ImportQueue for SyncImportQueue -{ - fn start>( - &self, - link: L, - ) -> Result<(), Error> { - let v = self.verifier.clone(); - let import_handle = self.block_import.clone(); - self.link.set(Box::new(move |origin, new_blocks| { - let verifier = v.clone(); - import_many_blocks( - &*import_handle, - &link, - None, - (origin, new_blocks), - verifier, - ) - })); - Ok(()) - } - fn clear(&self) { } - - fn stop(&self) { } - - fn status(&self) -> ImportQueueStatus { - ImportQueueStatus { - importing_count: 0, - best_importing_number: Zero::zero(), - } - } - - fn is_importing(&self, _hash: &B::Hash) -> bool { - false - } - - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { - self.link.call(origin, blocks); - } -} - -#[cfg(test)] -pub mod tests { - use client; - use message; - use test_client::{self, TestClient}; - use test_client::runtime::{Block, Hash}; - use runtime_primitives::generic::BlockId; - use std::cell::Cell; - use super::*; - - struct TestLink { - imported: Cell, - maintains: Cell, - disconnects: Cell, - restarts: Cell, - } - - impl TestLink { - fn new() -> TestLink { - TestLink { - imported: Cell::new(0), - maintains: Cell::new(0), - disconnects: Cell::new(0), - restarts: Cell::new(0), - } - } - - fn total(&self) -> usize { - self.imported.get() + self.maintains.get() + self.disconnects.get() + self.restarts.get() - } - } - - impl Link for TestLink { - fn block_imported(&self, _hash: &Hash, _number: NumberFor) { - self.imported.set(self.imported.get() + 1); - } - fn maintain_sync(&self) { - self.maintains.set(self.maintains.get() + 1); - } - fn useless_peer(&self, _: NodeIndex, _: &str) { - self.disconnects.set(self.disconnects.get() + 1); - } - fn note_useless_and_restart_sync(&self, id: NodeIndex, r: &str) { - self.useless_peer(id, r); - self.restart(); - } - fn restart(&self) { - self.restarts.set(self.restarts.get() + 1); - } - } - - fn prepare_good_block() -> (client::Client, Hash, u64, BlockData) { - let client = test_client::new(); - let block = client.new_block().unwrap().bake().unwrap(); - client.import(BlockOrigin::File, block).unwrap(); - - let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1); - let block = message::BlockData:: { - hash: client.block_hash(1).unwrap().unwrap(), - header: client.header(&BlockId::Number(1)).unwrap(), - body: None, - receipt: None, - message_queue: None, - justification: client.justification(&BlockId::Number(1)).unwrap(), - }; - - (client, hash, number, BlockData { block, origin: Some(0) }) - } - - #[test] - fn import_single_good_block_works() { - let (_, hash, number, block) = prepare_good_block(); - assert_eq!( - import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Ok(BlockImportResult::ImportedUnknown(hash, number)) - ); - } - - #[test] - fn import_single_good_known_block_is_ignored() { - let (client, hash, number, block) = prepare_good_block(); - assert_eq!( - import_single_block(&client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Ok(BlockImportResult::ImportedKnown(hash, number)) - ); - } - - #[test] - fn import_single_good_block_without_header_fails() { - let (_, _, _, mut block) = prepare_good_block(); - block.block.header = None; - assert_eq!( - import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Err(BlockImportError::IncompleteHeader(Some(0))) - ); - } - - #[test] - fn process_import_result_works() { - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); - assert_eq!(link.total(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); - assert_eq!(link.total(), 1); - assert_eq!(link.imported.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); - assert_eq!(link.total(), 1); - assert_eq!(link.imported.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); - assert_eq!(link.total(), 1); - assert_eq!(link.disconnects.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::UnknownParent)), 0); - assert_eq!(link.total(), 1); - assert_eq!(link.restarts.get(), 1); - - let link = TestLink::new(); - assert_eq!(process_import_result::(&link, Err(BlockImportError::Error)), 0); - assert_eq!(link.total(), 1); - assert_eq!(link.restarts.get(), 1); - } - - #[test] - fn import_many_blocks_stops_when_stopping() { - let (_, _, _, block) = prepare_good_block(); - let qdata = AsyncImportQueueData::new(); - let verifier = Arc::new(PassThroughVerifier(true)); - qdata.is_stopping.store(true, Ordering::SeqCst); - let client = test_client::new(); - assert!(!import_many_blocks( - &client, - &mut TestLink::new(), - Some(&qdata), - (BlockOrigin::File, vec![block.clone(), block]), - verifier - )); - } - - #[test] - fn async_import_queue_drops() { - // Perform this test multiple times since it exhibits non-deterministic behavior. - for _ in 0..100 { - let verifier = Arc::new(PassThroughVerifier(true)); - let queue = BasicQueue::new(verifier, Arc::new(test_client::new())); - queue.start(TestLink::new()).unwrap(); - drop(queue); - } - } -} diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 76c370effd..875f4c216a 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -24,8 +24,11 @@ extern crate substrate_primitives as primitives; extern crate futures; +extern crate parking_lot; extern crate sr_version as runtime_version; extern crate sr_primitives as runtime_primitives; +#[cfg(any(test, feature = "test-helpers"))] +extern crate substrate_test_client as test_client; extern crate tokio; extern crate parity_codec as codec; @@ -33,6 +36,7 @@ extern crate parity_codec_derive; #[macro_use] extern crate error_chain; +#[macro_use] extern crate log; use std::sync::Arc; @@ -44,6 +48,7 @@ use futures::prelude::*; pub mod offline_tracker; pub mod error; mod block_import; +pub mod import_queue; pub mod evaluation; // block size limit. diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 3e699ba382..abd737d357 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -89,7 +89,7 @@ use client::{ }; use client::blockchain::HeaderBackend; use codec::{Encode, Decode}; -use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities}; +use consensus_common::{BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, Authorities}; use runtime_primitives::traits::{ NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, }; @@ -834,7 +834,7 @@ impl, RA, PRA> BlockImport PRA: ProvideRuntimeApi, PRA::Api: GrandpaApi, { - type Error = ClientError; + type Error = ConsensusError; fn import_block(&self, mut block: ImportBlock, new_authorities: Option>) -> Result @@ -847,7 +847,12 @@ impl, RA, PRA> BlockImport let maybe_change = self.api.runtime_api().grandpa_pending_change( &BlockId::hash(*block.header.parent_hash()), &block.header.digest().clone(), - )?; + ); + + let maybe_change = match maybe_change { + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(maybe) => maybe, + }; // when we update the authorities, we need to hold the lock // until the block is written to prevent a race if we need to restore @@ -858,11 +863,9 @@ impl, RA, PRA> BlockImport let mut authorities = self.authority_set.inner().write(); let old_set = authorities.clone(); - let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ClientError> { + let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ConsensusError> { let error = || { - Err(ClientErrorKind::Backend( - "invalid authority set change: multiple pending changes on the same chain".to_string() - ).into()) + Err(ConsensusErrorKind::ClientImport("Incorrect base hash".to_string()).into()) }; if *base == hash { return error(); } @@ -872,7 +875,12 @@ impl, RA, PRA> BlockImport self.inner.backend().blockchain(), BlockId::Hash(parent_hash), BlockId::Hash(*base), - )?; + ); + + let tree_route = match tree_route { + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(route) => route, + }; if tree_route.common_block().hash == *base { return error(); @@ -905,18 +913,25 @@ impl, RA, PRA> BlockImport *authorities = old_set; } e - })?; + }); - if import_result != ImportResult::Queued { - return Ok(import_result); - } + let import_result = match import_result { + Ok(ImportResult::Queued) => ImportResult::Queued, + Ok(r) => return Ok(r), + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + }; let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| { canonical_at_height(&self.inner, (hash, number), canon_number) - })?; + }); - if !enacts_change { - return Ok(import_result); + match enacts_change { + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(enacted) => { + if !enacted { + return Ok(import_result); + } + } } match justification { @@ -925,7 +940,12 @@ impl, RA, PRA> BlockImport justification, self.authority_set.set_id(), &self.authority_set.current_authorities(), - )?; + ); + + let justification = match justification { + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(justification) => justification, + }; let result = finalize_block( &*self.inner, @@ -943,16 +963,18 @@ impl, RA, PRA> BlockImport }, Err(ExitOrError::AuthoritiesChanged(new)) => { debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number); - if let Err(_) = self.authority_set_change.unbounded_send(new) { - return Err(ClientErrorKind::Backend( - "imported and finalized change block but grandpa voter is no longer running".to_string() - ).into()); + if let Err(e) = self.authority_set_change.unbounded_send(new) { + return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()); } }, - Err(ExitOrError::Error(_)) => { - return Err(ClientErrorKind::Backend( - "imported change block but failed to finalize it, node may be in an inconsistent state".to_string() - ).into()); + Err(ExitOrError::Error(e)) => { + match e { + Error::Grandpa(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()), + Error::Network(error) => return Err(ConsensusErrorKind::ClientImport(error).into()), + Error::Blockchain(error) => return Err(ConsensusErrorKind::ClientImport(error).into()), + Error::Client(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()), + Error::Timer(error) => return Err(ConsensusErrorKind::ClientImport(error.to_string()).into()), + } }, } }, diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index de6c4f1cb3..a357fa81da 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -18,7 +18,7 @@ use super::*; use network::test::{Block, Hash, TestNetFactory, Peer, PeersClient}; -use network::import_queue::{PassThroughVerifier}; +use network::test::{PassThroughVerifier}; use network::config::{ProtocolConfig, Roles}; use parking_lot::Mutex; use tokio::runtime::current_thread; @@ -29,7 +29,7 @@ use client::{ }; use test_client::{self, runtime::BlockNumber}; use codec::Decode; -use consensus_common::BlockOrigin; +use consensus_common::{BlockOrigin, Error as ConsensusError}; use std::{collections::HashSet, result}; use runtime_primitives::traits::{ApiRef, ProvideRuntimeApi, RuntimeApiInfo}; use runtime_primitives::generic::BlockId; @@ -99,7 +99,7 @@ impl TestNetFactory for GrandpaTestNet { } fn make_block_import(&self, client: Arc) - -> (Arc + Send + Sync>, PeerData) + -> (Arc + Send + Sync>, PeerData) { let (import, link) = block_import( client, diff --git a/substrate/core/network/src/chain.rs b/substrate/core/network/src/chain.rs index 8547f9006e..166dfe55a7 100644 --- a/substrate/core/network/src/chain.rs +++ b/substrate/core/network/src/chain.rs @@ -19,7 +19,7 @@ use client::{self, Client as SubstrateClient, ClientInfo, BlockStatus, CallExecutor}; use client::error::Error; use client::light::fetcher::ChangesProof; -use consensus::BlockImport; +use consensus::{BlockImport, Error as ConsensusError}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use runtime_primitives::generic::{BlockId}; use consensus::{ImportBlock, ImportResult}; @@ -30,7 +30,7 @@ use primitives::{H256, Blake2Hasher, AuthorityId}; pub trait Client: Send + Sync { /// Import a new block. Parent is supposed to be existing in the blockchain. fn import(&self, block: ImportBlock, new_authorities: Option>) - -> Result; + -> Result; /// Get blockchain info. fn info(&self) -> Result, Error>; @@ -73,12 +73,12 @@ pub trait Client: Send + Sync { impl Client for SubstrateClient where B: client::backend::Backend + Send + Sync + 'static, E: CallExecutor + Send + Sync + 'static, - Self: BlockImport, + Self: BlockImport, Block: BlockT, RA: Send + Sync { fn import(&self, block: ImportBlock, new_authorities: Option>) - -> Result + -> Result { (self as &SubstrateClient).import_block(block, new_authorities) } diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index b6ba3f7883..5f10051b15 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -55,7 +55,6 @@ mod chain; mod blocks; mod on_demand; pub mod config; -pub mod import_queue; pub mod consensus_gossip; pub mod error; pub mod message; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 60133b25c1..42c5954532 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -24,14 +24,13 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, use runtime_primitives::generic::BlockId; use network_libp2p::{NodeIndex, Severity}; use codec::{Encode, Decode}; - +use consensus::import_queue::ImportQueue; use message::{self, Message}; use message::generic::Message as GenericMessage; use consensus_gossip::ConsensusGossip; use specialization::NetworkSpecialization; use sync::{ChainSync, Status as SyncStatus, SyncState}; use service::{TransactionPool, ExHashT}; -use import_queue::ImportQueue; use config::{ProtocolConfig, Roles}; use chain::Client; use client::light::fetcher::ChangesProof; @@ -196,7 +195,9 @@ impl, H: ExHashT> Protocol { on_demand: Option>>, transaction_pool: Arc>, specialization: S, - ) -> error::Result { + ) -> error::Result + where I: ImportQueue + { let info = chain.info()?; let sync = ChainSync::new(config.roles, &info, import_queue); let protocol = Protocol { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index d633e0397d..5487accd1b 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -20,17 +20,19 @@ use std::{io, thread}; use std::time::Duration; use futures::{self, Future, Stream, stream, sync::oneshot}; use parking_lot::{Mutex, RwLock}; -use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, ErrorKind}; +use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, NodeIndex, ErrorKind, Severity}; use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol}; use io::NetSyncIo; +use consensus::import_queue::{ImportQueue, Link}; use consensus_gossip::ConsensusGossip; use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus}; use config::Params; use error::Error; use specialization::NetworkSpecialization; -use import_queue::ImportQueue; -use runtime_primitives::traits::{Block as BlockT}; +use runtime_primitives::traits::{Block as BlockT, NumberFor}; +use sync::ChainSync; +use std::sync::Weak; use tokio::{runtime::Runtime, timer::Interval}; /// Type that represents fetch completion future. @@ -65,6 +67,53 @@ pub trait ExecuteInContext: Send + Sync { fn execute_in_context)>(&self, closure: F); } +/// A link implementation that connects to the network. +pub struct NetworkLink> { + /// The chain-sync handle + pub(crate) sync: Weak>>, + /// Network context. + pub(crate) context: Weak, +} + +impl> NetworkLink { + /// Execute closure with locked ChainSync. + fn with_sync, &mut Context)>(&self, closure: F) { + if let (Some(sync), Some(service)) = (self.sync.upgrade(), self.context.upgrade()) { + service.execute_in_context(move |protocol| { + let mut sync = sync.write(); + closure(&mut *sync, protocol) + }); + } + } +} + +impl> Link for NetworkLink { + fn block_imported(&self, hash: &B::Hash, number: NumberFor) { + self.with_sync(|sync, _| sync.block_imported(&hash, number)) + } + + fn maintain_sync(&self) { + self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) + } + + fn useless_peer(&self, who: NodeIndex, reason: &str) { + trace!(target:"sync", "Useless peer {}, {}", who, reason); + self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason))) + } + + fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { + trace!(target:"sync", "Bad peer {}, {}", who, reason); + self.with_sync(|sync, protocol| { + protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless? + sync.restart(protocol); + }) + } + + fn restart(&self) { + self.with_sync(|sync, protocol| sync.restart(protocol)) + } +} + /// Substrate network service. Handles network IO and manages connectivity. pub struct Service, H: ExHashT> { /// Network service @@ -85,7 +134,9 @@ impl, H: ExHashT> Service, protocol_id: ProtocolId, import_queue: Arc, - ) -> Result>, Error> { + ) -> Result>, Error> + where I: ImportQueue + { let handler = Arc::new(Protocol::new( params.config, params.chain, @@ -106,7 +157,7 @@ impl, H: ExHashT> Service ChainSync { who: NodeIndex, _request: message::BlockRequest, response: message::BlockResponse - ) -> Option<(BlockOrigin, Vec>)> { - let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&who) { + ) -> Option<(BlockOrigin, Vec>)> { + let new_blocks: Vec> = if let Some(ref mut peer) = self.peers.get_mut(&who) { match peer.state { PeerSyncState::DownloadingNew(start_block) => { self.blocks.clear_peer_download(who); peer.state = PeerSyncState::Available; self.blocks.insert(start_block, response.blocks, who); - self.blocks.drain(self.best_queued_number + As::sa(1)) + self.blocks + .drain(self.best_queued_number + As::sa(1)) + .into_iter() + .map(|block_data| { + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + justification: block_data.block.justification, + origin: block_data.origin, + } + }).collect() }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; - response.blocks.into_iter().map(|b| blocks::BlockData { - origin: Some(who), - block: b + response.blocks.into_iter().map(|b| { + IncomingBlock { + hash: b.hash, + header: b.header, + body: b.body, + justification: b.justification, + origin: Some(who), + } }).collect() }, PeerSyncState::AncestorSearch(n) => { @@ -253,11 +269,11 @@ impl ChainSync { }; let best_seen = self.best_seen_block(); - let is_best = new_blocks.first().and_then(|b| b.block.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n)); + let is_best = new_blocks.first().and_then(|b| b.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n)); let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; if let Some((hash, number)) = new_blocks.last() - .and_then(|b| b.block.header.as_ref().map(|h| (b.block.hash.clone(), *h.number()))) + .and_then(|b| b.header.as_ref().map(|h| (b.hash.clone(), *h.number()))) { self.block_queued(&hash, number); } diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs new file mode 100644 index 0000000000..6104be87f0 --- /dev/null +++ b/substrate/core/network/src/test/block_import.rs @@ -0,0 +1,171 @@ +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Testing block import logic. + +use consensus::import_queue::{import_single_block, process_import_result}; +use consensus::import_queue::{AsyncImportQueueData, BasicQueue, BlockImportError, BlockImportResult}; +use test_client::{self, TestClient}; +use test_client::runtime::{Block, Hash}; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::NumberFor; +use std::cell::Cell; +use super::*; + +struct TestLink { + imported: Cell, + maintains: Cell, + disconnects: Cell, + restarts: Cell, +} + +impl TestLink { + fn new() -> TestLink { + TestLink { + imported: Cell::new(0), + maintains: Cell::new(0), + disconnects: Cell::new(0), + restarts: Cell::new(0), + } + } + + fn total(&self) -> usize { + self.imported.get() + self.maintains.get() + self.disconnects.get() + self.restarts.get() + } +} + +impl Link for TestLink { + fn block_imported(&self, _hash: &Hash, _number: NumberFor) { + self.imported.set(self.imported.get() + 1); + } + fn maintain_sync(&self) { + self.maintains.set(self.maintains.get() + 1); + } + fn useless_peer(&self, _: NodeIndex, _: &str) { + self.disconnects.set(self.disconnects.get() + 1); + } + fn note_useless_and_restart_sync(&self, id: NodeIndex, r: &str) { + self.useless_peer(id, r); + self.restart(); + } + fn restart(&self) { + self.restarts.set(self.restarts.get() + 1); + } +} + +fn prepare_good_block() -> (client::Client, Hash, u64, IncomingBlock) { + let client = test_client::new(); + let block = client.new_block().unwrap().bake().unwrap(); + client.import(BlockOrigin::File, block).unwrap(); + + let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1); + let header = client.header(&BlockId::Number(1)).unwrap(); + let justification = client.justification(&BlockId::Number(1)).unwrap(); + (client, hash, number, IncomingBlock { + hash, + header, + body: None, + justification, + origin: Some(0) + }) +} + +#[test] +fn import_single_good_block_works() { + let (_, hash, number, block) = prepare_good_block(); + assert_eq!( + import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), + Ok(BlockImportResult::ImportedUnknown(hash, number)) + ); +} + +#[test] +fn import_single_good_known_block_is_ignored() { + let (client, hash, number, block) = prepare_good_block(); + assert_eq!( + import_single_block(&client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), + Ok(BlockImportResult::ImportedKnown(hash, number)) + ); +} + +#[test] +fn import_single_good_block_without_header_fails() { + let (_, _, _, mut block) = prepare_good_block(); + block.header = None; + assert_eq!( + import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), + Err(BlockImportError::IncompleteHeader(Some(0))) + ); +} + +#[test] +fn process_import_result_works() { + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); + assert_eq!(link.total(), 1); + + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); + assert_eq!(link.total(), 1); + assert_eq!(link.imported.get(), 1); + + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); + assert_eq!(link.total(), 1); + assert_eq!(link.imported.get(), 1); + + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.disconnects.get(), 1); + + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::UnknownParent)), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.restarts.get(), 1); + + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::Error)), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.restarts.get(), 1); +} + +#[test] +fn import_many_blocks_stops_when_stopping() { + let (_, _, _, block) = prepare_good_block(); + let qdata = AsyncImportQueueData::new(); + let verifier = Arc::new(PassThroughVerifier(true)); + qdata.stop(); + let client = test_client::new(); + assert!(!import_many_blocks( + &client, + &mut TestLink::new(), + Some(&qdata), + (BlockOrigin::File, vec![block.clone(), block]), + verifier + )); +} + +#[test] +fn async_import_queue_drops() { + // Perform this test multiple times since it exhibits non-deterministic behavior. + for _ in 0..100 { + let verifier = Arc::new(PassThroughVerifier(true)); + let queue = BasicQueue::new(verifier, Arc::new(test_client::new())); + queue.start(TestLink::new()).unwrap(); + drop(queue); + } +} diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index f8b6705164..b2beba05db 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -18,33 +18,176 @@ #[cfg(test)] mod sync; +#[cfg(test)] +mod block_import; use std::collections::{VecDeque, HashSet, HashMap}; use std::sync::Arc; use parking_lot::RwLock; use client; -use client::error::Error as ClientError; use client::block_builder::BlockBuilder; +use runtime_primitives::Justification; use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, Zero}; use io::SyncIo; +use primitives::AuthorityId; use protocol::{Context, Protocol, ProtocolContext}; use config::ProtocolConfig; -use service::TransactionPool; +use service::{NetworkLink, TransactionPool}; use network_libp2p::{NodeIndex, PeerId, Severity}; use keyring::Keyring; use codec::Encode; -use import_queue::{SyncImportQueue, PassThroughVerifier, Verifier}; -use consensus::BlockOrigin; +use consensus::{BlockImport, BlockOrigin, ImportBlock}; +use consensus::Error as ConsensusError; +use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock}; +use consensus::import_queue::{Link, SharedBlockImport, Verifier}; use specialization::NetworkSpecialization; use consensus_gossip::ConsensusGossip; -use import_queue::{BlockImport, ImportQueue}; use service::ExecuteInContext; use test_client; pub use test_client::runtime::{Block, Hash, Transfer, Extrinsic}; pub use test_client::TestClient; +#[cfg(any(test, feature = "test-helpers"))] +use std::cell::RefCell; + +#[cfg(any(test, feature = "test-helpers"))] +struct ImportCB(RefCell>) -> bool>>>); + +#[cfg(any(test, feature = "test-helpers"))] +impl ImportCB { + fn new() -> Self { + ImportCB(RefCell::new(None)) + } + fn set(&self, cb: Box) + where F: 'static + Fn(BlockOrigin, Vec>) -> bool + { + *self.0.borrow_mut() = Some(cb); + } + fn call(&self, origin: BlockOrigin, data: Vec>) -> bool { + let b = self.0.borrow(); + b.as_ref().expect("The Callback has been set before. qed.")(origin, data) + } +} + +#[cfg(any(test, feature = "test-helpers"))] +unsafe impl Send for ImportCB {} +#[cfg(any(test, feature = "test-helpers"))] +unsafe impl Sync for ImportCB {} + + +#[cfg(any(test, feature = "test-helpers"))] +/// A Verifier that accepts all blocks and passes them on with the configured +/// finality to be imported. +pub struct PassThroughVerifier(pub bool); + +#[cfg(any(test, feature = "test-helpers"))] +/// This Verifiyer accepts all data as valid +impl Verifier for PassThroughVerifier { + fn verify( + &self, + origin: BlockOrigin, + header: B::Header, + justification: Option, + body: Option> + ) -> Result<(ImportBlock, Option>), String> { + Ok((ImportBlock { + origin, + header, + body, + finalized: self.0, + justification, + post_digests: vec![], + auxiliary: Vec::new(), + }, None)) + } +} + +/// A link implementation that does nothing. +pub struct NoopLink; + +impl Link for NoopLink { } + +/// Blocks import queue that is importing blocks in the same thread. +/// The boolean value indicates whether blocks should be imported without instant finality. +#[cfg(any(test, feature = "test-helpers"))] +pub struct SyncImportQueue> { + verifier: Arc, + link: ImportCB, + block_import: SharedBlockImport, +} + +#[cfg(any(test, feature = "test-helpers"))] +impl> SyncImportQueue { + /// Create a new SyncImportQueue wrapping the given Verifier and block import + /// handle. + pub fn new(verifier: Arc, block_import: SharedBlockImport) -> Self { + let queue = SyncImportQueue { + verifier, + link: ImportCB::new(), + block_import, + }; + + let v = queue.verifier.clone(); + let import_handle = queue.block_import.clone(); + queue.link.set(Box::new(move |origin, new_blocks| { + let verifier = v.clone(); + import_many_blocks( + &*import_handle, + &NoopLink, + None, + (origin, new_blocks), + verifier, + ) + })); + + queue + } +} + +#[cfg(any(test, feature = "test-helpers"))] +impl> ImportQueue for SyncImportQueue +{ + fn start>( + &self, + link: L, + ) -> Result<(), std::io::Error> { + let v = self.verifier.clone(); + let import_handle = self.block_import.clone(); + self.link.set(Box::new(move |origin, new_blocks| { + let verifier = v.clone(); + import_many_blocks( + &*import_handle, + &link, + None, + (origin, new_blocks), + verifier, + ) + })); + Ok(()) + } + fn clear(&self) { } + + fn stop(&self) { } + + fn status(&self) -> ImportQueueStatus { + ImportQueueStatus { + importing_count: 0, + best_importing_number: Zero::zero(), + } + } + + fn is_importing(&self, _hash: &B::Hash) -> bool { + false + } + + fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { + self.link.call(origin, blocks); + } +} + struct DummyContextExecutor(Arc>, Arc>>); unsafe impl Send for DummyContextExecutor {} unsafe impl Sync for DummyContextExecutor {} @@ -157,7 +300,7 @@ impl, D> Peer { // Update the sync state to the latest chain state. let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - let network_link = ::import_queue::NetworkLink { + let network_link = NetworkLink { sync: Arc::downgrade(self.sync.sync()), context: Arc::downgrade(&self.executor), }; @@ -237,8 +380,6 @@ impl, D> Peer { pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, mut edit_block: F) where F: FnMut(BlockBuilder) -> Block { - use blocks::BlockData; - for _ in 0..count { let builder = self.client.new_block().unwrap(); let block = edit_block(builder); @@ -248,17 +389,15 @@ impl, D> Peer { // NOTE: if we use a non-synchronous queue in the test-net in the future, // this may not work. - self.import_queue.import_blocks(origin, vec![BlockData { + self.import_queue.import_blocks(origin, vec![ + IncomingBlock { origin: None, - block: ::message::BlockData:: { hash, header: Some(header), body: Some(block.extrinsics), - receipt: None, - message_queue: None, justification: None, }, - }]); + ]); } } @@ -330,7 +469,7 @@ pub trait TestNetFactory: Sized { /// Get custom block import handle for fresh client, along with peer data. fn make_block_import(&self, client: Arc) - -> (Arc + Send + Sync>, Self::PeerData) + -> (Arc + Send + Sync>, Self::PeerData) { (client, Default::default()) } diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index 6d6dc558f4..15a4deb42a 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -21,7 +21,7 @@ use futures::Future; use runtime_primitives::generic::{SignedBlock, BlockId}; use runtime_primitives::traits::{As, Block, Header}; -use network::import_queue::{ImportQueue, Link, BlockData}; +use consensus_common::import_queue::{ImportQueue, IncomingBlock, Link}; use network::message; use consensus_common::BlockOrigin; @@ -123,7 +123,15 @@ pub fn import_blocks(mut config: FactoryFullConfiguration, exit: E, message_queue: None }; // import queue handles verification and importing it into the client - queue.import_blocks(BlockOrigin::File, vec![BlockData:: { block, origin: None }]); + queue.import_blocks(BlockOrigin::File, vec![ + IncomingBlock::{ + hash: block.hash, + header: block.header, + body: block.body, + justification: block.justification, + origin: None, + } + ]); } else { warn!("Error reading block data at {}.", b); break; diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index df50d54529..29b6768374 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -23,7 +23,8 @@ use chain_spec::ChainSpec; use client_db; use client::{self, Client, runtime_api::{Metadata, TaggedTransactionQueue}}; use {error, Service, maybe_start_server}; -use network::{self, OnDemand, import_queue::ImportQueue}; +use consensus_common::import_queue::ImportQueue; +use network::{self, OnDemand}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use runtime_primitives::{BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::{BlockId, SignedBlock}}; @@ -279,9 +280,9 @@ pub trait ServiceFactory: 'static + Sized { /// Extended light service type. type LightService: ServiceTrait>; /// ImportQueue for full client - type FullImportQueue: network::import_queue::ImportQueue + 'static; + type FullImportQueue: consensus_common::import_queue::ImportQueue + 'static; /// ImportQueue for light clients - type LightImportQueue: network::import_queue::ImportQueue + 'static; + type LightImportQueue: consensus_common::import_queue::ImportQueue + 'static; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. diff --git a/substrate/core/test-client/src/client_ext.rs b/substrate/core/test-client/src/client_ext.rs index d4ca6361ee..a833472e44 100644 --- a/substrate/core/test-client/src/client_ext.rs +++ b/substrate/core/test-client/src/client_ext.rs @@ -17,7 +17,7 @@ //! Client extension for tests. use client::{self, Client}; -use consensus::{ImportBlock, BlockImport, BlockOrigin}; +use consensus::{ImportBlock, BlockImport, BlockOrigin, Error as ConsensusError}; use runtime_primitives::Justification; use runtime_primitives::generic::BlockId; use primitives::Blake2Hasher; @@ -27,11 +27,11 @@ use runtime; pub trait TestClient: Sized { /// Import block to the chain. No finality. fn import(&self, origin: BlockOrigin, block: runtime::Block) - -> client::error::Result<()>; + -> Result<(), ConsensusError>; /// Import block with justification, finalizes block. fn import_justified(&self, origin: BlockOrigin, block: runtime::Block, justification: Justification) - -> client::error::Result<()>; + -> Result<(), ConsensusError>; /// Finalize a block. fn finalize_block(&self, id: BlockId, justification: Option) -> client::error::Result<()>; @@ -44,10 +44,10 @@ impl TestClient for Client where B: client::backend::Backend, E: client::CallExecutor, - Self: BlockImport, + Self: BlockImport, { fn import(&self, origin: BlockOrigin, block: runtime::Block) - -> client::error::Result<()> + -> Result<(), ConsensusError> { let import = ImportBlock { origin, @@ -63,7 +63,7 @@ impl TestClient for Client } fn import_justified(&self, origin: BlockOrigin, block: runtime::Block, justification: Justification) - -> client::error::Result<()> + -> Result<(), ConsensusError> { let import = ImportBlock { origin,