diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index e3a87e8ac9..3f8358d3a2 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -699,11 +699,12 @@ mod tests { const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); pub struct AuraTestNet { - peers: Vec>>, + peers: Vec>>, started: bool, } impl TestNetFactory for AuraTestNet { + type Specialization = DummySpecialization; type Verifier = AuraVerifier; type PeerData = (); @@ -734,15 +735,15 @@ mod tests { }) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F) { + fn mut_peers>>)>(&mut self, closure: F) { closure(&mut self.peers); } diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 89f3a563b5..38f7980435 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -27,12 +27,11 @@ use crate::block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin}; use crossbeam_channel::{self as channel, Receiver, Sender}; -use std::collections::HashSet; use std::sync::Arc; use std::thread; use runtime_primitives::traits::{ - AuthorityIdFor, Block as BlockT, Header as HeaderT, NumberFor, Zero, + AuthorityIdFor, Block as BlockT, Header as HeaderT, NumberFor }; use runtime_primitives::Justification; @@ -85,14 +84,8 @@ pub trait ImportQueue: Send + Sync + ImportQueueClone { fn start(&self, _link: Box>) -> Result<(), std::io::Error> { Ok(()) } - /// Clear the queue when sync is restarting. - fn clear(&self); /// Clears the import queue and stops importing. fn stop(&self); - /// Get queue status. - fn status(&self) -> ImportQueueStatus; - /// Is block with given hash currently in the queue. - fn is_importing(&self, hash: &B::Hash) -> bool; /// Import bunch of blocks. fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>); /// Import a block justification. @@ -109,15 +102,6 @@ impl Clone for Box> { } } -/// Import queue status. It isn't completely accurate. -#[derive(Debug)] -pub struct ImportQueueStatus { - /// Number of blocks that are currently in the queue. - pub importing_count: usize, - /// The number of the best block that was ever in the queue since start/last failure. - pub best_importing_number: NumberFor, -} - /// Interface to a basic block import queue that is importing blocks sequentially in a separate thread, /// with pluggable verification. #[derive(Clone)] @@ -165,18 +149,12 @@ impl BasicQueue { impl ImportQueue for BasicQueue { fn start(&self, link: Box>) -> Result<(), std::io::Error> { + let (sender, port) = channel::unbounded(); let _ = self .sender - .send(BlockImportMsg::Start(link)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); - Ok(()) - } - - fn clear(&self) { - let _ = self - .sender - .send(BlockImportMsg::Clear) + .send(BlockImportMsg::Start(link, sender)) .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed") } fn stop(&self) { @@ -186,24 +164,6 @@ impl ImportQueue for BasicQueue { .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); } - fn status(&self) -> ImportQueueStatus { - let (sender, port) = channel::unbounded(); - let _ = self - .sender - .send(BlockImportMsg::Status(sender)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); - port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed") - } - - fn is_importing(&self, hash: &B::Hash) -> bool { - let (sender, port) = channel::unbounded(); - let _ = self - .sender - .send(BlockImportMsg::IsImporting(hash.clone(), sender)) - .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); - port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed") - } - fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { return; @@ -224,11 +184,8 @@ impl ImportQueue for BasicQueue { pub enum BlockImportMsg { ImportBlocks(BlockOrigin, Vec>), - Clear, - Status(Sender>), - IsImporting(B::Hash, Sender), ImportJustification(Origin, B::Hash, NumberFor, Justification), - Start(Box>), + Start(Box>, Sender>), Stop, } @@ -251,8 +208,6 @@ struct BlockImporter { port: Receiver>, result_port: Receiver>, worker_sender: Sender>, - queue_blocks: HashSet, - best_importing_number: NumberFor, link: Option>>, justification_import: Option>, } @@ -271,8 +226,6 @@ impl BlockImporter { port, result_port, worker_sender, - queue_blocks: HashSet::new(), - best_importing_number: Zero::zero(), link: None, justification_import, }; @@ -310,25 +263,18 @@ impl BlockImporter { match msg { BlockImportMsg::ImportBlocks(origin, incoming_blocks) => { self.handle_import_blocks(origin, incoming_blocks) - } - BlockImportMsg::Clear => self.handle_clear(), - BlockImportMsg::Status(reply_sender) => self.handle_status(reply_sender), - BlockImportMsg::IsImporting(hash, reply_sender) => { - self.handle_is_importing(hash, reply_sender) - } + }, BlockImportMsg::ImportJustification(who, hash, number, justification) => { self.handle_import_justification(who, hash, number, justification) - } - BlockImportMsg::Start(link) => { + }, + BlockImportMsg::Start(link, sender) => { if let Some(justification_import) = self.justification_import.as_ref() { justification_import.on_start(&*link); } self.link = Some(link); - } - BlockImportMsg::Stop => { - self.handle_clear(); - return false; - } + let _ = sender.send(Ok(())); + }, + BlockImportMsg::Stop => return false, } true } @@ -339,15 +285,15 @@ impl BlockImporter { _ => unreachable!("Import Worker does not send ImportBlocks message; qed"), }; let mut has_error = false; + let mut hashes = vec![]; for (result, hash) in results { - self.queue_blocks.remove(&hash); + hashes.push(hash); if has_error { continue; } if result.is_err() { - self.best_importing_number = Zero::zero(); has_error = true; } @@ -389,28 +335,11 @@ impl BlockImporter { }; } if let Some(link) = self.link.as_ref() { - link.maintain_sync(); + link.blocks_processed(hashes, has_error); } true } - fn handle_clear(&mut self) { - self.queue_blocks.clear(); - self.best_importing_number = Zero::zero(); - } - - fn handle_status(&self, reply_sender: Sender>) { - let status = ImportQueueStatus { - importing_count: self.queue_blocks.len(), - best_importing_number: self.best_importing_number, - }; - let _ = reply_sender.send(status); - } - - fn handle_is_importing(&self, hash: B::Hash, reply_sender: Sender) { - let _ = reply_sender.send(self.queue_blocks.contains(&hash)); - } - fn handle_import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { let success = self.justification_import.as_ref().map(|justification_import| { justification_import.import_justification(hash, number, justification) @@ -426,16 +355,6 @@ impl BlockImporter { fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { trace!(target:"sync", "Scheduling {} blocks for import", blocks.len()); - - let new_best_importing_number = blocks - .last() - .and_then(|b| b.header.as_ref().map(|h| h.number().clone())) - .unwrap_or_else(|| Zero::zero()); - self.queue_blocks - .extend(blocks.iter().map(|b| b.hash.clone())); - if new_best_importing_number > self.best_importing_number { - self.best_importing_number = new_best_importing_number; - } self.worker_sender .send(BlockImportWorkerMsg::ImportBlocks(origin, blocks)) .expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed"); @@ -530,12 +449,12 @@ impl> BlockImportWorker { pub trait Link: Send { /// Block imported. fn block_imported(&self, _hash: &B::Hash, _number: NumberFor) { } + /// Batch of blocks imported, with or without error. + fn blocks_processed(&self, _processed_blocks: Vec, _has_error: bool) {} /// Justification import result. fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor, _success: bool) { } /// Request a justification for the given block. fn request_justification(&self, _hash: &B::Hash, _number: NumberFor) { } - /// Maintain sync. - fn maintain_sync(&self) {} /// Disconnect from peer. fn useless_peer(&self, _who: Origin, _reason: &str) {} /// Disconnect from peer and restart sync. @@ -674,8 +593,6 @@ mod tests { fn block_imported(&self, _hash: &Hash, _number: NumberFor) { let _ = self.sender.send(LinkMsg::BlockImported); } - fn maintain_sync(&self) { - } fn useless_peer(&self, _: Origin, _: &str) { let _ = self.sender.send(LinkMsg::Disconnected); } @@ -695,12 +612,11 @@ mod tests { let (link_sender, link_port) = channel::unbounded(); let importer_sender = BlockImporter::::new(result_port, worker_sender, None); let link = TestLink::new(link_sender); - let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone()))); + let (ack_sender, start_ack_port) = channel::bounded(4); + let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone()), ack_sender)); // Ensure the importer handles Start before any result messages. - let (ack_sender, ack_port) = channel::unbounded(); - let _ = importer_sender.send(BlockImportMsg::Status(ack_sender)); - let _ = ack_port.recv(); + let _ = start_ack_port.recv(); // Send a known let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())]; diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 029e517943..733421c4df 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -17,7 +17,7 @@ //! Tests and test helpers for GRANDPA. use super::*; -use network::test::{Block, Hash, TestNetFactory, Peer, PeersClient}; +use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, PeersClient}; use network::test::{PassThroughVerifier}; use network::config::{ProtocolConfig, Roles}; use parking_lot::Mutex; @@ -52,12 +52,12 @@ type PeerData = > > >; -type GrandpaPeer = Peer; +type GrandpaPeer = Peer; struct GrandpaTestNet { peers: Vec>, test_config: TestApi, - started: bool + started: bool, } impl GrandpaTestNet { @@ -68,16 +68,15 @@ impl GrandpaTestNet { test_config, }; let config = Self::default_config(); - for _ in 0..n_peers { net.add_peer(&config); } - net } } impl TestNetFactory for GrandpaTestNet { + type Specialization = DummySpecialization; type Verifier = PassThroughVerifier; type PeerData = PeerData; @@ -86,7 +85,7 @@ impl TestNetFactory for GrandpaTestNet { GrandpaTestNet { peers: Vec::new(), test_config: Default::default(), - started: false + started: false, } } @@ -122,7 +121,7 @@ impl TestNetFactory for GrandpaTestNet { &self.peers } - fn mut_peers>)>(&mut self, closure: F) { + fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 65c02a39aa..47bfde1bfb 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -206,8 +206,8 @@ impl, &mut Context)> GossipTask< /// Messages sent to Protocol from elsewhere inside the system. pub enum ProtocolMsg> { - /// Tell protocol to maintain sync. - MaintainSync, + /// A batch of blocks has been processed, with or without errors. + BlocksProcessed(Vec, bool), /// Tell protocol to restart sync. RestartSync, /// Ask the protocol for its status. @@ -367,11 +367,12 @@ impl, H: ExHashT> Protocol { ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => { self.gossip_consensus_message(topic, engine_id, message) } - ProtocolMsg::MaintainSync => { + ProtocolMsg::BlocksProcessed(hashes, has_error) => { + self.sync.blocks_processed(hashes, has_error); let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); self.sync.maintain_sync(&mut context); - } + }, ProtocolMsg::RestartSync => { let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); @@ -624,16 +625,7 @@ impl, H: ExHashT> Protocol { response, ); } else { - // import_queue.import_blocks also acquires sync.write(); - // Break the cycle by doing these separately from the outside; - let new_blocks = { - self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response) - }; - - if let Some((origin, new_blocks)) = new_blocks { - let import_queue = self.sync.import_queue(); - import_queue.import_blocks(origin, new_blocks); - } + self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response); } } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 0741c9f1d9..3ccb9c1277 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -71,6 +71,7 @@ pub trait TransactionPool: Send + Sync { } /// A link implementation that connects to the network. +#[derive(Clone)] pub struct NetworkLink> { /// The protocol sender pub(crate) protocol_sender: Sender>, @@ -83,6 +84,10 @@ impl> Link for NetworkLink { let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); } + fn blocks_processed(&self, processed_blocks: Vec, has_error: bool) { + let _ = self.protocol_sender.send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); + } + fn justification_imported(&self, who: NodeIndex, hash: &B::Hash, number: NumberFor, success: bool) { let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); if !success { @@ -95,10 +100,6 @@ impl> Link for NetworkLink { let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } - fn maintain_sync(&self) { - let _ = self.protocol_sender.send(ProtocolMsg::MaintainSync); - } - fn useless_peer(&self, who: NodeIndex, reason: &str) { trace!(target:"sync", "Useless peer {}, {}", who, reason); self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 1cf639ee65..6ba6617d7a 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use std::cmp::max; use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; use log::{debug, trace, warn}; @@ -26,10 +27,11 @@ use consensus::import_queue::{ImportQueue, IncomingBlock}; use client::error::Error as ClientError; use crate::blocks::BlockCollection; use runtime_primitives::Justification; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero}; use runtime_primitives::generic::BlockId; use crate::message::{self, generic::Message as GenericMessage}; use crate::config::Roles; +use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -303,6 +305,8 @@ pub struct ChainSync { required_block_attributes: message::BlockAttributes, justifications: PendingJustifications, import_queue: Box>, + queue_blocks: HashSet, + best_importing_number: NumberFor, is_stopping: AtomicBool, is_offline: Arc, is_major_syncing: Arc, @@ -367,6 +371,8 @@ impl ChainSync { justifications: PendingJustifications::new(), required_block_attributes, import_queue, + queue_blocks: Default::default(), + best_importing_number: Zero::zero(), is_stopping: Default::default(), is_offline, is_major_syncing, @@ -377,11 +383,6 @@ impl ChainSync { self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number) } - /// Returns import queue reference. - pub(crate) fn import_queue(&self) -> Box> { - self.import_queue.clone() - } - fn state(&self, best_seen: &Option>) -> SyncState { match best_seen { &Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading, @@ -410,7 +411,7 @@ impl ChainSync { let previous_state = self.state(&previous_best_seen); if let Some(info) = protocol.peer_info(who) { - match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) { + match (block_status(&*protocol.client(), &self.queue_blocks, info.best_hash), info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); let reason = format!("Error legimimately reading blockchain status: {:?}", e); @@ -424,7 +425,7 @@ impl ChainSync { let reason = format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); protocol.report_peer(who, Severity::Bad(reason)); }, - (Ok(BlockStatus::Unknown), _) if self.import_queue.status().importing_count > MAJOR_SYNC_BLOCKS => { + (Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => { // when actively syncing the common point moves too fast. debug!(target:"sync", "New peer with unknown best hash {} ({}), assuming common block.", self.best_queued_hash, self.best_queued_number); self.peers.insert(who, PeerSync { @@ -498,7 +499,7 @@ impl ChainSync { who: NodeIndex, request: message::BlockRequest, response: message::BlockResponse - ) -> Option<(BlockOrigin, Vec>)> { + ) { let new_blocks: Vec> = if let Some(ref mut peer) = self.peers.get_mut(&who) { let mut blocks = response.blocks; if request.direction == message::Direction::Descending { @@ -553,24 +554,24 @@ impl ChainSync { let n = n - As::sa(1); peer.state = PeerSyncState::AncestorSearch(n); Self::request_ancestry(protocol, who, n); - return None; + return; }, Ok(_) => { // genesis mismatch trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer".to_string())); - return None; + return; }, Err(e) => { let reason = format!("Error answering legitimate blockchain query: {:?}", e); protocol.report_peer(who, Severity::Useless(reason)); - return None; + return; } } }, None => { trace!(target:"sync", "Invalid response when searching for ancestor from {}", who); protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor".to_string())); - return None; + return; } } }, @@ -593,7 +594,14 @@ impl ChainSync { self.block_queued(&hash, number); } self.maintain_sync(protocol); - Some((origin, new_blocks)) + let new_best_importing_number = new_blocks + .last() + .and_then(|b| b.header.as_ref().map(|h| h.number().clone())) + .unwrap_or_else(|| Zero::zero()); + self.queue_blocks + .extend(new_blocks.iter().map(|b| b.hash.clone())); + self.best_importing_number = max(new_best_importing_number, self.best_importing_number); + self.import_queue.import_blocks(origin, new_blocks); } /// Handle new justification data. @@ -644,6 +652,16 @@ impl ChainSync { self.maintain_sync(protocol); } + /// A batch of blocks have been processed, with or without errors. + pub fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + for hash in processed_blocks { + self.queue_blocks.remove(&hash); + } + if has_error { + self.best_importing_number = Zero::zero(); + } + } + /// Maintain the sync process (download new blocks, fetch justifications). pub fn maintain_sync(&mut self, protocol: &mut Context) { if self.is_stopping.load(Ordering::SeqCst) { @@ -787,7 +805,7 @@ impl ChainSync { } fn is_known(&self, protocol: &mut Context, hash: &B::Hash) -> bool { - block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + block_status(&*protocol.client(), &self.queue_blocks, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) } /// Handle disconnected peer. @@ -813,7 +831,8 @@ impl ChainSync { /// Restart the sync process. pub(crate) fn restart(&mut self, protocol: &mut Context) { - self.import_queue.clear(); + self.queue_blocks.clear(); + self.best_importing_number = Zero::zero(); self.blocks.clear(); match protocol.client().info() { Ok(info) => { @@ -884,9 +903,8 @@ impl ChainSync { // Issue a request for a peer to download new blocks, if any are available fn download_new(&mut self, protocol: &mut Context, who: NodeIndex) { if let Some(ref mut peer) = self.peers.get_mut(&who) { - let import_status = self.import_queue.status(); // when there are too many blocks in the queue => do not try to download new blocks - if import_status.importing_count > MAX_IMPORTING_BLOCKS { + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { trace!(target: "sync", "Too many blocks in the queue."); return; } @@ -931,10 +949,10 @@ impl ChainSync { /// Get block status, taking into account import queue. fn block_status( chain: &crate::chain::Client, - queue: &ImportQueue, + queue_blocks: &HashSet, hash: B::Hash) -> Result { - if queue.is_importing(&hash) { + if queue_blocks.contains(&hash) { return Ok(BlockStatus::Queued); } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 1ff52c9916..47750080a6 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -93,7 +93,8 @@ pub struct NoopLink { } impl Link for NoopLink { } /// The test specialization. -pub struct DummySpecialization { } +#[derive(Clone)] +pub struct DummySpecialization; impl NetworkSpecialization for DummySpecialization { fn status(&self) -> Vec { @@ -117,23 +118,92 @@ impl NetworkSpecialization for DummySpecialization { pub type PeersClient = client::Client; -pub struct Peer { +#[derive(Clone)] +/// A Link that can wait for a block to have been imported. +pub struct TestLink + Clone> { + import_done: Arc, + hash: Arc>, + link: NetworkLink, +} + +impl + Clone> TestLink { + fn new( + protocol_sender: Sender>, + network_sender: NetworkChan + ) -> TestLink { + TestLink { + import_done: Arc::new(AtomicBool::new(false)), + hash: Arc::new(Mutex::new(Default::default())), + link: NetworkLink { + protocol_sender, + network_sender, + } + } + } + + /// Set the hash which will be awaited for import. + fn with_hash(&self, hash: Hash) { + self.import_done.store(false, Ordering::SeqCst); + *self.hash.lock() = hash; + } + + /// Simulate a synchronous import. + fn wait_for_import(&self) { + while !self.import_done.load(Ordering::SeqCst) { + thread::sleep(Duration::from_millis(20)); + } + } +} + +impl + Clone> Link for TestLink { + fn block_imported(&self, hash: &Hash, number: NumberFor) { + if hash == &*self.hash.lock() { + self.import_done.store(true, Ordering::SeqCst); + } + self.link.block_imported(hash, number); + } + + fn blocks_processed(&self, processed_blocks: Vec, has_error: bool) { + self.link.blocks_processed(processed_blocks, has_error); + } + + fn justification_imported(&self, who: NodeIndex, hash: &Hash, number:NumberFor, success: bool) { + self.link.justification_imported(who, hash, number, success); + } + + fn request_justification(&self, hash: &Hash, number: NumberFor) { + self.link.request_justification(hash, number); + } + + fn useless_peer(&self, who: NodeIndex, reason: &str) { + self.link.useless_peer(who, reason); + } + + fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { + self.link.note_useless_and_restart_sync(who, reason); + } + + fn restart(&self) { + self.link.restart(); + } +} + +pub struct Peer + Clone> { pub is_offline: Arc, pub is_major_syncing: Arc, pub peers: Arc>>>, client: Arc, network_to_protocol_sender: Sender>, - pub protocol_sender: Sender>, - - network_port: Mutex>, + pub protocol_sender: Sender>, + network_link: TestLink, + network_port: Arc>>, pub import_queue: Box>, - network_sender: NetworkChan, pub data: D, best_hash: Mutex>, finalized_hash: Mutex>, } -impl Peer { +impl + Clone> Peer { fn new( is_offline: Arc, is_major_syncing: Arc, @@ -141,12 +211,14 @@ impl Peer { client: Arc, import_queue: Box>, network_to_protocol_sender: Sender>, - protocol_sender: Sender>, + protocol_sender: Sender>, network_sender: NetworkChan, network_port: NetworkPort, data: D, ) -> Self { - let network_port = Mutex::new(network_port); + let network_port = Arc::new(Mutex::new(network_port)); + let network_link = TestLink::new(protocol_sender.clone(), network_sender.clone()); + import_queue.start(Box::new(network_link.clone())).expect("Test ImportQueue always starts"); Peer { is_offline, is_major_syncing, @@ -155,7 +227,7 @@ impl Peer { network_to_protocol_sender, protocol_sender, import_queue, - network_sender, + network_link, network_port, data, best_hash: Mutex::new(None), @@ -171,12 +243,6 @@ impl Peer { .header(&BlockId::Hash(info.chain.best_hash)) .unwrap() .unwrap(); - let network_link = NetworkLink { - protocol_sender: self.protocol_sender.clone(), - network_sender: self.network_sender.clone(), - }; - - self.import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts"); let _ = self .protocol_sender .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); @@ -237,8 +303,7 @@ impl Peer { /// Whether this peer is done syncing (has no messages to send). fn is_done(&self) -> bool { - self.import_queue.status().importing_count == 0 && - self.network_port.lock().receiver().is_empty() + self.network_port.lock().receiver().is_empty() } /// Execute a "sync step". This is called for each peer after it sends a packet. @@ -362,7 +427,7 @@ impl Peer { ); let header = block.header.clone(); at = hash; - + self.network_link.with_hash(hash); self.import_queue.import_blocks( origin, vec![IncomingBlock { @@ -373,10 +438,8 @@ impl Peer { justification: None, }], ); - // Simulate a synchronous import. - while self.import_queue.status().importing_count > 0 { - thread::sleep(Duration::from_millis(20)); - } + // Simulate a sync import. + self.network_link.wait_for_import(); } at } @@ -436,7 +499,18 @@ impl TransactionPool for EmptyTransactionPool { fn on_broadcasted(&self, _: HashMap>) {} } +pub trait SpecializationFactory { + fn create() -> Self; +} + +impl SpecializationFactory for DummySpecialization { + fn create() -> DummySpecialization { + DummySpecialization + } +} + pub trait TestNetFactory: Sized { + type Specialization: NetworkSpecialization + Clone + SpecializationFactory; type Verifier: 'static + Verifier; type PeerData: Default; @@ -445,9 +519,9 @@ pub trait TestNetFactory: Sized { fn make_verifier(&self, client: Arc, config: &ProtocolConfig) -> Arc; /// Get reference to peer. - fn peer(&self, i: usize) -> &Peer; - fn peers(&self) -> &Vec>>; - fn mut_peers>>)>(&mut self, closure: F); + fn peer(&self, i: usize) -> &Peer; + fn peers(&self) -> &Vec>>; + fn mut_peers>>)>(&mut self, closure: F); fn started(&self) -> bool; fn set_started(&mut self, now: bool); @@ -483,9 +557,9 @@ pub trait TestNetFactory: Sized { let (network_sender, network_port) = network_channel(ProtocolId::default()); let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import)); - let specialization = DummySpecialization {}; let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); + let specialization = self::SpecializationFactory::create(); let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( is_offline.clone(), @@ -514,7 +588,7 @@ pub trait TestNetFactory: Sized { )); self.mut_peers(|peers| { - peers.push(peer.clone()) + peers.push(peer) }); } @@ -666,11 +740,12 @@ pub trait TestNetFactory: Sized { } pub struct TestNet { - peers: Vec>>, + peers: Vec>>, started: bool, } impl TestNetFactory for TestNet { + type Specialization = DummySpecialization; type Verifier = PassThroughVerifier; type PeerData = (); @@ -688,15 +763,15 @@ impl TestNetFactory for TestNet { Arc::new(PassThroughVerifier(false)) } - fn peer(&self, i: usize) -> &Peer<()> { + fn peer(&self, i: usize) -> &Peer<(), Self::Specialization> { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F) { + fn mut_peers>>)>(&mut self, closure: F) { closure(&mut self.peers); } @@ -728,6 +803,7 @@ impl JustificationImport for ForceFinalized { pub struct JustificationTestNet(TestNet); impl TestNetFactory for JustificationTestNet { + type Specialization = DummySpecialization; type Verifier = PassThroughVerifier; type PeerData = (); @@ -741,15 +817,15 @@ impl TestNetFactory for JustificationTestNet { self.0.make_verifier(client, config) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer { self.0.peer(i) } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { self.0.peers() } - fn mut_peers>>)>(&mut self, closure: F ) { + fn mut_peers>>)>(&mut self, closure: F ) { self.0.mut_peers(closure) }