diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 9849594686..c51730b114 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -950,7 +950,7 @@ mod tests { let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) .for_each(move |_| { net.lock().send_import_notifications(); - net.lock().route_fast(); + net.lock().sync_without_disconnects(); Ok(()) }) .map(|_| ()) diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index eb0061a133..4989a1eb94 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -20,3 +20,7 @@ parity-codec = { version = "3.3", features = ["derive"] } [dev-dependencies] test_client = { package = "substrate-test-client", path = "../../test-client" } + +[features] +default = [] +test-helpers = [] \ No newline at end of file diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 7a418ae9f4..d1c5c69d02 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -149,6 +149,18 @@ impl BasicQueue { sender: importer_sender, } } + + /// Send synchronization request to the block import channel. + /// + /// The caller should wait for Link::synchronized() call to ensure that it has synchronized + /// with ImportQueue. + #[cfg(any(test, feature = "test-helpers"))] + pub fn synchronize(&self) { + self + .sender + .send(BlockImportMsg::Synchronize) + .expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed"); + } } impl ImportQueue for BasicQueue { @@ -191,6 +203,8 @@ pub enum BlockImportMsg { ImportJustification(Origin, B::Hash, NumberFor, Justification), Start(Box>, Sender>), Stop, + #[cfg(any(test, feature = "test-helpers"))] + Synchronize, } pub enum BlockImportWorkerMsg { @@ -201,6 +215,8 @@ pub enum BlockImportWorkerMsg { B::Hash, )>, ), + #[cfg(any(test, feature = "test-helpers"))] + Synchronize, } enum ImportMsgType { @@ -279,13 +295,32 @@ impl BlockImporter { let _ = sender.send(Ok(())); }, BlockImportMsg::Stop => return false, + #[cfg(any(test, feature = "test-helpers"))] + BlockImportMsg::Synchronize => { + self.worker_sender + .send(BlockImportWorkerMsg::Synchronize) + .expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed"); + }, } true } fn handle_worker_msg(&mut self, msg: BlockImportWorkerMsg) -> bool { + let link = match self.link.as_ref() { + Some(link) => link, + None => { + trace!(target: "sync", "Received import result while import-queue has no link"); + return true; + }, + }; + let results = match msg { BlockImportWorkerMsg::Imported(results) => (results), + #[cfg(any(test, feature = "test-helpers"))] + BlockImportWorkerMsg::Synchronize => { + link.synchronized(); + return true; + }, _ => unreachable!("Import Worker does not send ImportBlocks message; qed"), }; let mut has_error = false; @@ -301,14 +336,6 @@ impl BlockImporter { has_error = true; } - let link = match self.link.as_ref() { - Some(link) => link, - None => { - trace!(target: "sync", "Received import result for {} while import-queue has no link", hash); - return true; - }, - }; - match result { Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number), Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { @@ -403,8 +430,12 @@ impl> BlockImportWorker { // Working until all senders have been dropped... match msg { BlockImportWorkerMsg::ImportBlocks(origin, blocks) => { - worker.import_a_batch_of_blocks(origin, blocks) - } + worker.import_a_batch_of_blocks(origin, blocks); + }, + #[cfg(any(test, feature = "test-helpers"))] + BlockImportWorkerMsg::Synchronize => { + let _ = worker.result_sender.send(BlockImportWorkerMsg::Synchronize); + }, _ => unreachable!("Import Worker does not receive the Imported message; qed"), } } @@ -480,6 +511,9 @@ pub trait Link: Send { fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) {} /// Restart sync. fn restart(&self) {} + /// Synchronization request has been processed. + #[cfg(any(test, feature = "test-helpers"))] + fn synchronized(&self) {} } /// Block import successful result. diff --git a/substrate/core/finality-grandpa/Cargo.toml b/substrate/core/finality-grandpa/Cargo.toml index 4dcf199e1e..5ae9d0197f 100644 --- a/substrate/core/finality-grandpa/Cargo.toml +++ b/substrate/core/finality-grandpa/Cargo.toml @@ -25,6 +25,7 @@ fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "pri grandpa = { package = "finality-grandpa", version = "0.7.1", features = ["derive-codec"] } [dev-dependencies] +consensus_common = { package = "substrate-consensus-common", path = "../consensus/common", features = ["test-helpers"] } network = { package = "substrate-network", path = "../network", features = ["test-helpers"] } keyring = { package = "substrate-keyring", path = "../keyring" } test_client = { package = "substrate-test-client", path = "../test-client"} diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index b4a5d40bf9..eea03a34a2 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -419,7 +419,7 @@ fn run_to_completion_with( .for_each(move |_| { net.lock().send_import_notifications(); net.lock().send_finality_notifications(); - net.lock().route_fast(); + net.lock().sync_without_disconnects(); Ok(()) }) .map(|_| ()) @@ -515,7 +515,7 @@ fn finalize_3_voters_1_observer() { .map_err(|_| ()); let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().route_fast(); Ok(()) }) + .for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) }) .map(|_| ()) .map_err(|_| ()); @@ -680,7 +680,7 @@ fn transition_3_voters_twice_1_observer() { .for_each(move |_| { net.lock().send_import_notifications(); net.lock().send_finality_notifications(); - net.lock().route_fast(); + net.lock().sync_without_disconnects(); Ok(()) }) .map(|_| ()) @@ -789,7 +789,7 @@ fn sync_justifications_on_change_blocks() { // the last peer should get the justification by syncing from other peers while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { - net.lock().route_fast(); + net.lock().sync_without_disconnects(); } } @@ -1198,7 +1198,7 @@ fn voter_persists_its_votes() { .for_each(move |_| { net.lock().send_import_notifications(); net.lock().send_finality_notifications(); - net.lock().route_fast(); + net.lock().sync_without_disconnects(); Ok(()) }) .map(|_| ()) diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index a477373d6d..6899243eea 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -36,6 +36,7 @@ test_client = { package = "substrate-test-client", path = "../../core/test-clien env_logger = { version = "0.6" } keyring = { package = "substrate-keyring", path = "../../core/keyring" } test_client = { package = "substrate-test-client", path = "../../core/test-client" } +consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] } [features] default = [] diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 99520ea99a..2d3e81daa3 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -187,25 +187,25 @@ struct ContextData { } /// A task, consisting of a user-provided closure, to be executed on the Protocol thread. -pub trait SpecTask> { - fn call_box(self: Box, spec: &mut S, context: &mut Context); +pub trait SpecTask> { + fn call_box(self: Box, spec: &mut S, context: &mut Context); } impl, F: FnOnce(&mut S, &mut Context)> SpecTask for F { - fn call_box(self: Box, spec: &mut S, context: &mut Context) { - (*self)(spec, context) - } + fn call_box(self: Box, spec: &mut S, context: &mut Context) { + (*self)(spec, context) + } } /// A task, consisting of a user-provided closure, to be executed on the Protocol thread. -pub trait GossipTask { - fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context); +pub trait GossipTask { + fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context); } impl, &mut Context)> GossipTask for F { - fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context) { - (*self)(gossip, context) - } + fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context) { + (*self)(gossip, context) + } } /// Messages sent to Protocol from elsewhere inside the system. @@ -246,6 +246,9 @@ pub enum ProtocolMsg> { Stop, /// Tell protocol to perform regular maintenance. Tick, + /// Synchronization request. + #[cfg(any(test, feature = "test-helpers"))] + Synchronize, } /// Messages sent to Protocol from Network-libp2p. @@ -258,6 +261,9 @@ pub enum FromNetworkMsg { CustomMessage(PeerId, Message), /// Let protocol know a peer is currenlty clogged. PeerClogged(PeerId, Option>), + /// Synchronization request. + #[cfg(any(test, feature = "test-helpers"))] + Synchronize, } enum Incoming> { @@ -408,6 +414,8 @@ impl, H: ExHashT> Protocol { self.stop(); return false; }, + #[cfg(any(test, feature = "test-helpers"))] + ProtocolMsg::Synchronize => self.network_chan.send(NetworkMsg::Synchronized), } true } @@ -420,6 +428,8 @@ impl, H: ExHashT> Protocol { FromNetworkMsg::CustomMessage(who, message) => { self.on_custom_message(who, message) }, + #[cfg(any(test, feature = "test-helpers"))] + FromNetworkMsg::Synchronize => self.network_chan.send(NetworkMsg::Synchronized), } true } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 7244080bf7..9964286f30 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -466,6 +466,9 @@ pub enum NetworkMsg { Outgoing(PeerId, Message), /// Report a peer. ReportPeer(PeerId, Severity), + /// Synchronization response. + #[cfg(any(test, feature = "test-helpers"))] + Synchronized, } /// Starts the background thread that handles the networking. @@ -548,6 +551,8 @@ fn run_thread( }, } }, + #[cfg(any(test, feature = "test-helpers"))] + NetworkMsg::Synchronized => (), } Ok(()) }) diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 6f50967bba..97b56e1cb7 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -21,11 +21,9 @@ mod block_import; #[cfg(test)] mod sync; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread; -use std::time::Duration; use log::trace; use client; @@ -36,7 +34,7 @@ use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; -use crossbeam_channel::{self as channel, Sender, select}; +use crossbeam_channel::{Sender, RecvError}; use futures::Future; use futures::sync::{mpsc, oneshot}; use crate::message::Message; @@ -119,62 +117,28 @@ pub type PeersClient = client::Client> { - import_done: Arc, - hash: Arc>, link: NetworkLink, - protocol_sender: Sender>, - network_sender: NetworkChan, + network_to_protocol_sender: Sender>, } impl> TestLink { fn new( protocol_sender: Sender>, + network_to_protocol_sender: Sender>, network_sender: NetworkChan ) -> TestLink { TestLink { - protocol_sender: protocol_sender.clone(), - network_sender: network_sender.clone(), - import_done: Arc::new(AtomicBool::new(false)), - hash: Arc::new(Mutex::new(Default::default())), + network_to_protocol_sender, link: NetworkLink { protocol_sender, network_sender, } } } - - fn clone_link(&self) -> Self { - TestLink { - protocol_sender: self.protocol_sender.clone(), - network_sender: self.network_sender.clone(), - import_done: self.import_done.clone(), - hash: self.hash.clone(), - link: NetworkLink { - protocol_sender: self.protocol_sender.clone(), - network_sender: self.network_sender.clone(), - } - } - } - - /// Set the hash which will be awaited for import. - fn with_hash(&self, hash: Hash) { - self.import_done.store(false, Ordering::SeqCst); - *self.hash.lock() = hash; - } - - /// Simulate a synchronous import. - fn wait_for_import(&self) { - while !self.import_done.load(Ordering::SeqCst) { - thread::sleep(Duration::from_millis(20)); - } - } } impl> Link for TestLink { fn block_imported(&self, hash: &Hash, number: NumberFor) { - if hash == &*self.hash.lock() { - self.import_done.store(true, Ordering::SeqCst); - } self.link.block_imported(hash, number); } @@ -201,6 +165,10 @@ impl> Link for TestLink { fn restart(&self) { self.link.restart(); } + + fn synchronized(&self) { + let _ = self.network_to_protocol_sender.send(FromNetworkMsg::Synchronize); + } } pub struct Peer> { @@ -209,43 +177,137 @@ pub struct Peer> { pub peers: Arc>>>, pub peer_id: PeerId, client: Arc, - network_to_protocol_sender: Sender>, - pub protocol_sender: Sender>, - network_link: TestLink, - network_port: Arc>>, - pub import_queue: Box>, + net_proto_channel: ProtocolChannel, + pub import_queue: Box>, pub data: D, best_hash: Mutex>, finalized_hash: Mutex>, } +type MessageFilter = Fn(&NetworkMsg) -> bool; + +struct ProtocolChannel> { + buffered_messages: Mutex>>, + network_to_protocol_sender: Sender>, + client_to_protocol_sender: Sender>, + protocol_to_network_receiver: NetworkPort, +} + +impl> ProtocolChannel { + /// Create new buffered network port. + pub fn new( + network_to_protocol_sender: Sender>, + client_to_protocol_sender: Sender>, + protocol_to_network_receiver: NetworkPort, + ) -> Self { + ProtocolChannel { + buffered_messages: Mutex::new(VecDeque::new()), + network_to_protocol_sender, + client_to_protocol_sender, + protocol_to_network_receiver, + } + } + + /// Send message from network to protocol. + pub fn send_from_net(&self, message: FromNetworkMsg) { + let _ = self.network_to_protocol_sender.send(message); + + let _ = self.network_to_protocol_sender.send(FromNetworkMsg::Synchronize); + let _ = self.wait_sync(); + } + + /// Send message from client to protocol. + pub fn send_from_client(&self, message: ProtocolMsg) { + let _ = self.client_to_protocol_sender.send(message); + + let _ = self.client_to_protocol_sender.send(ProtocolMsg::Synchronize); + let _ = self.wait_sync(); + } + + /// Wait until synchronization response is generated by the protocol. + pub fn wait_sync(&self) -> Result<(), RecvError> { + loop { + match self.protocol_to_network_receiver.receiver().recv() { + Ok(NetworkMsg::Synchronized) => return Ok(()), + Err(error) => return Err(error), + Ok(msg) => self.buffered_messages.lock().push_back(msg), + } + } + } + + /// Produce the next pending message to send to another peer. + fn pending_message(&self, message_filter: &MessageFilter) -> Option> { + if let Some(message) = self.buffered_message(message_filter) { + return Some(message); + } + + while let Some(message) = self.channel_message() { + if message_filter(&message) { + return Some(message) + } else { + self.buffered_messages.lock().push_back(message); + } + } + + None + } + + /// Whether this peer is done syncing (has no messages to send). + fn is_done(&self) -> bool { + self.buffered_messages.lock().is_empty() + && self.protocol_to_network_receiver.receiver().is_empty() + } + + /// Return oldest buffered message if it exists. + fn buffered_message(&self, message_filter: &MessageFilter) -> Option> { + let mut buffered_messages = self.buffered_messages.lock(); + for i in 0..buffered_messages.len() { + if message_filter(&buffered_messages[i]) { + return buffered_messages.remove(i); + } + } + + None + } + + /// Receive message from the channel. + fn channel_message(&self) -> Option> { + self.protocol_to_network_receiver.receiver().try_recv().ok() + } +} + impl> Peer { fn new( is_offline: Arc, is_major_syncing: Arc, peers: Arc>>>, client: Arc, - import_queue: Box>, + import_queue: Box>, network_to_protocol_sender: Sender>, protocol_sender: Sender>, network_sender: NetworkChan, network_port: NetworkPort, data: D, ) -> Self { - let network_port = Arc::new(Mutex::new(network_port)); - let network_link = TestLink::new(protocol_sender.clone(), network_sender.clone()); - import_queue.start(Box::new(network_link.clone_link())).expect("Test ImportQueue always starts"); + let net_proto_channel = ProtocolChannel::new( + network_to_protocol_sender.clone(), + protocol_sender.clone(), + network_port, + ); + let network_link = TestLink::new( + protocol_sender.clone(), + network_to_protocol_sender.clone(), + network_sender.clone(), + ); + import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts"); Peer { is_offline, is_major_syncing, peers, peer_id: PeerId::random(), client, - network_to_protocol_sender, - protocol_sender, import_queue, - network_link, - network_port, + net_proto_channel, data, best_hash: Mutex::new(None), finalized_hash: Mutex::new(None), @@ -260,9 +322,7 @@ impl> Peer { .header(&BlockId::Hash(info.chain.best_hash)) .unwrap() .unwrap(); - let _ = self - .protocol_sender - .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); + self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header)); } pub fn on_block_imported( @@ -270,9 +330,7 @@ impl> Peer { hash: ::Hash, header: &::Header, ) { - let _ = self - .protocol_sender - .send(ProtocolMsg::BlockImported(hash, header.clone())); + self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(hash, header.clone())); } // SyncOracle: are we connected to any peer? @@ -287,45 +345,38 @@ impl> Peer { /// Called on connection to other indicated peer. fn on_connect(&self, other: &Self) { - let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new())); + self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new())); } /// Called on disconnect from other indicated peer. fn on_disconnect(&self, other: &Self) { - let _ = self - .network_to_protocol_sender - .send(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new())); + self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new())); } /// Receive a message from another peer. Return a set of peers to disconnect. - fn receive_message(&self, from: &Self, msg: Message) { - let _ = self - .network_to_protocol_sender - .send(FromNetworkMsg::CustomMessage(from.peer_id.clone(), msg)); + fn receive_message(&self, from: &PeerId, msg: Message) { + self.net_proto_channel.send_from_net(FromNetworkMsg::CustomMessage(from.clone(), msg)); } /// Produce the next pending message to send to another peer. - fn pending_message(&self) -> Option> { - select! { - recv(self.network_port.lock().receiver()) -> msg => return msg.ok(), - // If there are no messages ready, give protocol a change to send one. - recv(channel::after(Duration::from_millis(100))) -> _ => return None, - } - } - - /// Produce the next pending message to send to another peer, without waiting. - fn pending_message_fast(&self) -> Option> { - self.network_port.lock().receiver().try_recv().ok() + fn pending_message(&self, message_filter: &MessageFilter) -> Option> { + self.net_proto_channel.pending_message(message_filter) } /// Whether this peer is done syncing (has no messages to send). fn is_done(&self) -> bool { - self.network_port.lock().receiver().is_empty() + self.net_proto_channel.is_done() + } + + /// Synchronize with import queue. + fn import_queue_sync(&self) { + self.import_queue.synchronize(); + let _ = self.net_proto_channel.wait_sync(); } /// Execute a "sync step". This is called for each peer after it sends a packet. fn sync_step(&self) { - let _ = self.protocol_sender.send(ProtocolMsg::Tick); + self.net_proto_channel.send_from_client(ProtocolMsg::Tick); } /// Send block import notifications. @@ -340,10 +391,7 @@ impl> Peer { } let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - let _ = self - .protocol_sender - .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); - + self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header)); *best_hash = Some(info.chain.best_hash); } @@ -359,16 +407,13 @@ impl> Peer { } let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap(); - let _ = self - .protocol_sender - .send(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone())); - + self.net_proto_channel.send_from_client(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone())); *finalized_hash = Some(info.chain.finalized_hash); } /// Restart sync for a peer. fn restart_sync(&self) { - let _ = self.protocol_sender.send(ProtocolMsg::Abort); + self.net_proto_channel.send_from_client(ProtocolMsg::Abort); } /// Push a message into the gossip network and relay to peers. @@ -380,10 +425,14 @@ impl> Peer { data: Vec, force: bool, ) { - let recipient = if force { GossipMessageRecipient::BroadcastToAll } else { GossipMessageRecipient::BroadcastNew }; - let _ = self - .protocol_sender - .send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient)); + let recipient = if force { + GossipMessageRecipient::BroadcastToAll + } else { + GossipMessageRecipient::BroadcastNew + }; + self.net_proto_channel.send_from_client( + ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient), + ); } pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: ::Hash) { @@ -408,22 +457,18 @@ impl> Peer { pub fn with_gossip(&self, f: F) where F: FnOnce(&mut ConsensusGossip, &mut Context) + Send + 'static { - let _ = self - .protocol_sender - .send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); + self.net_proto_channel.send_from_client(ProtocolMsg::ExecuteWithGossip(Box::new(f))); } /// Announce a block to peers. pub fn announce_block(&self, block: Hash) { - let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(block)); + self.net_proto_channel.send_from_client(ProtocolMsg::AnnounceBlock(block)); } /// Request a justification for the given block. #[cfg(test)] fn request_justification(&self, hash: &::primitives::H256, number: NumberFor) { - let _ = self - .protocol_sender - .send(ProtocolMsg::RequestJustification(hash.clone(), number)); + self.net_proto_channel.send_from_client(ProtocolMsg::RequestJustification(hash.clone(), number)); } /// Add blocks to the peer -- edit the block before adding @@ -452,7 +497,6 @@ impl> Peer { ); let header = block.header.clone(); at = hash; - self.network_link.with_hash(hash); self.import_queue.import_blocks( origin, vec![IncomingBlock { @@ -463,9 +507,11 @@ impl> Peer { justification: None, }], ); - // Simulate a sync import. - self.network_link.wait_for_import(); + + // make sure block import has completed + self.import_queue_sync(); } + at } @@ -525,7 +571,7 @@ impl TransactionPool for EmptyTransactionPool { } pub trait SpecializationFactory { - fn create() -> Self; + fn create() -> Self; } impl SpecializationFactory for DummySpecialization { @@ -633,86 +679,86 @@ pub trait TestNetFactory: Sized { } } } - self.route(None); + + loop { + // we only deliver Status messages during start + let need_continue = self.route_single(true, None, &|msg| match *msg { + NetworkMsg::Outgoing(_, crate::message::generic::Message::Status(_)) => true, + NetworkMsg::Outgoing(_, _) => false, + NetworkMsg::ReportPeer(_, _) | NetworkMsg::Synchronized => true, + }); + if !need_continue { + break; + } + } + self.set_started(true); } - /// Do one step of routing. - fn route(&mut self, disconnected: Option>) { - self.mut_peers(move |peers| { - let mut to_disconnect = HashSet::new(); - for (peer_pos, peer) in peers.iter().enumerate() { - let packet = peer.pending_message(); - match packet { - None => continue, - Some(NetworkMsg::Outgoing(recipient, packet)) => { - let recipient = peers.iter().position(|p| p.peer_id == recipient).unwrap(); - if let Some(disconnected) = disconnected.as_ref() { - let mut current = HashSet::new(); - current.insert(peer_pos); - current.insert(recipient); - // Not routing message between "disconnected" nodes. - if disconnected.is_subset(¤t) { - continue; + /// Do single round of message routing: single message from every peer is routed. + fn route_single( + &mut self, + disconnect: bool, + disconnected: Option>, + message_filter: &MessageFilter, + ) -> bool { + let mut had_messages = false; + let mut to_disconnect = HashSet::new(); + let peers = self.peers(); + for peer in peers { + if let Some(message) = peer.pending_message(message_filter) { + match message { + NetworkMsg::Outgoing(recipient_id, packet) => { + had_messages = true; + + let sender_pos = peers.iter().position(|p| p.peer_id == peer.peer_id).unwrap(); + let recipient_pos = peers.iter().position(|p| p.peer_id == recipient_id).unwrap(); + if disconnect { + if let Some(ref disconnected) = disconnected { + let mut current = HashSet::new(); + current.insert(sender_pos); + current.insert(recipient_pos); + // Not routing message between "disconnected" nodes. + if disconnected.is_subset(¤t) { + continue; + } } } - peers[recipient].receive_message(peer, packet) - } - Some(NetworkMsg::ReportPeer(who, _)) => { - to_disconnect.insert(who); - } + + peers[recipient_pos].receive_message(&peer.peer_id, packet); + }, + NetworkMsg::ReportPeer(who, _) => { + if disconnect { + to_disconnect.insert(who); + } + }, + _ => (), } } - for d in to_disconnect { - if let Some(d) = peers.iter().find(|p| p.peer_id == d) { - for peer in 0..peers.len() { - peers[peer].on_disconnect(d); - } + } + + for d in to_disconnect { + if let Some(d) = peers.iter().find(|p| p.peer_id == d) { + for peer in 0..peers.len() { + peers[peer].on_disconnect(d); } } - }); - } + } - /// Route all pending outgoing messages, without waiting or disconnecting. - fn route_fast(&mut self) { - self.mut_peers(move |peers| { - for peer in 0..peers.len() { - while let Some(NetworkMsg::Outgoing(recipient, packet)) = peers[peer].pending_message_fast() { - if let Some(p) = peers.iter().find(|p| p.peer_id == recipient) { - p.receive_message(&peers[peer], packet) - } - } - } - }); - } + // make sure that the protocol(s) has processed all messages that have been queued + self.peers().iter().for_each(|peer| peer.import_queue_sync()); - /// Do a step of synchronization. - fn sync_step(&mut self) { - self.route(None); - - self.mut_peers(|peers| { - for peer in peers { - peer.sync_step(); - } - }) + had_messages } /// Send block import notifications for all peers. fn send_import_notifications(&mut self) { - self.mut_peers(|peers| { - for peer in peers { - peer.send_import_notifications(); - } - }) + self.peers().iter().for_each(|peer| peer.send_import_notifications()) } /// Send block finalization notifications for all peers. fn send_finality_notifications(&mut self) { - self.mut_peers(|peers| { - for peer in peers { - peer.send_finality_notifications(); - } - }) + self.peers().iter().for_each(|peer| peer.send_finality_notifications()) } /// Restart sync for a peer. @@ -722,48 +768,30 @@ pub trait TestNetFactory: Sized { /// Perform synchronization until complete, if provided the /// given nodes set are excluded from sync. - fn sync_with(&mut self, disconnected: Option>) -> u32 { + fn sync_with(&mut self, disconnect: bool, disconnected: Option>) { self.start(); - let mut total_steps = 0; - let mut done = 0; - - loop { - if done > 3 { break; } - if self.done() { - done += 1; - } else { - done = 0; - } - - self.sync_step(); - self.route(disconnected.clone()); - - total_steps += 1; - } - - total_steps - } - - /// Perform synchronization until complete. - fn sync(&mut self) -> u32 { - self.sync_with(None) - } - - /// Perform synchronization until complete, - /// excluding sync between certain nodes. - fn sync_with_disconnected(&mut self, disconnected: HashSet) -> u32 { - self.sync_with(Some(disconnected)) - } - - /// Do the given amount of sync steps. - fn sync_steps(&mut self, count: usize) { - self.start(); - for _ in 0..count { - self.sync_step(); + while self.route_single(disconnect, disconnected.clone(), &|_| true) { + // give protocol a chance to do its maintain procedures + self.peers().iter().for_each(|peer| peer.sync_step()); } } - /// Whether all peers have synced. + /// Deliver at most 1 pending message from every peer. + fn sync_step(&mut self) { + self.route_single(true, None, &|_| true); + } + + /// Deliver pending messages until there are no more. + fn sync(&mut self) { + self.sync_with(true, None) + } + + /// Deliver pending messages until there are no more. Do not disconnect nodes. + fn sync_without_disconnects(&mut self) { + self.sync_with(false, None) + } + + /// Whether all peers have no pending outgoing messages. fn done(&self) -> bool { self.peers().iter().all(|p| p.is_done()) } diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 9bbf0a32b7..94bbc68b94 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -48,8 +48,7 @@ fn sync_peers_works() { net.sync(); for peer in 0..3 { // Assert peers is up to date. - let peers = net.peer(peer).peers.read(); - assert_eq!(peers.len(), 2); + assert_eq!(net.peer(peer).peers.read().len(), 2); // And then disconnect. for other in 0..3 { if other != peer { @@ -78,9 +77,6 @@ fn sync_cycle_from_offline_to_syncing_to_offline() { // Generate blocks. net.peer(2).push_blocks(100, false); net.start(); - net.route_fast(); - thread::sleep(Duration::from_millis(100)); - net.route_fast(); for peer in 0..3 { // Online assert!(!net.peer(peer).is_offline()); @@ -102,7 +98,6 @@ fn sync_cycle_from_offline_to_syncing_to_offline() { net.peer(peer).on_disconnect(net.peer(other)); } } - thread::sleep(Duration::from_millis(100)); assert!(net.peer(peer).is_offline()); assert!(!net.peer(peer).is_major_syncing()); } @@ -116,9 +111,7 @@ fn syncing_node_not_major_syncing_when_disconnected() { // Generate blocks. net.peer(2).push_blocks(100, false); net.start(); - net.route_fast(); - thread::sleep(Duration::from_millis(100)); - net.route_fast(); + net.sync_step(); // Peer 1 is major-syncing. assert!(net.peer(1).is_major_syncing()); @@ -126,7 +119,6 @@ fn syncing_node_not_major_syncing_when_disconnected() { // Disconnect peer 1 form everyone else. net.peer(1).on_disconnect(net.peer(0)); net.peer(1).on_disconnect(net.peer(2)); - thread::sleep(Duration::from_millis(100)); // Peer 1 is not major-syncing. assert!(!net.peer(1).is_major_syncing()); @@ -363,7 +355,7 @@ fn blocks_are_not_announced_by_light_nodes() { let mut disconnected = HashSet::new(); disconnected.insert(0); disconnected.insert(2); - net.sync_with_disconnected(disconnected); + net.sync_with(true, Some(disconnected)); // peer 0 has the best chain // peer 1 has the best chain