diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index dd6b82d407..9c6ff6c548 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4165,6 +4165,7 @@ dependencies = [ "substrate-primitives 1.0.0", "substrate-test-client 1.0.0", "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", + "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index ca2e7621f6..473690b2ac 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -31,6 +31,7 @@ peerset = { package = "substrate-peerset", path = "../../core/peerset" } tokio = "0.1.11" keyring = { package = "substrate-keyring", path = "../../core/keyring", optional = true } test_client = { package = "substrate-test-client", path = "../../core/test-client", optional = true } +void = "1.0" [dev-dependencies] env_logger = { version = "0.6" } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index a907a3052d..b3c56c9e7c 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -14,9 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crossbeam_channel::{self as channel, Receiver, Sender, select}; -use futures::sync::mpsc; -use parking_lot::Mutex; +use futures::{prelude::*, sync::mpsc}; use network_libp2p::PeerId; use primitives::storage::StorageKey; use runtime_primitives::{generic::BlockId, ConsensusEngineId}; @@ -35,7 +33,7 @@ use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use std::{cmp, num::NonZeroUsize, thread, time}; +use std::{cmp, num::NonZeroUsize, time}; use log::{trace, debug, warn, error}; use crate::chain::Client; use client::light::fetcher::ChangesProof; @@ -46,8 +44,6 @@ const REQUEST_TIMEOUT_SEC: u64 = 40; const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); /// Interval at which we propagate exstrinsics; const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); -/// Interval at which we send status updates on the SyncProvider status stream. -const STATUS_INTERVAL: time::Duration = time::Duration::from_millis(5000); /// Current protocol version. pub(crate) const CURRENT_VERSION: u32 = 3; @@ -77,10 +73,12 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { - status_sinks: Arc>>>>, network_chan: NetworkChan, - port: Receiver>, - from_network_port: Receiver>, + port: mpsc::UnboundedReceiver>, + /// Interval at which we call `tick`. + tick_timeout: tokio::timer::Interval, + /// Interval at which we call `propagate_extrinsics`. + propagate_timeout: tokio::timer::Interval, config: ProtocolConfig, on_demand: Option>>, genesis_hash: B::Hash, @@ -256,8 +254,6 @@ pub enum ProtocolMsg> { BlocksProcessed(Vec, bool), /// Tell protocol to restart sync. RestartSync, - /// Propagate status updates. - Status, /// Tell protocol to propagate extrinsics. PropagateExtrinsics, /// Tell protocol that a block was imported (sent by the import-queue). @@ -284,39 +280,17 @@ pub enum ProtocolMsg> { /// Only used in tests. #[cfg(any(test, feature = "test-helpers"))] Abort, - /// Tell protocol to abort sync and stop. - Stop, /// Tell protocol to perform regular maintenance. + #[cfg(any(test, feature = "test-helpers"))] Tick, /// Synchronization request. #[cfg(any(test, feature = "test-helpers"))] Synchronize, } -/// Messages sent to Protocol from Network-libp2p. -pub enum FromNetworkMsg { - /// A peer connected, with debug info. - PeerConnected(PeerId, String), - /// A peer disconnected, with debug info. - PeerDisconnected(PeerId, String), - /// A custom message from another peer. - CustomMessage(PeerId, Message), - /// Let protocol know a peer is currenlty clogged. - PeerClogged(PeerId, Option>), - /// Synchronization request. - #[cfg(any(test, feature = "test-helpers"))] - Synchronize, -} - -enum Incoming> { - FromNetwork(FromNetworkMsg), - FromClient(ProtocolMsg) -} - impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new( - status_sinks: Arc>>>>, is_offline: Arc, is_major_syncing: Arc, connected_peers: Arc>>>, @@ -327,92 +301,81 @@ impl, H: ExHashT> Protocol { on_demand: Option>>, transaction_pool: Arc>, specialization: S, - ) -> error::Result<(Sender>, Sender>)> { - let (protocol_sender, port) = channel::unbounded(); - let (from_network_sender, from_network_port) = channel::bounded(4); + ) -> error::Result<(Protocol, mpsc::UnboundedSender>)> { + let (protocol_sender, port) = mpsc::unbounded(); let info = chain.info()?; let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue); - let _ = thread::Builder::new() - .name("Protocol".into()) - .spawn(move || { - let mut protocol = Protocol { - status_sinks, - network_chan, - from_network_port, - port, - config: config, - context_data: ContextData { - peers: HashMap::new(), - chain, - }, - on_demand, - genesis_hash: info.chain.genesis_hash, - sync, - specialization: specialization, - consensus_gossip: ConsensusGossip::new(), - handshaking_peers: HashMap::new(), - connected_peers, - transaction_pool: transaction_pool, - }; - let tick_timeout = channel::tick(TICK_TIMEOUT); - let propagate_timeout = channel::tick(PROPAGATE_TIMEOUT); - let status_interval = channel::tick(STATUS_INTERVAL); - while protocol.run(&tick_timeout, &propagate_timeout, &status_interval) { - // Running until all senders have been dropped... - } - }) - .expect("Protocol thread spawning failed"); - Ok((protocol_sender, from_network_sender)) - } - - fn run( - &mut self, - tick_timeout: &Receiver, - propagate_timeout: &Receiver, - status_interval: &Receiver, - ) -> bool { - let msg = select! { - recv(self.port) -> event => { - match event { - Ok(msg) => Incoming::FromClient(msg), - // Our sender has been dropped, quit. - Err(_) => { - Incoming::FromClient(ProtocolMsg::Stop) - }, - } - }, - recv(self.from_network_port) -> event => { - match event { - Ok(msg) => Incoming::FromNetwork(msg), - // Our sender has been dropped, quit. - Err(_) => { - Incoming::FromClient(ProtocolMsg::Stop) - }, - } - }, - recv(tick_timeout) -> _ => { - Incoming::FromClient(ProtocolMsg::Tick) - }, - recv(propagate_timeout) -> _ => { - Incoming::FromClient(ProtocolMsg::PropagateExtrinsics) - }, - recv(status_interval) -> _ => { - Incoming::FromClient(ProtocolMsg::Status) + let protocol = Protocol { + network_chan, + port, + tick_timeout: tokio::timer::Interval::new_interval(TICK_TIMEOUT), + propagate_timeout: tokio::timer::Interval::new_interval(PROPAGATE_TIMEOUT), + config: config, + context_data: ContextData { + peers: HashMap::new(), + chain, }, + on_demand, + genesis_hash: info.chain.genesis_hash, + sync, + specialization: specialization, + consensus_gossip: ConsensusGossip::new(), + handshaking_peers: HashMap::new(), + connected_peers, + transaction_pool: transaction_pool, }; - self.handle_msg(msg) + + Ok((protocol, protocol_sender)) } - fn handle_msg(&mut self, msg: Incoming) -> bool { - match msg { - Incoming::FromNetwork(msg) => self.handle_network_msg(msg), - Incoming::FromClient(msg) => self.handle_client_msg(msg), + /// Returns an object representing the status of the protocol. + pub fn status(&mut self) -> ProtocolStatus { + ProtocolStatus { + sync: self.sync.status(), + num_peers: self.context_data.peers.values().count(), + num_active_peers: self + .context_data + .peers + .values() + .filter(|p| p.block_request.is_some()) + .count(), } } +} +impl, H: ExHashT> Future for Protocol { + type Item = (); + type Error = void::Void; + + fn poll(&mut self) -> Poll { + while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { + self.tick(); + } + + while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { + self.propagate_extrinsics(); + } + + loop { + match self.port.poll() { + Ok(Async::Ready(None)) | Err(_) => { + self.stop(); + return Ok(Async::Ready(())) + } + Ok(Async::Ready(Some(msg))) => if !self.handle_client_msg(msg) { + return Ok(Async::Ready(())) + } + Ok(Async::NotReady) => break, + } + } + + Ok(Async::NotReady) + } +} + +impl, H: ExHashT> Protocol { fn handle_client_msg(&mut self, msg: ProtocolMsg) -> bool { match msg { - ProtocolMsg::Status => self.on_status(), ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header), ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { @@ -449,13 +412,10 @@ impl, H: ExHashT> Protocol { }, ProtocolMsg::JustificationImportResult(hash, number, success) => self.sync.justification_import_result(hash, number, success), ProtocolMsg::PropagateExtrinsics => self.propagate_extrinsics(), + #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Tick => self.tick(), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Abort => self.abort(), - ProtocolMsg::Stop => { - self.stop(); - return false; - }, #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Synchronize => { trace!(target: "sync", "handle_client_msg: received Synchronize msg"); @@ -465,20 +425,6 @@ impl, H: ExHashT> Protocol { true } - fn handle_network_msg(&mut self, msg: FromNetworkMsg) -> bool { - match msg { - FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), - FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info), - FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message), - FromNetworkMsg::CustomMessage(who, message) => { - self.on_custom_message(who, message) - }, - #[cfg(any(test, feature = "test-helpers"))] - FromNetworkMsg::Synchronize => self.network_chan.send(NetworkMsg::Synchronized), - } - true - } - fn handle_response(&mut self, who: PeerId, response: &message::BlockResponse) -> Option> { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { if let Some(_) = peer.obsolete_requests.remove(&response.id) { @@ -511,22 +457,7 @@ impl, H: ExHashT> Protocol { } } - /// Propagates protocol statuses. - fn on_status(&mut self) { - let status = ProtocolStatus { - sync: self.sync.status(), - num_peers: self.context_data.peers.values().count(), - num_active_peers: self - .context_data - .peers - .values() - .filter(|p| p.block_request.is_some()) - .count(), - }; - self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); - } - - fn on_custom_message(&mut self, who: PeerId, message: Message) { + pub fn on_custom_message(&mut self, who: PeerId, message: Message) { match message { GenericMessage::Status(s) => self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), @@ -595,14 +526,14 @@ impl, H: ExHashT> Protocol { } /// Called when a new peer is connected - fn on_peer_connected(&mut self, who: PeerId, debug_info: String) { + pub fn on_peer_connected(&mut self, who: PeerId, debug_info: String) { trace!(target: "sync", "Connecting {}: {}", who, debug_info); self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() }); self.send_status(who); } /// Called by peer when it is disconnecting - fn on_peer_disconnected(&mut self, peer: PeerId, debug_info: String) { + pub fn on_peer_disconnected(&mut self, peer: PeerId, debug_info: String) { trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); // lock all the the peer lists so that add/remove peer events are in order let removed = { @@ -637,6 +568,12 @@ impl, H: ExHashT> Protocol { } } + /// Puts the `Synchronized` message on `network_chan`. + #[cfg(any(test, feature = "test-helpers"))] + pub fn synchronize(&self) { + self.network_chan.send(NetworkMsg::Synchronized); + } + fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, @@ -915,7 +852,7 @@ impl, H: ExHashT> Protocol { /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - pub fn announce_block(&mut self, hash: B::Hash) { + fn announce_block(&mut self, hash: B::Hash) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index bfc2333ee8..4c0afde42d 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::{io, thread}; +use std::{io, thread, time::Duration}; use log::{warn, debug, error, info}; use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc}; @@ -31,7 +31,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; -use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; +use crate::protocol::{self, Context, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; use crate::config::Params; use crate::error::Error; use crate::specialization::NetworkSpecialization; @@ -40,6 +40,9 @@ use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; use tokio::prelude::task::AtomicTask; use tokio::runtime::Builder as RuntimeBuilder; +/// Interval at which we send status updates on the SyncProvider status stream. +const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + pub use network_libp2p::PeerId; /// Type that represents fetch completion future. @@ -81,22 +84,22 @@ pub trait TransactionPool: Send + Sync { #[derive(Clone)] pub struct NetworkLink> { /// The protocol sender - pub(crate) protocol_sender: Sender>, + pub(crate) protocol_sender: mpsc::UnboundedSender>, /// The network sender pub(crate) network_sender: NetworkChan, } impl> Link for NetworkLink { fn block_imported(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); } fn blocks_processed(&self, processed_blocks: Vec, has_error: bool) { - let _ = self.protocol_sender.send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); } fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { - let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); if !success { info!("Invalid justification provided by {} for #{}", who, hash); let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); @@ -105,11 +108,11 @@ impl> Link for NetworkLink { } fn clear_justification_requests(&self) { - let _ = self.protocol_sender.send(ProtocolMsg::ClearJustificationRequests); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::ClearJustificationRequests); } fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number)); } fn report_peer(&self, who: PeerId, reputation_change: i32) { @@ -117,7 +120,7 @@ impl> Link for NetworkLink { } fn restart(&self) { - let _ = self.protocol_sender.send(ProtocolMsg::RestartSync); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RestartSync); } } @@ -151,7 +154,7 @@ pub struct Service> { /// nodes it should be connected to or not. peerset: PeersetHandle, /// Protocol sender - protocol_sender: Sender>, + protocol_sender: mpsc::UnboundedSender>, /// Sender for messages to the background service task, and handle for the background thread. /// Dropping the sender should close the task and the thread. /// This is an `Option` because we need to extract it in the destructor. @@ -171,8 +174,7 @@ impl> Service { let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); - let (protocol_sender, network_to_protocol_sender) = Protocol::new( - status_sinks.clone(), + let (protocol, protocol_sender) = Protocol::new( is_offline.clone(), is_major_syncing.clone(), peers.clone(), @@ -187,8 +189,9 @@ impl> Service { let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); let registered = RegisteredProtocol::new(protocol_id, &versions); let (thread, network, peerset) = start_thread( - network_to_protocol_sender, + protocol, network_port, + status_sinks.clone(), params.network_config, registered, )?; @@ -236,19 +239,19 @@ impl> Service { pub fn on_block_imported(&self, hash: B::Hash, header: B::Header) { let _ = self .protocol_sender - .send(ProtocolMsg::BlockImported(hash, header)); + .unbounded_send(ProtocolMsg::BlockImported(hash, header)); } /// Called when a new block is finalized by the client. pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) { let _ = self .protocol_sender - .send(ProtocolMsg::BlockFinalized(hash, header)); + .unbounded_send(ProtocolMsg::BlockFinalized(hash, header)); } /// Called when new transactons are imported by the client. pub fn trigger_repropagate(&self) { - let _ = self.protocol_sender.send(ProtocolMsg::PropagateExtrinsics); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::PropagateExtrinsics); } /// Make sure an important block is propagated to peers. @@ -256,7 +259,7 @@ impl> Service { /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. pub fn announce_block(&self, hash: B::Hash) { - let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(hash)); + let _ = self.protocol_sender.unbounded_send(ProtocolMsg::AnnounceBlock(hash)); } /// Send a consensus message through the gossip @@ -269,7 +272,7 @@ impl> Service { ) { let _ = self .protocol_sender - .send(ProtocolMsg::GossipConsensusMessage( + .unbounded_send(ProtocolMsg::GossipConsensusMessage( topic, engine_id, message, recipient, )); } @@ -286,7 +289,7 @@ impl> Service { { let _ = self .protocol_sender - .send(ProtocolMsg::ExecuteWithSpec(Box::new(f))); + .unbounded_send(ProtocolMsg::ExecuteWithSpec(Box::new(f))); } /// Execute a closure with the consensus gossip. @@ -295,7 +298,7 @@ impl> Service { { let _ = self .protocol_sender - .send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); + .unbounded_send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); } /// Are we in the process of downloading the chain? @@ -471,9 +474,10 @@ pub enum NetworkMsg { } /// Starts the background thread that handles the networking. -fn start_thread( - protocol_sender: Sender>, +fn start_thread, H: ExHashT>( + protocol: Protocol, network_port: NetworkPort, + status_sinks: Arc>>>>, config: NetworkConfiguration, registered: RegisteredProtocol>, ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, PeersetHandle), Error> { @@ -491,7 +495,7 @@ fn start_thread( let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?; let peerset_clone = peerset.clone(); let thread = thread::Builder::new().name("network".to_string()).spawn(move || { - let fut = run_thread(protocol_sender, service_clone, network_port, peerset_clone) + let fut = run_thread(protocol, service_clone, network_port, status_sinks, peerset_clone) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); @@ -508,14 +512,28 @@ fn start_thread( } /// Runs the background thread that handles the networking. -fn run_thread( - protocol_sender: Sender>, +fn run_thread, H: ExHashT>( + mut protocol: Protocol, network_service: Arc>>>, network_port: NetworkPort, + status_sinks: Arc>>>>, peerset: PeersetHandle, ) -> impl Future { + // Interval at which we send status updates on the `status_sinks`. + let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL); futures::future::poll_fn(move || { + while let Ok(Async::Ready(_)) = status_interval.poll() { + let status = protocol.status(); + status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); + } + + match protocol.poll() { + Ok(Async::Ready(())) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => {} + Err(err) => void::unreachable(err), + } + loop { match network_port.take_one_message() { Ok(None) => break, @@ -540,19 +558,17 @@ fn run_thread( version <= protocol::CURRENT_VERSION as u8 && version >= protocol::MIN_VERSION as u8 ); - let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info)); - } - Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { - let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info)); - } - Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => { - let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message)); + protocol.on_peer_connected(peer_id, debug_info); } + Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => + protocol.on_peer_disconnected(peer_id, debug_info), + Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => + protocol.on_custom_message(peer_id, message), Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); - let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg))); + protocol.on_clogged_peer(peer_id.clone(), Some(msg)); } } Ok(Async::Ready(None)) => return Ok(Async::Ready(())), diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 2b85c198e3..5a8d3b34b1 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -34,14 +34,13 @@ use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; -use crossbeam_channel::{Sender, RecvError}; -use futures::Future; -use futures::sync::{mpsc, oneshot}; +use crossbeam_channel::RecvError; +use futures::{prelude::*, sync::{mpsc, oneshot}}; use crate::message::Message; use network_libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, sr25519::Public as AuthorityId}; -use crate::protocol::{ConnectedPeer, Context, FromNetworkMsg, Protocol, ProtocolMsg}; +use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolMsg}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; @@ -120,13 +119,13 @@ pub struct TestLink> { link: NetworkLink, #[cfg(any(test, feature = "test-helpers"))] - network_to_protocol_sender: Sender>, + network_to_protocol_sender: mpsc::UnboundedSender>, } impl> TestLink { fn new( - protocol_sender: Sender>, - _network_to_protocol_sender: Sender>, + protocol_sender: mpsc::UnboundedSender>, + _network_to_protocol_sender: mpsc::UnboundedSender>, network_sender: NetworkChan ) -> TestLink { TestLink { @@ -172,7 +171,7 @@ impl> Link for TestLink { #[cfg(any(test, feature = "test-helpers"))] fn synchronized(&self) { trace!(target: "test_network", "Synchronizing"); - drop(self.network_to_protocol_sender.send(FromNetworkMsg::Synchronize)) + drop(self.network_to_protocol_sender.unbounded_send(FromNetworkMsg::Synchronize)) } } @@ -191,18 +190,29 @@ pub struct Peer> { type MessageFilter = Fn(&NetworkMsg) -> bool; +enum FromNetworkMsg { + /// A peer connected, with debug info. + PeerConnected(PeerId, String), + /// A peer disconnected, with debug info. + PeerDisconnected(PeerId, String), + /// A custom message from another peer. + CustomMessage(PeerId, Message), + /// Synchronization request. + Synchronize, +} + struct ProtocolChannel> { buffered_messages: Mutex>>, - network_to_protocol_sender: Sender>, - client_to_protocol_sender: Sender>, + network_to_protocol_sender: mpsc::UnboundedSender>, + client_to_protocol_sender: mpsc::UnboundedSender>, protocol_to_network_receiver: NetworkPort, } impl> ProtocolChannel { /// Create new buffered network port. pub fn new( - network_to_protocol_sender: Sender>, - client_to_protocol_sender: Sender>, + network_to_protocol_sender: mpsc::UnboundedSender>, + client_to_protocol_sender: mpsc::UnboundedSender>, protocol_to_network_receiver: NetworkPort, ) -> Self { ProtocolChannel { @@ -215,17 +225,17 @@ impl> ProtocolChannel { /// Send message from network to protocol. pub fn send_from_net(&self, message: FromNetworkMsg) { - let _ = self.network_to_protocol_sender.send(message); + let _ = self.network_to_protocol_sender.unbounded_send(message); - let _ = self.network_to_protocol_sender.send(FromNetworkMsg::Synchronize); + let _ = self.network_to_protocol_sender.unbounded_send(FromNetworkMsg::Synchronize); let _ = self.wait_sync(); } /// Send message from client to protocol. pub fn send_from_client(&self, message: ProtocolMsg) { - let _ = self.client_to_protocol_sender.send(message); + let _ = self.client_to_protocol_sender.unbounded_send(message); - let _ = self.client_to_protocol_sender.send(ProtocolMsg::Synchronize); + let _ = self.client_to_protocol_sender.unbounded_send(ProtocolMsg::Synchronize); let _ = self.wait_sync(); } @@ -290,8 +300,8 @@ impl> Peer { peers: Arc>>>, client: Arc, import_queue: Box>, - network_to_protocol_sender: Sender>, - protocol_sender: Sender>, + network_to_protocol_sender: mpsc::UnboundedSender>, + protocol_sender: mpsc::UnboundedSender>, network_sender: NetworkChan, network_port: NetworkPort, data: D, @@ -642,14 +652,14 @@ pub trait TestNetFactory: Sized { let (network_sender, network_port) = network_channel(); let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import)); - let status_sinks = Arc::new(Mutex::new(Vec::new())); let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let specialization = self::SpecializationFactory::create(); let peers: Arc>>> = Arc::new(Default::default()); - let (protocol_sender, network_to_protocol_sender) = Protocol::new( - status_sinks, + let (network_to_protocol_sender, mut network_to_protocol_rx) = mpsc::unbounded(); + + let (mut protocol, protocol_sender) = Protocol::new( is_offline.clone(), is_major_syncing.clone(), peers.clone(), @@ -662,6 +672,29 @@ pub trait TestNetFactory: Sized { specialization, ).unwrap(); + std::thread::spawn(move || { + tokio::run(futures::future::poll_fn(move || { + while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() { + match msg { + Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => + protocol.on_peer_connected(peer_id, debug_msg), + Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => + protocol.on_peer_disconnected(peer_id, debug_msg), + Some(FromNetworkMsg::CustomMessage(peer_id, message)) => + protocol.on_custom_message(peer_id, message), + Some(FromNetworkMsg::Synchronize) => protocol.synchronize(), + None => return Ok(Async::Ready(())) + } + } + + if let Async::Ready(_) = protocol.poll().unwrap() { + return Ok(Async::Ready(())) + } + + Ok(Async::NotReady) + })); + }); + let peer = Arc::new(Peer::new( is_offline, is_major_syncing,