diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index e845b1c8ac..5b1a38f96c 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3740,6 +3740,7 @@ name = "substrate-network" version = "0.1.0" dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index a5442e59b1..8cb63c0464 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -780,9 +780,7 @@ mod tests { let environ = Arc::new(DummyFactory(client.clone())); import_notifications.push( client.import_notification_stream() - .take_while(|n| { - Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5)) - }) + .take_while(|n| Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5))) .for_each(move |_| Ok(())) ); @@ -816,7 +814,7 @@ mod tests { let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) .for_each(move |_| { net.lock().send_import_notifications(); - net.lock().sync(); + net.lock().route_fast(); Ok(()) }) .map(|_| ()) diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 9da7a96d9a..54104c67a2 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -84,7 +84,7 @@ extern crate env_logger; extern crate parity_codec_derive; use futures::prelude::*; -use futures::sync::mpsc; +use futures::sync::{self, mpsc}; use client::{ BlockchainEvents, CallExecutor, Client, backend::Backend, error::Error as ClientError, @@ -249,18 +249,18 @@ pub trait Network: Clone { } /// Bridge between NetworkService, gossiping consensus messages and Grandpa -pub struct NetworkBridge, H: ExHashT> { - service: Arc> +pub struct NetworkBridge> { + service: Arc> } -impl, H: ExHashT> NetworkBridge { +impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService - pub fn new(service: Arc>) -> Self { + pub fn new(service: Arc>) -> Self { NetworkBridge { service } } } -impl, H: ExHashT> Clone for NetworkBridge { +impl,> Clone for NetworkBridge { fn clone(&self) -> Self { NetworkBridge { service: Arc::clone(&self.service) @@ -276,10 +276,15 @@ fn commit_topic(set_id: u64) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes()) } -impl, H: ExHashT> Network for NetworkBridge { +impl,> Network for NetworkBridge { type In = mpsc::UnboundedReceiver; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { - self.service.consensus_gossip().write().messages_for(message_topic::(round, set_id)) + let (tx, rx) = sync::oneshot::channel(); + self.service.with_gossip(move |gossip, _| { + let inner_rx = gossip.messages_for(message_topic::(round, set_id)); + let _ = tx.send(inner_rx); + }); + rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") } fn send_message(&self, round: u64, set_id: u64, message: Vec) { @@ -289,16 +294,21 @@ impl, H: ExHashT fn drop_round_messages(&self, round: u64, set_id: u64) { let topic = message_topic::(round, set_id); - self.service.consensus_gossip().write().collect_garbage_for_topic(topic); + self.service.with_gossip(move |gossip, _| gossip.collect_garbage(|t| t == &topic)); } fn drop_set_messages(&self, set_id: u64) { let topic = commit_topic::(set_id); - self.service.consensus_gossip().write().collect_garbage_for_topic(topic); + self.service.with_gossip(move |gossip, _| gossip.collect_garbage(|t| t == &topic)); } fn commit_messages(&self, set_id: u64) -> Self::In { - self.service.consensus_gossip().write().messages_for(commit_topic::(set_id)) + let (tx, rx) = sync::oneshot::channel(); + self.service.with_gossip(move |gossip, _| { + let inner_rx = gossip.messages_for(commit_topic::(set_id)); + let _ = tx.send(inner_rx); + }); + rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") } fn send_commit(&self, _round: u64, set_id: u64, message: Vec) { diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 036d7e1870..f4a5e4ac20 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -151,10 +151,7 @@ impl MessageRouting { fn drop_messages(&self, topic: Hash) { let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let mut gossip = peer.consensus_gossip().write(); - peer.with_spec(move |_, _| { - gossip.collect_garbage_for_topic(topic); - }); + peer.consensus_gossip_collect_garbage_for(topic); } } @@ -192,10 +189,7 @@ impl Network for MessageRouting { fn messages_for(&self, round: u64, set_id: u64) -> Self::In { let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let mut gossip = peer.consensus_gossip().write(); - let messages = peer.with_spec(move |_, _| { - gossip.messages_for(make_topic(round, set_id)) - }); + let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id)); let messages = messages.map_err( move |_| panic!("Messages for round {} dropped too early", round) @@ -205,9 +199,8 @@ impl Network for MessageRouting { } fn send_message(&self, round: u64, set_id: u64, message: Vec) { - let mut inner = self.inner.lock(); + let inner = self.inner.lock(); inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message, false); - inner.route_until_complete(); } fn drop_round_messages(&self, round: u64, set_id: u64) { @@ -223,10 +216,7 @@ impl Network for MessageRouting { fn commit_messages(&self, set_id: u64) -> Self::In { let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let mut gossip = peer.consensus_gossip().write(); - let messages = peer.with_spec(move |_, _| { - gossip.messages_for(make_commit_topic(set_id)) - }); + let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id)); let messages = messages.map_err( move |_| panic!("Commit messages for set {} dropped too early", set_id) @@ -236,9 +226,8 @@ impl Network for MessageRouting { } fn send_commit(&self, _round: u64, set_id: u64, message: Vec) { - let mut inner = self.inner.lock(); + let inner = self.inner.lock(); inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, false); - inner.route_until_complete(); } fn announce(&self, _round: u64, _set_id: u64, _block: H256) { @@ -420,7 +409,7 @@ fn run_to_completion(blocks: u64, net: Arc>, peers: &[Keyr .map_err(|_| ()); let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) + .for_each(move |_| { net.lock().route_fast(); Ok(()) }) .map(|_| ()) .map_err(|_| ()); @@ -506,7 +495,7 @@ fn finalize_3_voters_1_observer() { .map_err(|_| ()); let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) + .for_each(move |_| { net.lock().route_fast(); Ok(()) }) .map(|_| ()) .map_err(|_| ()); @@ -667,6 +656,7 @@ fn transition_3_voters_twice_1_observer() { .for_each(move |_| { net.lock().send_import_notifications(); net.lock().send_finality_notifications(); + net.lock().route_fast(); Ok(()) }) .map(|_| ()) @@ -776,7 +766,7 @@ fn sync_justifications_on_change_blocks() { // the last peer should get the justification by syncing from other peers assert!(net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none()); while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { - net.lock().sync_steps(100); + net.lock().route_fast(); } } diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 83e583180a..6b88881163 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -154,6 +154,8 @@ pub enum ServiceEvent { protocol: ProtocolId, /// Version of the protocol that was opened. version: u8, + /// Node debug info + debug_info: String, }, /// A custom protocol substream has been closed. @@ -162,6 +164,8 @@ pub enum ServiceEvent { node_index: NodeIndex, /// Protocol that has been closed. protocol: ProtocolId, + /// Node debug info + debug_info: String, }, /// Sustom protocol substreams has been closed. @@ -172,6 +176,8 @@ pub enum ServiceEvent { node_index: NodeIndex, /// Protocols that have been closed. protocols: Vec, + /// Node debug info + debug_info: String, }, /// Receives a message on a custom protocol stream. @@ -348,6 +354,15 @@ impl Service { } } + /// Get debug info for a given peer. + pub fn peer_debug_info(&self, who: NodeIndex) -> String { + if let (Some(peer_id), Some(addr)) = (self.peer_id_of_node(who), self.node_endpoint(who)) { + format!("{:?} through {:?}", peer_id, addr) + } else { + "unknown".to_string() + } + } + /// Returns the `NodeIndex` of a peer, or assigns one if none exists. fn index_of_peer_or_assign(&mut self, peer: PeerId, endpoint: ConnectedPoint) -> NodeIndex { match self.index_by_id.entry(peer) { @@ -385,6 +400,7 @@ impl Service { node_index, protocol: protocol_id, version, + debug_info: self.peer_debug_info(node_index), }))) } Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => { @@ -393,6 +409,7 @@ impl Service { break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { node_index, protocol: protocol_id, + debug_info: self.peer_debug_info(node_index), }))) } Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, data }))) => { diff --git a/substrate/core/network-libp2p/src/traits.rs b/substrate/core/network-libp2p/src/traits.rs index 562f4e6718..fc57a9aef3 100644 --- a/substrate/core/network-libp2p/src/traits.rs +++ b/substrate/core/network-libp2p/src/traits.rs @@ -96,21 +96,21 @@ impl NetworkConfiguration { } /// The severity of misbehaviour of a peer that is reported. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum Severity<'a> { +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Severity { /// Peer is timing out. Could be bad connectivity of overload of work on either of our sides. Timeout, /// Peer has been notably useless. E.g. unable to answer a request that we might reasonably consider /// it could answer. - Useless(&'a str), + Useless(String), /// Peer has behaved in an invalid manner. This doesn't necessarily need to be Byzantine, but peer /// must have taken concrete action in order to behave in such a way which is wantanly invalid. - Bad(&'a str), + Bad(String), } -impl<'a> fmt::Display for Severity<'a> { +impl fmt::Display for Severity { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match *self { + match self { Severity::Timeout => write!(fmt, "Timeout"), Severity::Useless(r) => write!(fmt, "Useless ({})", r), Severity::Bad(r) => write!(fmt, "Bad ({})", r), diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 1a3a230778..0acf42e6e4 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -8,6 +8,7 @@ authors = ["Parity Technologies "] [lib] [dependencies] +crossbeam-channel = "0.3.6" log = "0.4" parking_lot = "0.7.1" error-chain = "0.12" diff --git a/substrate/core/network/src/io.rs b/substrate/core/network/src/io.rs deleted file mode 100644 index 5413d8c74c..0000000000 --- a/substrate/core/network/src/io.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2017-2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use parking_lot::Mutex; -use network_libp2p::{Service, Severity, NodeIndex, PeerId, ProtocolId}; -use std::sync::Arc; - -/// IO interface for the syncing handler. -/// Provides peer connection management and an interface to the blockchain client. -pub trait SyncIo { - /// Report a peer for misbehaviour. - fn report_peer(&mut self, who: NodeIndex, reason: Severity); - /// Send a packet to a peer. - fn send(&mut self, who: NodeIndex, data: Vec); - /// Returns peer identifier string - fn peer_debug_info(&self, who: NodeIndex) -> String { - who.to_string() - } - /// Returns information on p2p session - fn peer_id(&self, who: NodeIndex) -> Option; -} - -/// Wraps the network service. -pub struct NetSyncIo<'s> { - network: &'s Arc>, - protocol: ProtocolId, -} - -impl<'s> NetSyncIo<'s> { - /// Creates a new instance. - pub fn new(network: &'s Arc>, protocol: ProtocolId) -> NetSyncIo<'s> { - NetSyncIo { - network, - protocol, - } - } -} - -impl<'s> SyncIo for NetSyncIo<'s> { - fn report_peer(&mut self, who: NodeIndex, reason: Severity) { - info!("Purposefully dropping {} ; reason: {:?}", who, reason); - match reason { - Severity::Bad(_) => self.network.lock().ban_node(who), - Severity::Useless(_) => self.network.lock().drop_node(who), - Severity::Timeout => self.network.lock().drop_node(who), - } - } - - fn send(&mut self, who: NodeIndex, data: Vec) { - self.network.lock().send_custom_message(who, self.protocol, data) - } - - fn peer_id(&self, who: NodeIndex) -> Option { - let net = self.network.lock(); - net.peer_id_of_node(who).cloned() - } - - fn peer_debug_info(&self, who: NodeIndex) -> String { - let net = self.network.lock(); - if let (Some(peer_id), Some(addr)) = (net.peer_id_of_node(who), net.node_endpoint(who)) { - format!("{:?} through {:?}", peer_id, addr) - } else { - "unknown".to_string() - } - } -} diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index d2868f1a5c..428f4bbd16 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -20,6 +20,8 @@ //! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages. //! Allows attachment of an optional subprotocol for chain-specific requests. +#[macro_use] +extern crate crossbeam_channel; extern crate linked_hash_map; extern crate lru_cache; extern crate parking_lot; @@ -51,7 +53,6 @@ mod service; mod sync; #[macro_use] mod protocol; -mod io; mod chain; mod blocks; mod on_demand; @@ -65,7 +66,7 @@ pub mod specialization; pub mod test; pub use chain::Client as ClientHandle; -pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, SyncProvider, ExHashT}; +pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, SyncProvider, ExHashT}; pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{ diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index a9b00dc120..8f54081069 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -16,22 +16,22 @@ //! On-demand requests service. +use codec::Encode; use std::collections::{HashMap, VecDeque}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::time::{Instant, Duration}; use futures::{Async, Future, Poll}; -use futures::sync::oneshot::{channel, Receiver, Sender}; +use futures::sync::oneshot::{channel, Receiver, Sender as OneShotSender}; use linked_hash_map::LinkedHashMap; use linked_hash_map::Entry; use parking_lot::Mutex; use client::{error::{Error as ClientError, ErrorKind as ClientErrorKind}}; use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; -use io::SyncIo; use message; use network_libp2p::{Severity, NodeIndex}; use config::Roles; -use service; +use service::{NetworkChan, NetworkMsg}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Remote request timeout. @@ -51,35 +51,34 @@ pub trait OnDemandService: Send + Sync { fn on_disconnect(&self, peer: NodeIndex); /// Maintain peers requests. - fn maintain_peers(&self, io: &mut SyncIo); + fn maintain_peers(&self); /// When header response is received from remote node. fn on_remote_header_response( &self, - io: &mut SyncIo, peer: NodeIndex, response: message::RemoteHeaderResponse ); /// When read response is received from remote node. - fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse); + fn on_remote_read_response(&self, peer: NodeIndex, response: message::RemoteReadResponse); /// When call response is received from remote node. - fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse); + fn on_remote_call_response(&self, peer: NodeIndex, response: message::RemoteCallResponse); /// When changes response is received from remote node. fn on_remote_changes_response( &self, - io: &mut SyncIo, peer: NodeIndex, response: message::RemoteChangesResponse, Block::Hash> ); } /// On-demand requests service. Dispatches requests to appropriate peers. -pub struct OnDemand> { - core: Mutex>, +pub struct OnDemand { + core: Mutex>, checker: Arc>, + network_sender: Mutex>, } /// On-demand remote call response. @@ -88,8 +87,7 @@ pub struct RemoteResponse { } #[derive(Default)] -struct OnDemandCore> { - service: Weak, +struct OnDemandCore { next_request_id: u64, pending_requests: VecDeque>, active_peers: LinkedHashMap>, @@ -105,10 +103,10 @@ struct Request { } enum RequestData { - RemoteHeader(RemoteHeaderRequest, Sender>), - RemoteRead(RemoteReadRequest, Sender>, ClientError>>), - RemoteCall(RemoteCallRequest, Sender, ClientError>>), - RemoteChanges(RemoteChangesRequest, Sender, u32)>, ClientError>>), + RemoteHeader(RemoteHeaderRequest, OneShotSender>), + RemoteRead(RemoteReadRequest, OneShotSender>, ClientError>>), + RemoteCall(RemoteCallRequest, OneShotSender, ClientError>>), + RemoteChanges(RemoteChangesRequest, OneShotSender, u32)>, ClientError>>), } enum Accept { @@ -132,16 +130,15 @@ impl Future for RemoteResponse { } } -impl OnDemand where - E: service::ExecuteInContext, +impl OnDemand where B::Header: HeaderT, { /// Creates new on-demand service. pub fn new(checker: Arc>) -> Self { OnDemand { checker, + network_sender: Mutex::new(None), core: Mutex::new(OnDemandCore { - service: Weak::new(), next_request_id: 0, pending_requests: VecDeque::new(), active_peers: LinkedHashMap::new(), @@ -152,25 +149,34 @@ impl OnDemand where } /// Sets weak reference to network service. - pub fn set_service_link(&self, service: Weak) { - self.core.lock().service = service; + pub fn set_network_sender(&self, network_sender: NetworkChan) { + self.network_sender.lock().replace(network_sender); + } + + fn send(&self, msg: NetworkMsg) { + let _ = self.network_sender + .lock() + .as_ref() + .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") + .send(msg); } /// Schedule && dispatch all scheduled requests. fn schedule_request(&self, retry_count: Option, data: RequestData, result: R) -> R { let mut core = self.core.lock(); core.insert(retry_count.unwrap_or(RETRY_COUNT), data); - core.dispatch(); + core.dispatch(self); result } /// Try to accept response from given peer. - fn accept_response) -> Accept>(&self, rtype: &str, io: &mut SyncIo, peer: NodeIndex, request_id: u64, try_accept: F) { + fn accept_response) -> Accept>(&self, rtype: &str, peer: NodeIndex, request_id: u64, try_accept: F) { let mut core = self.core.lock(); let request = match core.remove(peer, request_id) { Some(request) => request, None => { - io.report_peer(peer, Severity::Bad(&format!("Invalid remote {} response from peer", rtype))); + let reason = format!("Invalid remote {} response from peer", rtype); + self.send(NetworkMsg::ReportPeer(peer, Severity::Bad(reason))); core.remove_peer(peer); return; }, @@ -180,7 +186,8 @@ impl OnDemand where let (retry_count, retry_request_data) = match try_accept(request) { Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { - io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error))); + let reason = format!("Failed to check remote {} response from peer: {}", rtype, error); + self.send(NetworkMsg::ReportPeer(peer, Severity::Bad(reason))); core.remove_peer(peer); if retry_count > 0 { @@ -192,7 +199,8 @@ impl OnDemand where } }, Accept::Unexpected(retry_request_data) => { - io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype))); + let reason = format!("Unexpected response to remote {} from peer", rtype); + self.send(NetworkMsg::ReportPeer(peer, Severity::Bad(reason))); core.remove_peer(peer); (retry_count, Some(retry_request_data)) @@ -203,13 +211,12 @@ impl OnDemand where core.insert(retry_count, request_data); } - core.dispatch(); + core.dispatch(self); } } -impl OnDemandService for OnDemand where +impl OnDemandService for OnDemand where B: BlockT, - E: service::ExecuteInContext, B::Header: HeaderT, { fn on_connect(&self, peer: NodeIndex, role: Roles, best_number: NumberFor) { @@ -219,31 +226,31 @@ impl OnDemandService for OnDemand where let mut core = self.core.lock(); core.add_peer(peer, best_number); - core.dispatch(); + core.dispatch(self); } fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor) { let mut core = self.core.lock(); core.update_peer(peer, best_number); - core.dispatch(); + core.dispatch(self); } fn on_disconnect(&self, peer: NodeIndex) { let mut core = self.core.lock(); core.remove_peer(peer); - core.dispatch(); + core.dispatch(self); } - fn maintain_peers(&self, io: &mut SyncIo) { + fn maintain_peers(&self) { let mut core = self.core.lock(); for bad_peer in core.maintain_peers() { - io.report_peer(bad_peer, Severity::Timeout); + self.send(NetworkMsg::ReportPeer(bad_peer, Severity::Timeout)); } - core.dispatch(); + core.dispatch(self); } - fn on_remote_header_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteHeaderResponse) { - self.accept_response("header", io, peer, response.id, |request| match request.data { + fn on_remote_header_response(&self, peer: NodeIndex, response: message::RemoteHeaderResponse) { + self.accept_response("header", peer, response.id, |request| match request.data { RequestData::RemoteHeader(request, sender) => match self.checker.check_header_proof(&request, response.header, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already @@ -256,8 +263,8 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse) { - self.accept_response("read", io, peer, response.id, |request| match request.data { + fn on_remote_read_response(&self, peer: NodeIndex, response: message::RemoteReadResponse) { + self.accept_response("read", peer, response.id, |request| match request.data { RequestData::RemoteRead(request, sender) => match self.checker.check_read_proof(&request, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already @@ -270,8 +277,8 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse) { - self.accept_response("call", io, peer, response.id, |request| match request.data { + fn on_remote_call_response(&self, peer: NodeIndex, response: message::RemoteCallResponse) { + self.accept_response("call", peer, response.id, |request| match request.data { RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already @@ -284,8 +291,8 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_changes_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteChangesResponse, B::Hash>) { - self.accept_response("changes", io, peer, response.id, |request| match request.data { + fn on_remote_changes_response(&self, peer: NodeIndex, response: message::RemoteChangesResponse, B::Hash>) { + self.accept_response("changes", peer, response.id, |request| match request.data { RequestData::RemoteChanges(request, sender) => match self.checker.check_changes_proof( &request, ChangesProof { max_block: response.max, @@ -305,9 +312,8 @@ impl OnDemandService for OnDemand where } } -impl Fetcher for OnDemand where +impl Fetcher for OnDemand where B: BlockT, - E: service::ExecuteInContext, B::Header: HeaderT, { type RemoteHeaderResult = RemoteResponse; @@ -340,9 +346,8 @@ impl Fetcher for OnDemand where } } -impl OnDemandCore where +impl OnDemandCore where B: BlockT, - E: service::ExecuteInContext, B::Header: HeaderT, { pub fn add_peer(&mut self, peer: NodeIndex, best_number: NumberFor) { @@ -407,11 +412,7 @@ impl OnDemandCore where } } - pub fn dispatch(&mut self) { - let service = match self.service.upgrade() { - Some(service) => service, - None => return, - }; + pub fn dispatch(&mut self, on_demand: &OnDemand) { let mut last_peer = self.idle_peers.back().cloned(); let mut unhandled_requests = VecDeque::new(); @@ -457,8 +458,7 @@ impl OnDemandCore where let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); - - service.execute_in_context(|ctx| ctx.send_message(peer, request.message())); + on_demand.send(NetworkMsg::Outgoing(peer, request.message().encode())); self.active_peers.insert(peer, request); } @@ -523,30 +523,23 @@ impl RequestData { #[cfg(test)] pub mod tests { - use std::collections::VecDeque; use std::sync::Arc; use std::time::Instant; use futures::Future; - use parking_lot::RwLock; use runtime_primitives::traits::NumberFor; use client::{error::{ErrorKind as ClientErrorKind, Result as ClientResult}}; use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; use config::Roles; use message; - use network_libp2p::NodeIndex; - use service::ExecuteInContext; - use test::TestIo; + use network_libp2p::{NodeIndex, ProtocolId, Severity}; + use service::{network_channel, NetworkPort, NetworkMsg}; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; use test_client::runtime::{changes_trie_config, Block, Header}; pub struct DummyExecutor; struct DummyFetchChecker { ok: bool } - impl ExecuteInContext for DummyExecutor { - fn execute_in_context)>(&self, _closure: F) {} - } - impl FetchChecker for DummyFetchChecker { fn check_header_proof( &self, @@ -582,20 +575,19 @@ pub mod tests { } } - fn dummy(ok: bool) -> (Arc, Arc>) { + fn dummy(ok: bool) -> (Arc, Arc>) { let executor = Arc::new(DummyExecutor); let service = Arc::new(OnDemand::new(Arc::new(DummyFetchChecker { ok }))); - service.set_service_link(Arc::downgrade(&executor)); (executor, service) } - fn total_peers(on_demand: &OnDemand) -> usize { + fn total_peers(on_demand: &OnDemand) -> usize { let core = on_demand.core.lock(); core.idle_peers.len() + core.active_peers.len() } - fn receive_call_response(on_demand: &OnDemand, network: &mut TestIo, peer: NodeIndex, id: message::RequestId) { - on_demand.on_remote_call_response(network, peer, message::RemoteCallResponse { + fn receive_call_response(on_demand: &OnDemand, peer: NodeIndex, id: message::RequestId) { + on_demand.on_remote_call_response(peer, message::RemoteCallResponse { id: id, proof: vec![vec![2]], }); @@ -611,6 +603,21 @@ pub mod tests { } } + fn assert_disconnected_peer(network_port: NetworkPort, expected_severity: Severity) { + let mut disconnect_count = 0; + while let Ok(msg) = network_port.receiver().try_recv() { + match msg { + NetworkMsg::ReportPeer(_, severity) => { + if severity == expected_severity { + disconnect_count = disconnect_count + 1; + } + }, + _ => {}, + } + } + assert_eq!(disconnect_count, 1); + } + #[test] fn knows_about_peers_roles() { let (_, on_demand) = dummy(true); @@ -637,9 +644,8 @@ pub mod tests { #[test] fn disconnects_from_timeouted_peer() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); - + let (network_sender, network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); on_demand.on_connect(1, Roles::FULL, 1000); assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); @@ -656,17 +662,17 @@ pub mod tests { assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::>()); on_demand.core.lock().active_peers[&0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; - on_demand.maintain_peers(&mut network); + on_demand.maintain_peers(); assert!(on_demand.core.lock().idle_peers.is_empty()); assert_eq!(vec![1], on_demand.core.lock().active_peers.keys().cloned().collect::>()); - assert!(network.to_disconnect.contains(&0)); + assert_disconnected_peer(network_port, Severity::Timeout); } #[test] fn disconnects_from_peer_on_response_with_wrong_id() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { @@ -676,16 +682,16 @@ pub mod tests { call_data: vec![], retry_count: None, }); - receive_call_response(&*on_demand, &mut network, 0, 1); - assert!(network.to_disconnect.contains(&0)); + receive_call_response(&*on_demand, 0, 1); + assert_disconnected_peer(network_port, Severity::Bad("Invalid remote call response from peer".to_string())); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_incorrect_response() { let (_x, on_demand) = dummy(false); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.remote_call(RemoteCallRequest { block: Default::default(), header: dummy_header(), @@ -695,27 +701,27 @@ pub mod tests { }); on_demand.on_connect(0, Roles::FULL, 1000); - receive_call_response(&*on_demand, &mut network, 0, 0); - assert!(network.to_disconnect.contains(&0)); + receive_call_response(&*on_demand, 0, 0); + assert_disconnected_peer(network_port, Severity::Bad("Failed to check remote call response from peer: Backend error: Test error".to_string())); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_unexpected_response() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); - receive_call_response(&*on_demand, &mut network, 0, 0); - assert!(network.to_disconnect.contains(&0)); + receive_call_response(&*on_demand, 0, 0); + assert_disconnected_peer(network_port, Severity::Bad("Invalid remote call response from peer".to_string())); } #[test] fn disconnects_from_peer_on_wrong_response_type() { let (_x, on_demand) = dummy(false); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { @@ -726,11 +732,11 @@ pub mod tests { retry_count: Some(1), }); - on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse { + on_demand.on_remote_read_response(0, message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); - assert!(network.to_disconnect.contains(&0)); + assert_disconnected_peer(network_port, Severity::Bad("Unexpected response to remote read from peer".to_string())); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -740,8 +746,8 @@ pub mod tests { let retry_count = 2; let (_x, on_demand) = dummy(false); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); for i in 0..retry_count+1 { on_demand.on_connect(i, Roles::FULL, 1000); } @@ -767,7 +773,7 @@ pub mod tests { for i in 0..retry_count+1 { let mut current = current.lock(); *current = *current + 1; - receive_call_response(&*on_demand, &mut network, i, i as u64); + receive_call_response(&*on_demand, i, i as u64); } let mut finished_at = finished_at.lock(); @@ -780,8 +786,8 @@ pub mod tests { #[test] fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_call(RemoteCallRequest { @@ -796,15 +802,15 @@ pub mod tests { assert_eq!(result, vec![42]); }); - receive_call_response(&*on_demand, &mut network, 0, 0); + receive_call_response(&*on_demand, 0, 0); thread.join().unwrap(); } #[test] fn receives_remote_read_response() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_read(RemoteReadRequest { @@ -818,7 +824,7 @@ pub mod tests { assert_eq!(result, Some(vec![42])); }); - on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse { + on_demand.on_remote_read_response(0, message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); @@ -828,8 +834,8 @@ pub mod tests { #[test] fn receives_remote_header_response() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_header(RemoteHeaderRequest { @@ -846,7 +852,7 @@ pub mod tests { ); }); - on_demand.on_remote_header_response(&mut network, 0, message::RemoteHeaderResponse { + on_demand.on_remote_header_response(0, message::RemoteHeaderResponse { id: 0, header: Some(Header { parent_hash: Default::default(), @@ -863,8 +869,8 @@ pub mod tests { #[test] fn receives_remote_changes_response() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_changes(RemoteChangesRequest { @@ -881,7 +887,7 @@ pub mod tests { assert_eq!(result, vec![(100, 2)]); }); - on_demand.on_remote_changes_response(&mut network, 0, message::RemoteChangesResponse { + on_demand.on_remote_changes_response(0, message::RemoteChangesResponse { id: 0, max: 1000, proof: vec![vec![2]], @@ -894,8 +900,8 @@ pub mod tests { #[test] fn does_not_sends_request_to_peer_who_has_no_required_block() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let mut network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(1, Roles::FULL, 100); @@ -930,7 +936,7 @@ pub mod tests { assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); - on_demand.on_remote_header_response(&mut network, 1, message::RemoteHeaderResponse { + on_demand.on_remote_header_response(1, message::RemoteHeaderResponse { id: 0, header: Some(dummy_header()), proof: vec![], @@ -946,8 +952,8 @@ pub mod tests { // loop forever after dispatching a request to the last peer, since the // last peer was not updated let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let _network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), @@ -971,8 +977,8 @@ pub mod tests { #[test] fn tries_to_send_all_pending_requests() { let (_x, on_demand) = dummy(true); - let queue = RwLock::new(VecDeque::new()); - let _network = TestIo::new(&queue, None); + let (network_sender, _network_port) = network_channel(ProtocolId::default()); + on_demand.set_network_sender(network_sender.clone()); on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index b2e02ea4e3..5c4386c6f1 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -14,36 +14,37 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::collections::{HashMap, HashSet, BTreeMap}; -use std::cmp; -use std::io::Cursor; -use std::sync::Arc; -use std::time; -use parking_lot::RwLock; -use rustc_hex::ToHex; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As, Zero}; -use runtime_primitives::generic::BlockId; -use primitives::storage::StorageKey; +use codec::Encode; +use crossbeam_channel::{self as channel, Receiver, Sender}; use network_libp2p::{NodeIndex, Severity}; -use codec::{Encode, Decode}; +use primitives::storage::StorageKey; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero}; use consensus::import_queue::ImportQueue; use message::{self, Message}; use message::generic::Message as GenericMessage; use consensus_gossip::ConsensusGossip; +use on_demand::OnDemandService; use specialization::NetworkSpecialization; use sync::{ChainSync, Status as SyncStatus, SyncState}; -use service::{TransactionPool, ExHashT}; +use service::{NetworkChan, NetworkMsg, TransactionPool, ExHashT}; use config::{ProtocolConfig, Roles}; +use rustc_hex::ToHex; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; +use std::thread; +use std::time; +use std::cmp; use chain::Client; use client::light::fetcher::ChangesProof; -use on_demand::OnDemandService; -use io::SyncIo; use error; const REQUEST_TIMEOUT_SEC: u64 = 40; +const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1000); +const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(5000); /// Current protocol version. -pub (crate) const CURRENT_VERSION: u32 = 1; +pub(crate) const CURRENT_VERSION: u32 = 1; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; @@ -54,15 +55,17 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { + network_chan: NetworkChan, + port: Receiver>, config: ProtocolConfig, on_demand: Option>>, genesis_hash: B::Hash, - sync: Arc>>, - specialization: RwLock, - consensus_gossip: RwLock>, + sync: ChainSync, + specialization: S, + consensus_gossip: ConsensusGossip, context_data: ContextData, // Connected peers pending Status message. - handshaking_peers: RwLock>, + handshaking_peers: HashMap, transaction_pool: Arc>, } /// Syncing status and statistics @@ -77,6 +80,7 @@ pub struct ProtocolStatus { } /// Peer information +#[derive(Debug)] struct Peer { /// Protocol version protocol_version: u32, @@ -115,7 +119,7 @@ impl Peer { } /// Info about a peer's known state. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PeerInfo { /// Roles pub roles: Roles, @@ -144,37 +148,45 @@ pub trait Context { /// Protocol context. pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - io: &'a mut SyncIo, - context_data: &'a ContextData, + network_chan: &'a NetworkChan, + context_data: &'a mut ContextData, } impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { - pub(crate) fn new(context_data: &'a ContextData, io: &'a mut SyncIo) -> Self { + pub(crate) fn new( + context_data: &'a mut ContextData, + network_chan: &'a NetworkChan, + ) -> Self { ProtocolContext { - io, + network_chan, context_data, } } /// Send a message to a peer. pub fn send_message(&mut self, who: NodeIndex, message: Message) { - send_message(&self.context_data.peers, self.io, who, message) + send_message( + &mut self.context_data.peers, + &self.network_chan, + who, + message, + ) } /// Point out that a peer has been malign or irresponsible or appeared lazy. pub fn report_peer(&mut self, who: NodeIndex, reason: Severity) { - self.io.report_peer(who, reason); + let _ = self + .network_chan + .send(NetworkMsg::ReportPeer(who, reason)); } /// Get peer info. pub fn peer_info(&self, peer: NodeIndex) -> Option> { - self.context_data.peers.read().get(&peer).map(|p| { - PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } + self.context_data.peers.get(&peer).map(|p| PeerInfo { + roles: p.roles, + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, }) } } @@ -200,81 +212,227 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, /// Data necessary to create a context. pub(crate) struct ContextData { // All connected peers - peers: RwLock>>, + peers: HashMap>, pub chain: Arc>, } +/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. +pub trait SpecTask> { + fn call_box(self: Box, spec: &mut S, context: &mut Context); +} + +impl, F: FnOnce(&mut S, &mut Context)> SpecTask for F { + fn call_box(self: Box, spec: &mut S, context: &mut Context) { + (*self)(spec, context) + } +} + +/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. +pub trait GossipTask { + fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context); +} + +impl, &mut Context)> GossipTask for F { + fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut Context) { + (*self)(gossip, context) + } +} + +/// Messages sent to Protocol. +pub enum ProtocolMsg,> { + /// A peer connected, with debug info. + PeerConnected(NodeIndex, String), + /// A peer disconnected, with debug info. + PeerDisconnected(NodeIndex, String), + /// A custom message from another peer. + CustomMessage(NodeIndex, Message), + /// Ask the protocol for its status. + Status(Sender>), + /// Tell protocol to propagate extrinsics. + PropagateExtrinsics, + /// Execute a closure with the chain-specific network specialization. + ExecuteWithSpec(Box + Send + 'static>), + /// Execute a closure with the consensus gossip. + ExecuteWithGossip(Box + Send + 'static>), + /// Incoming gossip consensus message. + GossipConsensusMessage(B::Hash, Vec, bool), + /// Is protocol currently major-syncing? + IsMajorSyncing(Sender), + /// Is protocol currently offline? + IsOffline(Sender), + /// Return a list of peers currently known to protocol. + Peers(Sender)>>), + /// Let protocol know a peer is currenlty clogged. + PeerClogged(NodeIndex, Option>), + /// Tell protocol to maintain sync. + MaintainSync, + /// Tell protocol to restart sync. + RestartSync, + /// Propagate a block to peers. + AnnounceBlock(B::Hash), + /// Tell protocol that a block was imported (sent by the import-queue). + BlockImportedSync(B::Hash, NumberFor), + /// Tell protocol to request justification for a block. + RequestJustification(B::Hash, NumberFor), + /// A block has been imported (sent by the client). + BlockImported(B::Hash, B::Header), + /// A block has been finalized (sent by the client). + BlockFinalized(B::Hash, B::Header), + /// Tell protocol to abort sync (does not stop protocol). + /// Only used in tests. + #[cfg(any(test, feature = "test-helpers"))] + Abort, + /// Tell protocol to abort sync and stop. + Stop, + /// Tell protocol to perform regular maintenance. + Tick, +} + impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new>( + network_chan: NetworkChan, config: ProtocolConfig, chain: Arc>, import_queue: Arc, on_demand: Option>>, transaction_pool: Arc>, specialization: S, - ) -> error::Result - where I: ImportQueue - { + ) -> error::Result>> { + let (sender, port) = channel::unbounded(); let info = chain.info()?; let sync = ChainSync::new(config.roles, &info, import_queue); - let protocol = Protocol { - config: config, - context_data: ContextData { - peers: RwLock::new(HashMap::new()), - chain, - }, - on_demand, - genesis_hash: info.chain.genesis_hash, - sync: Arc::new(RwLock::new(sync)), - specialization: RwLock::new(specialization), - consensus_gossip: RwLock::new(ConsensusGossip::new()), - handshaking_peers: RwLock::new(HashMap::new()), - transaction_pool: transaction_pool, - }; - Ok(protocol) - } - - pub(crate) fn context_data(&self) -> &ContextData { - &self.context_data - } - - pub(crate) fn sync(&self) -> &Arc>> { - &self.sync - } - - pub(crate) fn consensus_gossip<'a>(&'a self) -> &'a RwLock> { - &self.consensus_gossip - } - - /// Returns protocol status - pub fn status(&self) -> ProtocolStatus { - let sync = self.sync.read(); - let peers = self.context_data.peers.read(); - ProtocolStatus { - sync: sync.status(), - num_peers: peers.values().count(), - num_active_peers: peers.values().filter(|p| p.block_request.is_some()).count(), - } - } - - pub fn peers(&self) -> Vec<(NodeIndex, PeerInfo)> { - self.context_data.peers.read().iter().map(|(idx, p)| { - ( - *idx, - PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, + let _ = thread::Builder::new() + .name("Protocol".into()) + .spawn(move || { + let mut protocol = Protocol { + network_chan, + port, + config: config, + context_data: ContextData { + peers: HashMap::new(), + chain, + }, + on_demand, + genesis_hash: info.chain.genesis_hash, + sync, + specialization: specialization, + consensus_gossip: ConsensusGossip::new(), + handshaking_peers: HashMap::new(), + transaction_pool: transaction_pool, + }; + let tick_timeout = channel::tick(TICK_TIMEOUT); + let propagate_timeout = channel::tick(PROPAGATE_TIMEOUT); + while protocol.run(&tick_timeout, &propagate_timeout) { + // Running until all senders have been dropped... } - ) - }).collect() + }) + .expect("Protocol thread spawning failed"); + Ok(sender) } - fn handle_response(&self, io: &mut SyncIo, who: NodeIndex, response: &message::BlockResponse) -> Option> { - let mut peers = self.context_data.peers.write(); - let request = if let Some(ref mut peer) = peers.get_mut(&who) { + fn run( + &mut self, + tick_timeout: &Receiver, + propagate_timeout: &Receiver, + ) -> bool { + let msg = select! { + recv(self.port) -> event => { + match event { + Ok(msg) => msg, + // Our sender has been dropped, quit. + Err(_) => { + ProtocolMsg::Stop + }, + } + }, + recv(tick_timeout) -> _ => { + ProtocolMsg::Tick + }, + recv(propagate_timeout) -> _ => { + ProtocolMsg::PropagateExtrinsics + }, + }; + self.handle_msg(msg) + } + + fn handle_msg(&mut self, msg: ProtocolMsg) -> bool { + match msg { + ProtocolMsg::Peers(sender) => { + let peers = self.context_data.peers.iter().map(|(idx, p)| { + ( + *idx, + PeerInfo { + roles: p.roles, + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ) + }).collect(); + let _ = sender.send(peers); + }, + ProtocolMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), + ProtocolMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info), + ProtocolMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message), + ProtocolMsg::CustomMessage(who, message) => { + self.on_custom_message(who, message) + }, + ProtocolMsg::Status(sender) => self.status(sender), + ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header), + ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header), + ProtocolMsg::ExecuteWithSpec(task) => { + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + task.call_box(&mut self.specialization, &mut context); + }, + ProtocolMsg::ExecuteWithGossip(task) => { + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + task.call_box(&mut self.consensus_gossip, &mut context); + } + ProtocolMsg::GossipConsensusMessage(topic, message, broadcast) => { + self.gossip_consensus_message(topic, message, broadcast) + } + ProtocolMsg::IsMajorSyncing(sender) => { + let is_syncing = self.sync.status().is_major_syncing(); + let _ = sender.send(is_syncing); + } + ProtocolMsg::IsOffline(sender) => { + let is_offline = self.sync.status().is_offline(); + let _ = sender.send(is_offline); + } + ProtocolMsg::MaintainSync => { + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.maintain_sync(&mut context); + } + ProtocolMsg::RestartSync => { + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.restart(&mut context); + } + ProtocolMsg::AnnounceBlock(hash) => self.announce_block(hash), + ProtocolMsg::BlockImportedSync(hash, number) => self.sync.block_imported(&hash, number), + ProtocolMsg::RequestJustification(hash, number) => { + let mut context = + ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.sync.request_justification(&hash, number, &mut context); + }, + ProtocolMsg::PropagateExtrinsics => self.propagate_extrinsics(), + ProtocolMsg::Tick => self.tick(), + #[cfg(any(test, feature = "test-helpers"))] + ProtocolMsg::Abort => self.abort(), + ProtocolMsg::Stop => { + self.stop(); + return false; + }, + } + true + } + + fn handle_response(&mut self, who: NodeIndex, response: &message::BlockResponse) -> Option> { + let request = if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { match (peer.block_request.take(), peer.justification_request.take()) { (Some(block_request), Some(justification_request)) => { if block_request.id == response.id { @@ -326,104 +484,124 @@ impl, H: ExHashT> Protocol { } }, (None, None) => { - io.report_peer(who, Severity::Bad("Unexpected response packet received from peer")); + let _ = self + .network_chan + .send(NetworkMsg::ReportPeer(who, Severity::Bad("Unexpected response packet received from peer".to_string()))); return None; }, } } else { - io.report_peer(who, Severity::Bad("Unexpected packet received from peer")); + let _ = self + .network_chan + .send(NetworkMsg::ReportPeer(who, Severity::Bad("Unexpected packet received from peer".to_string()))); return None; }; Some(request) } - pub fn handle_packet(&self, io: &mut SyncIo, who: NodeIndex, mut data: &[u8]) { - let message: Message = match Decode::decode(&mut data) { - Some(m) => m, - None => { - trace!(target: "sync", "Invalid packet from {}", who); - io.report_peer(who, Severity::Bad("Peer sent us a packet with invalid format")); - return; - } + /// Returns protocol status + fn status(&mut self, sender: Sender>) { + let status = ProtocolStatus { + sync: self.sync.status(), + num_peers: self.context_data.peers.values().count(), + num_active_peers: self + .context_data + .peers + .values() + .filter(|p| p.block_request.is_some()) + .count(), }; + let _ = sender.send(status); + } + fn on_custom_message(&mut self, who: NodeIndex, message: Message) { match message { - GenericMessage::Status(s) => self.on_status_message(io, who, s), - GenericMessage::BlockRequest(r) => self.on_block_request(io, who, r), + GenericMessage::Status(s) => self.on_status_message(who, s), + GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { - if let Some(request) = self.handle_response(io, who, &r) { - self.on_block_response(io, who, request, r); + if let Some(request) = self.handle_response(who, &r) { + self.on_block_response(who, request, r); } }, - GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce), - GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m), - GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, who, request), - GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response), - GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(io, who, request), - GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response), - GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(io, who, request), - GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response), - GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request), - GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response), + GenericMessage::BlockAnnounce(announce) => self.on_block_announce(who, announce), + GenericMessage::Transactions(m) => self.on_extrinsics(who, m), + GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), + GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(who, response), + GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(who, request), + GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(who, response), + GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(who, request), + GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response), + GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(who, request), + GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response), GenericMessage::Consensus(topic, msg, broadcast) => { - self.consensus_gossip.write().on_incoming(&mut ProtocolContext::new(&self.context_data, io), who, topic, msg, broadcast); - }, - other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, &mut Some(other)), + self.consensus_gossip.on_incoming( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + who, + topic, + msg, + broadcast, + ); + } + other => self.specialization.on_message( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + who, + &mut Some(other), + ), } } - pub fn send_message(&self, io: &mut SyncIo, who: NodeIndex, message: Message) { - send_message::(&self.context_data.peers, io, who, message) + fn send_message(&mut self, who: NodeIndex, message: Message) { + send_message::( + &mut self.context_data.peers, + &self.network_chan, + who, + message, + ); } - pub fn gossip_consensus_message(&self, io: &mut SyncIo, topic: B::Hash, message: Vec, broadcast: bool) { - let gossip = self.consensus_gossip(); - self.with_spec(io, move |_s, context|{ - gossip.write().multicast(context, topic, message, broadcast); - }); + fn gossip_consensus_message(&mut self, topic: B::Hash, message: Vec, broadcast: bool) { + self.consensus_gossip.multicast( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + topic, + message, + broadcast, + ); } /// Called when a new peer is connected - pub fn on_peer_connected(&self, io: &mut SyncIo, who: NodeIndex) { - trace!(target: "sync", "Connected {}: {}", who, io.peer_debug_info(who)); - self.handshaking_peers.write().insert(who, time::Instant::now()); - self.send_status(io, who); + fn on_peer_connected(&mut self, who: NodeIndex, debug_info: String) { + trace!(target: "sync", "Connecting {}: {}", who, debug_info); + self.handshaking_peers.insert(who, time::Instant::now()); + self.send_status(who); } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) { - trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer)); - + fn on_peer_disconnected(&mut self, peer: NodeIndex, debug_info: String) { + trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); // lock all the the peer lists so that add/remove peer events are in order - let mut sync = self.sync.write(); - let mut spec = self.specialization.write(); - let removed = { - let mut peers = self.context_data.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - handshaking_peers.remove(&peer); - peers.remove(&peer).is_some() + self.handshaking_peers.remove(&peer); + self.context_data.peers.remove(&peer).is_some() }; if removed { - let mut context = ProtocolContext::new(&self.context_data, io); - self.consensus_gossip.write().peer_disconnected(&mut context, peer); - sync.peer_disconnected(&mut context, peer); - spec.on_disconnect(&mut context, peer); + let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.consensus_gossip.peer_disconnected(&mut context, peer); + self.sync.peer_disconnected(&mut context, peer); + self.specialization.on_disconnect(&mut context, peer); self.on_demand.as_ref().map(|s| s.on_disconnect(peer)); } } /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer<'a>( + pub fn on_clogged_peer( &self, - _io: &mut SyncIo, who: NodeIndex, - clogging_messages: impl ExactSizeIterator + _message: Option>, ) { // We don't do anything but print some diagnostics for now. - if let Some(peer) = self.context_data.peers.read().get(&who) { + if let Some(peer) = self.context_data.peers.get(&who) { debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks, @@ -431,27 +609,15 @@ impl, H: ExHashT> Protocol { } else { debug!(target: "sync", "Peer clogged before being properly connected"); } - - debug!(target: "sync", "{} clogging messages:", clogging_messages.len()); - for msg_bytes in clogging_messages.take(5) { - if let Some(msg) = as Decode>::decode(&mut Cursor::new(msg_bytes)) { - debug!(target: "sync", "{:?}", msg); - } else { - debug!(target: "sync", "{:?}", msg_bytes) - } - } } - fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest) { - trace!(target: "sync", "BlockRequest {} from {} with fields {:?}: from {:?} to {:?} max {:?}", + fn on_block_request(&mut self, peer: NodeIndex, request: message::BlockRequest) { + trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, - request.fields, request.from, request.to, - request.max, - ); - + request.max); let mut blocks = Vec::new(); let mut id = match request.from { message::FromBlock::Hash(h) => BlockId::Hash(h), @@ -460,7 +626,9 @@ impl, H: ExHashT> Protocol { let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize; let get_header = request.fields.contains(message::BlockAttributes::HEADER); let get_body = request.fields.contains(message::BlockAttributes::BODY); - let get_justification = request.fields.contains(message::BlockAttributes::JUSTIFICATION); + let get_justification = request + .fields + .contains(message::BlockAttributes::JUSTIFICATION); while let Some(header) = self.context_data.chain.header(&id).unwrap_or(None) { if blocks.len() >= max { break; @@ -472,7 +640,14 @@ impl, H: ExHashT> Protocol { let block_data = message::generic::BlockData { hash: hash, header: if get_header { Some(header) } else { None }, - body: if get_body { self.context_data.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None }, + body: if get_body { + self.context_data + .chain + .body(&BlockId::Hash(hash)) + .unwrap_or(None) + } else { + None + }, receipt: None, message_queue: None, justification, @@ -493,10 +668,15 @@ impl, H: ExHashT> Protocol { blocks: blocks, }; trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(io, peer, GenericMessage::BlockResponse(response)) + self.send_message(peer, GenericMessage::BlockResponse(response)) } - fn on_block_response(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest, response: message::BlockResponse) { + fn on_block_response( + &mut self, + peer: NodeIndex, + request: message::BlockRequest, + response: message::BlockResponse, + ) { let blocks_range = match ( response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), @@ -511,9 +691,8 @@ impl, H: ExHashT> Protocol { // TODO [andre]: move this logic to the import queue so that // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { - let mut sync = self.sync.write(); - sync.on_block_justification_data( - &mut ProtocolContext::new(&self.context_data, io), + self.sync.on_block_justification_data( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response, @@ -522,34 +701,36 @@ impl, H: ExHashT> Protocol { // import_queue.import_blocks also acquires sync.write(); // Break the cycle by doing these separately from the outside; let new_blocks = { - let mut sync = self.sync.write(); - sync.on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response) + self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response) }; if let Some((origin, new_blocks)) = new_blocks { - let import_queue = self.sync.read().import_queue(); + let import_queue = self.sync.import_queue(); import_queue.import_blocks(origin, new_blocks); } } } /// Perform time based maintenance. - pub fn tick(&self, io: &mut SyncIo) { - self.consensus_gossip.write().collect_garbage(|_| true); - self.maintain_peers(io); - self.sync.write().tick(&mut ProtocolContext::new(&self.context_data, io)); - self.on_demand.as_ref().map(|s| s.maintain_peers(io)); + fn tick(&mut self) { + self.consensus_gossip.collect_garbage(|_| true); + self.maintain_peers(); + self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); + self.on_demand + .as_ref() + .map(|s| s.maintain_peers()); } - fn maintain_peers(&self, io: &mut SyncIo) { + fn maintain_peers(&mut self) { let tick = time::Instant::now(); let mut aborting = Vec::new(); { - let peers = self.context_data.peers.read(); - let handshaking_peers = self.handshaking_peers.read(); - for (who, timestamp) in peers.iter() + for (who, timestamp) in self + .context_data + .peers + .iter() .filter_map(|(id, peer)| peer.min_request_timestamp().map(|r| (id, r))) - .chain(handshaking_peers.iter()) + .chain(self.handshaking_peers.iter()) { if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { trace!(target: "sync", "Timeout {}", who); @@ -558,50 +739,75 @@ impl, H: ExHashT> Protocol { } } - self.specialization.write().maintain_peers(&mut ProtocolContext::new(&self.context_data, io)); + self.specialization + .maintain_peers(&mut ProtocolContext::new( + &mut self.context_data, + &self.network_chan, + )); for p in aborting { - io.report_peer(p, Severity::Timeout); + let _ = self + .network_chan + .send(NetworkMsg::ReportPeer(p, Severity::Timeout)); } } #[allow(dead_code)] - pub fn peer_info(&self, peer: NodeIndex) -> Option> { - self.context_data.peers.read().get(&peer).map(|p| { - PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } + fn peer_info(&mut self, peer: NodeIndex) -> Option> { + self.context_data.peers.get(&peer).map(|p| PeerInfo { + roles: p.roles, + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, }) } /// Called by peer to report status - fn on_status_message(&self, io: &mut SyncIo, who: NodeIndex, status: message::Status) { + fn on_status_message(&mut self, who: NodeIndex, status: message::Status) { trace!(target: "sync", "New peer {} {:?}", who, status); - { - let mut peers = self.context_data.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - if peers.contains_key(&who) { - debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_debug_info(who)); + if self.context_data.peers.contains_key(&who) { + debug!("Unexpected status packet from {}", who); return; } if status.genesis_hash != self.genesis_hash { - io.report_peer(who, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash))); + let reason = format!( + "Peer is on different chain (our genesis: {} theirs: {})", + self.genesis_hash, status.genesis_hash + ); + self.network_chan.send(NetworkMsg::ReportPeer( + who, + Severity::Bad(reason), + )); return; } if status.version != CURRENT_VERSION { - io.report_peer(who, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version))); + let reason = format!("Peer using unsupported protocol version {}", status.version); + self.network_chan.send(NetworkMsg::ReportPeer( + who, + Severity::Bad(reason), + )); return; } if self.config.roles & Roles::LIGHT == Roles::LIGHT { - let self_best_block = self.context_data.chain.info().ok() + let self_best_block = self + .context_data + .chain + .info() + .ok() .and_then(|info| info.best_queued_number) .unwrap_or_else(|| Zero::zero()); - let blocks_difference = self_best_block.as_().checked_sub(status.best_number.as_()).unwrap_or(0); + let blocks_difference = self_best_block + .as_() + .checked_sub(status.best_number.as_()) + .unwrap_or(0); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { - io.report_peer(who, Severity::Useless("Peer is far behind us and will unable to serve light requests")); + self.network_chan.send(NetworkMsg::ReportPeer( + who, + Severity::Useless( + "Peer is far behind us and will unable to serve light requests" + .to_string(), + ), + )); return; } } @@ -619,28 +825,30 @@ impl, H: ExHashT> Protocol { known_blocks: HashSet::new(), next_request_id: 0, }; - peers.insert(who.clone(), peer); - handshaking_peers.remove(&who); - debug!(target: "sync", "Connected {} {}", who, io.peer_debug_info(who)); + self.context_data.peers.insert(who.clone(), peer); + self.handshaking_peers.remove(&who); + debug!(target: "sync", "Connected {}", who); } - let mut context = ProtocolContext::new(&self.context_data, io); - self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles, status.best_number)); - self.sync.write().new_peer(&mut context, who); - self.consensus_gossip.write().new_peer(&mut context, who, status.roles); - self.specialization.write().on_connect(&mut context, who, status); + let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + self.on_demand + .as_ref() + .map(|s| s.on_connect(who, status.roles, status.best_number)); + self.sync.new_peer(&mut context, who); + self.consensus_gossip + .new_peer(&mut context, who, status.roles); + self.specialization.on_connect(&mut context, who, status); } /// Called when peer sends us new extrinsics - fn on_extrinsics(&self, _io: &mut SyncIo, who: NodeIndex, extrinsics: message::Transactions) { + fn on_extrinsics(&mut self, who: NodeIndex, extrinsics: message::Transactions) { // Accept extrinsics only when fully synced - if self.sync.read().status().state != SyncState::Idle { + if self.sync.status().state != SyncState::Idle { trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); return; } trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); - let mut peers = self.context_data.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&who) { + if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { if let Some(hash) = self.transaction_pool.import(&t) { peer.known_extrinsics.insert(hash); @@ -652,19 +860,19 @@ impl, H: ExHashT> Protocol { } /// Called when we propagate ready extrinsics to peers. - pub fn propagate_extrinsics(&self, io: &mut SyncIo) { + fn propagate_extrinsics(&mut self) { debug!(target: "sync", "Propagating extrinsics"); // Accept transactions only when fully synced - if self.sync.read().status().state != SyncState::Idle { + if self.sync.status().state != SyncState::Idle { return; } let extrinsics = self.transaction_pool.transactions(); - + // FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698 + let mut will_send = vec![]; let mut propagated_to = HashMap::new(); - let mut peers = self.context_data.peers.write(); - for (who, ref mut peer) in peers.iter_mut() { + for (who, ref mut peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) @@ -672,16 +880,26 @@ impl, H: ExHashT> Protocol { .unzip(); if !to_send.is_empty() { - let node_id = io.peer_id(*who).map(|id| id.to_base58()); + let (sender, port) = channel::unbounded(); + let _ = self + .network_chan + .send(NetworkMsg::GetPeerId(who.clone(), sender)); + let node_id = port.recv().expect("1. We are running 2. Network should be running too."); if let Some(id) = node_id { for hash in hashes { - propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone()); + propagated_to + .entry(hash) + .or_insert_with(Vec::new) + .push(id.clone()); } } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - self.send_message(io, *who, GenericMessage::Transactions(to_send)); + will_send.push((who.clone(), to_send)); } } + for (who, to_send) in will_send { + self.send_message(who, GenericMessage::Transactions(to_send)); + } self.transaction_pool.on_broadcasted(propagated_to); } @@ -689,7 +907,7 @@ impl, H: ExHashT> Protocol { /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - pub fn announce_block(&self, io: &mut SyncIo, hash: B::Hash) { + pub fn announce_block(&mut self, hash: B::Hash) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { @@ -701,19 +919,24 @@ impl, H: ExHashT> Protocol { return; } }; - let mut peers = self.context_data.peers.write(); let hash = header.hash(); - for (who, ref mut peer) in peers.iter_mut() { + + // FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698 + let mut to_send = vec![]; + for (who, ref mut peer) in self.context_data.peers.iter_mut() { trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); peer.known_blocks.insert(hash); - self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce { + to_send.push(who.clone()); + } + for who in to_send { + self.send_message(who, GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() })); } } /// Send Status message - fn send_status(&self, io: &mut SyncIo, who: NodeIndex) { + fn send_status(&mut self, who: NodeIndex) { if let Ok(info) = self.context_data.chain.info() { let status = message::generic::Status { version: CURRENT_VERSION, @@ -721,53 +944,54 @@ impl, H: ExHashT> Protocol { roles: self.config.roles.into(), best_number: info.chain.best_number, best_hash: info.chain.best_hash, - chain_status: self.specialization.read().status(), + chain_status: self.specialization.status(), }; - self.send_message(io, who, GenericMessage::Status(status)) + self.send_message(who, GenericMessage::Status(status)) } } - pub fn abort(&self) { - let mut sync = self.sync.write(); - let mut spec = self.specialization.write(); - let mut peers = self.context_data.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - let mut consensus_gossip = self.consensus_gossip.write(); - sync.clear(); - spec.on_abort(); - peers.clear(); - handshaking_peers.clear(); - consensus_gossip.abort(); + fn abort(&mut self) { + self.sync.clear(); + self.specialization.on_abort(); + self.context_data.peers.clear(); + self.handshaking_peers.clear(); + self.consensus_gossip.abort(); } - pub fn stop(&self) { + fn stop(&mut self) { // stop processing import requests first (without holding a sync lock) - let import_queue = self.sync.read().import_queue(); + let import_queue = self.sync.import_queue(); import_queue.stop(); // and then clear all the sync data self.abort(); } - pub fn on_block_announce(&self, io: &mut SyncIo, who: NodeIndex, announce: message::BlockAnnounce) { + fn on_block_announce(&mut self, who: NodeIndex, announce: message::BlockAnnounce) { let header = announce.header; let hash = header.hash(); { - let mut peers = self.context_data.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&who) { + if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { peer.known_blocks.insert(hash.clone()); } } - self.on_demand.as_ref().map(|s| s.on_block_announce(who, *header.number())); - self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), who, hash, &header); + self.on_demand + .as_ref() + .map(|s| s.on_block_announce(who, *header.number())); + self.sync.on_block_announce( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + who, + hash, + &header, + ); } - pub fn on_block_imported(&self, io: &mut SyncIo, hash: B::Hash, header: &B::Header) { - self.sync.write().update_chain_info(&header); - self.specialization.write().on_block_imported( - &mut ProtocolContext::new(&self.context_data, io), + fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { + self.sync.update_chain_info(&header); + self.specialization.on_block_imported( + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), hash.clone(), - header + header, ); // blocks are not announced by light clients @@ -776,44 +1000,69 @@ impl, H: ExHashT> Protocol { } // send out block announcements - let mut peers = self.context_data.peers.write(); - for (who, ref mut peer) in peers.iter_mut() { + // FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698 + let mut to_send = vec![]; + for (who, ref mut peer) in self.context_data.peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce { - header: header.clone() - })); + to_send.push(who.clone()); } } + for who in to_send { + self.send_message( + who, + GenericMessage::BlockAnnounce(message::BlockAnnounce { + header: header.clone(), + }), + ); + } } - pub fn on_block_finalized(&self, _io: &mut SyncIo, hash: B::Hash, header: &B::Header) { - self.sync.write().block_finalized(&hash, *header.number()); + fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { + self.sync.block_finalized(&hash, *header.number()); } - fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest) { + fn on_remote_call_request( + &mut self, + who: NodeIndex, + request: message::RemoteCallRequest, + ) { trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block); - let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) { + let proof = match self.context_data.chain.execution_proof( + &request.block, + &request.method, + &request.data, + ) { Ok((_, proof)) => proof, Err(error) => { trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}", request.id, who, request.method, request.block, error); Default::default() - }, + } }; - self.send_message(io, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { - id: request.id, proof, - })); + self.send_message( + who, + GenericMessage::RemoteCallResponse(message::RemoteCallResponse { + id: request.id, + proof, + }), + ); } - fn on_remote_call_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteCallResponse) { + fn on_remote_call_response(&mut self, who: NodeIndex, response: message::RemoteCallResponse) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); - self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, who, response)); + self.on_demand + .as_ref() + .map(|s| s.on_remote_call_response(who, response)); } - fn on_remote_read_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteReadRequest) { + fn on_remote_read_request( + &mut self, + who: NodeIndex, + request: message::RemoteReadRequest, + ) { trace!(target: "sync", "Remote read request {} from {} ({} at {})", request.id, who, request.key.to_hex::(), request.block); let proof = match self.context_data.chain.read_proof(&request.block, &request.key) { @@ -822,18 +1071,28 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}", request.id, who, request.key.to_hex::(), request.block, error); Default::default() - }, + } }; - self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { - id: request.id, proof, - })); + self.send_message( + who, + GenericMessage::RemoteReadResponse(message::RemoteReadResponse { + id: request.id, + proof, + }), + ); } - fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) { + fn on_remote_read_response(&mut self, who: NodeIndex, response: message::RemoteReadResponse) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); - self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response)); + self.on_demand + .as_ref() + .map(|s| s.on_remote_read_response(who, response)); } - fn on_remote_header_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteHeaderRequest>) { + fn on_remote_header_request( + &mut self, + who: NodeIndex, + request: message::RemoteHeaderRequest>, + ) { trace!(target: "sync", "Remote header proof request {} from {} ({})", request.id, who, request.block); let (header, proof) = match self.context_data.chain.header_proof(request.block) { @@ -842,19 +1101,34 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}", request.id, who, request.block, error); (Default::default(), Default::default()) - }, + } }; - self.send_message(io, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { - id: request.id, header, proof, - })); + self.send_message( + who, + GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { + id: request.id, + header, + proof, + }), + ); } - fn on_remote_header_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteHeaderResponse) { + fn on_remote_header_response( + &mut self, + who: NodeIndex, + response: message::RemoteHeaderResponse, + ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); - self.on_demand.as_ref().map(|s| s.on_remote_header_response(io, who, response)); + self.on_demand + .as_ref() + .map(|s| s.on_remote_header_response(who, response)); } - fn on_remote_changes_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteChangesRequest) { + fn on_remote_changes_request( + &mut self, + who: NodeIndex, + request: message::RemoteChangesRequest, + ) { trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})", request.id, who, request.key.to_hex::(), request.first, request.last); let key = StorageKey(request.key); @@ -869,36 +1143,41 @@ impl, H: ExHashT> Protocol { roots: BTreeMap::new(), roots_proof: vec![], } - }, + } }; - self.send_message(io, who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { - id: request.id, - max: proof.max_block, - proof: proof.proof, - roots: proof.roots.into_iter().collect(), - roots_proof: proof.roots_proof, - })); + self.send_message( + who, + GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { + id: request.id, + max: proof.max_block, + proof: proof.proof, + roots: proof.roots.into_iter().collect(), + roots_proof: proof.roots_proof, + }), + ); } - fn on_remote_changes_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteChangesResponse, B::Hash>) { + fn on_remote_changes_response( + &mut self, + who: NodeIndex, + response: message::RemoteChangesResponse, B::Hash>, + ) { trace!(target: "sync", "Remote changes proof response {} from {} (max={})", response.id, who, response.max); - self.on_demand.as_ref().map(|s| s.on_remote_changes_response(io, who, response)); - } - - - /// Execute a closure with access to a network context and specialization. - pub fn with_spec(&self, io: &mut SyncIo, f: F) -> U - where F: FnOnce(&mut S, &mut Context) -> U - { - f(&mut* self.specialization.write(), &mut ProtocolContext::new(&self.context_data, io)) + self.on_demand + .as_ref() + .map(|s| s.on_remote_changes_response(who, response)); } } -fn send_message(peers: &RwLock>>, io: &mut SyncIo, who: NodeIndex, mut message: Message) { +fn send_message( + peers: &mut HashMap>, + network_chan: &NetworkChan, + who: NodeIndex, + mut message: Message, +) { match message { GenericMessage::BlockRequest(ref mut r) => { - let mut peers = peers.write(); if let Some(ref mut peer) = peers.get_mut(&who) { r.id = peer.next_request_id; peer.next_request_id = peer.next_request_id + 1; @@ -911,10 +1190,10 @@ fn send_message(peers: &RwLock (), } - io.send(who, message.encode()); + network_chan.send(NetworkMsg::Outgoing(who, message.encode())); } /// Construct a simple protocol that is composed of several sub protocols. diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 7e74c19ff5..6d4f0131dd 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -17,30 +17,27 @@ use std::collections::HashMap; use std::sync::Arc; use std::{io, thread}; -use std::time::Duration; -use futures::{self, Future, Stream, stream, sync::oneshot}; -use parking_lot::{Mutex, RwLock}; +use futures::{Async, Future, Stream, stream, sync::oneshot}; +use parking_lot::Mutex; use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, NodeIndex, ErrorKind, Severity}; -use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; -use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol}; -use io::NetSyncIo; +use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; +use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol}; use consensus::import_queue::{ImportQueue, Link}; use consensus_gossip::ConsensusGossip; -use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus, PeerInfo}; +use protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo}; +use codec::Decode; use config::Params; +use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; use error::Error; -use specialization::NetworkSpecialization; use runtime_primitives::traits::{Block as BlockT, NumberFor}; -use sync::ChainSync; -use std::sync::Weak; -use tokio::{runtime::Runtime, timer::Interval}; +use specialization::NetworkSpecialization; + +use tokio::prelude::task::AtomicTask; +use tokio::runtime::Runtime; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; -const TICK_TIMEOUT: Duration = Duration::from_millis(1000); -const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000); - /// Sync status pub trait SyncProvider: Send + Sync { /// Get sync status @@ -50,8 +47,14 @@ pub trait SyncProvider: Send + Sync { } /// Minimum Requirements for a Hash within Networking -pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {} -impl ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {} +pub trait ExHashT: + ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static +{ +} +impl ExHashT for T where + T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static +{ +} /// Transaction pool interface pub trait TransactionPool: Send + Sync { @@ -63,114 +66,98 @@ pub trait TransactionPool: Send + Sync { fn on_broadcasted(&self, propagations: HashMap>); } -/// Service able to execute closure in the network context. -pub trait ExecuteInContext: Send + Sync { - /// Execute closure in network context. - fn execute_in_context)>(&self, closure: F); -} - /// A link implementation that connects to the network. -pub struct NetworkLink> { - /// The chain-sync handle - pub(crate) sync: Weak>>, - /// Network context. - pub(crate) context: Weak, +pub struct NetworkLink> { + /// The protocol sender + pub(crate) protocol_sender: Sender>, + /// The network sender + pub(crate) network_sender: NetworkChan, } -impl> NetworkLink { - /// Execute closure with locked ChainSync. - fn with_sync, &mut Context)>(&self, closure: F) { - if let (Some(sync), Some(service)) = (self.sync.upgrade(), self.context.upgrade()) { - service.execute_in_context(move |protocol| { - let mut sync = sync.write(); - closure(&mut *sync, protocol) - }); - } - } -} - -impl> Link for NetworkLink { +impl> Link for NetworkLink { fn block_imported(&self, hash: &B::Hash, number: NumberFor) { - self.with_sync(|sync, _| sync.block_imported(&hash, number)) + let _ = self.protocol_sender.send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); } fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - self.with_sync(|sync, protocol| sync.request_justification(hash, number, protocol)) + let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } fn maintain_sync(&self) { - self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) + let _ = self.protocol_sender.send(ProtocolMsg::MaintainSync); } fn useless_peer(&self, who: NodeIndex, reason: &str) { trace!(target:"sync", "Useless peer {}, {}", who, reason); - self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason))) + self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); } fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { trace!(target:"sync", "Bad peer {}, {}", who, reason); - self.with_sync(|sync, protocol| { - protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless? - sync.restart(protocol); - }) + // is this actually malign or just useless? + self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); + let _ = self.protocol_sender.send(ProtocolMsg::RestartSync); } fn restart(&self) { - self.with_sync(|sync, protocol| sync.restart(protocol)) + let _ = self.protocol_sender.send(ProtocolMsg::RestartSync); } } /// Substrate network service. Handles network IO and manages connectivity. -pub struct Service, H: ExHashT> { +pub struct Service> { /// Network service network: Arc>, - /// Protocol handler - handler: Arc>, - /// Protocol ID. - protocol_id: ProtocolId, + /// Protocol sender + protocol_sender: Sender>, /// Sender for messages to the background service task, and handle for the background thread. /// Dropping the sender should close the task and the thread. /// This is an `Option` because we need to extract it in the destructor. bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>, } -impl, H: ExHashT> Service { +impl> Service { /// Creates and register protocol with the network service - pub fn new>( + pub fn new, H: ExHashT>( params: Params, protocol_id: ProtocolId, import_queue: Arc, - ) -> Result>, Error> - where I: ImportQueue - { - let handler = Arc::new(Protocol::new( + ) -> Result<(Arc>, NetworkChan), Error> { + let (network_chan, network_port) = network_channel(protocol_id); + let protocol_sender = Protocol::new( + network_chan.clone(), params.config, params.chain, import_queue.clone(), params.on_demand, params.transaction_pool, params.specialization, - )?); + )?; let versions = [(protocol::CURRENT_VERSION as u8)]; let registered = RegisteredProtocol::new(protocol_id, &versions[..]); - let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?; + let (thread, network) = start_thread( + protocol_sender.clone(), + network_port, + network_chan.clone(), + params.network_config, + registered, + )?; let service = Arc::new(Service { network, - protocol_id, - handler, - bg_thread: Some(thread) + protocol_sender: protocol_sender.clone(), + bg_thread: Some(thread), }); // connect the import-queue to the network service. let link = NetworkLink { - sync: Arc::downgrade(service.handler.sync()), - context: Arc::downgrade(&service), + protocol_sender, + network_sender: network_chan.clone(), }; import_queue.start(link)?; - Ok(service) + Ok((service, network_chan)) } /// Returns the downloaded bytes per second averaged over the past few seconds. @@ -186,18 +173,22 @@ impl, H: ExHashT> Service, H: ExHashT> Service, broadcast: bool) { - self.handler.gossip_consensus_message( - &mut NetSyncIo::new(&self.network, self.protocol_id), - topic, - message, - broadcast, - ) - } - /// Execute a closure with the chain-specific network specialization. - pub fn with_spec(&self, f: F) -> U - where F: FnOnce(&mut S, &mut Context) -> U - { - self.handler.with_spec(&mut NetSyncIo::new(&self.network, self.protocol_id), f) + let _ = self + .protocol_sender + .send(ProtocolMsg::GossipConsensusMessage( + topic, message, broadcast, + )); } - /// access the underlying consensus gossip handler - pub fn consensus_gossip<'a>(&'a self) -> &'a RwLock> { - self.handler.consensus_gossip() + /// Execute a closure with the chain-specific network specialization. + pub fn with_spec(&self, f: F) + where F: FnOnce(&mut S, &mut Context) + Send + 'static + { + let _ = self + .protocol_sender + .send(ProtocolMsg::ExecuteWithSpec(Box::new(f))); + } + + /// Execute a closure with the consensus gossip. + pub fn with_gossip(&self, f: F) + where F: FnOnce(&mut ConsensusGossip, &mut Context) + Send + 'static + { + let _ = self + .protocol_sender + .send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); } } -impl, H: ExHashT> ::consensus::SyncOracle for Service { +impl> ::consensus::SyncOracle for Service { fn is_major_syncing(&self) -> bool { - self.handler.sync().read().status().is_major_syncing() + let (sender, port) = channel::unbounded(); + let _ = self + .protocol_sender + .send(ProtocolMsg::IsMajorSyncing(sender)); + port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, + or the ProtocolMsg::Stop message is received, + 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") } fn is_offline(&self) -> bool { - self.handler.sync().read().status().is_offline() + let (sender, port) = channel::unbounded(); + let _ = self + .protocol_sender + .send(ProtocolMsg::IsOffline(sender)); + port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, + or the ProtocolMsg::Stop message is received, + 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") } } -impl, H:ExHashT> Drop for Service { +impl> Drop for Service { fn drop(&mut self) { - self.handler.stop(); if let Some((sender, join)) = self.bg_thread.take() { let _ = sender.send(()); if let Err(e) = join.join() { @@ -251,20 +259,22 @@ impl, H:ExHashT> Drop for Servi } } -impl, H: ExHashT> ExecuteInContext for Service { - fn execute_in_context)>(&self, closure: F) { - closure(&mut ProtocolContext::new(self.handler.context_data(), &mut NetSyncIo::new(&self.network, self.protocol_id))) - } -} - -impl, H: ExHashT> SyncProvider for Service { +impl> SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { - self.handler.status() + let (sender, port) = channel::unbounded(); + let _ = self.protocol_sender.send(ProtocolMsg::Status(sender)); + port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, + or the ProtocolMsg::Stop message is received, + 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") } fn peers(&self) -> Vec<(NodeIndex, Option, PeerInfo)> { - let peers = self.handler.peers(); + let (sender, port) = channel::unbounded(); + let _ = self.protocol_sender.send(ProtocolMsg::Peers(sender)); + let peers = port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, + or the ProtocolMsg::Stop message is received, + 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent."); let network = self.network.lock(); peers.into_iter().map(|(idx, info)| { (idx, network.peer_id_of_node(idx).map(|p| p.clone()), info) @@ -273,7 +283,7 @@ impl, H: ExHashT> SyncProvider< } /// Trait for managing network -pub trait ManageNetwork: Send + Sync { +pub trait ManageNetwork { /// Set to allow unreserved peers to connect fn accept_unreserved_peers(&self); /// Set to deny unreserved peers to connect @@ -286,7 +296,7 @@ pub trait ManageNetwork: Send + Sync { fn node_id(&self) -> Option; } -impl, H: ExHashT> ManageNetwork for Service { +impl> ManageNetwork for Service { fn accept_unreserved_peers(&self) { self.network.lock().accept_unreserved_peers(); } @@ -319,10 +329,102 @@ impl, H: ExHashT> ManageNetwork } } + +/// Create a NetworkPort/Chan pair. +pub fn network_channel(protocol_id: ProtocolId) -> (NetworkChan, NetworkPort) { + let (network_sender, network_receiver) = channel::unbounded(); + let task_notify = Arc::new(AtomicTask::new()); + let network_port = NetworkPort::new(network_receiver, protocol_id, task_notify.clone()); + let network_chan = NetworkChan::new(network_sender, task_notify); + (network_chan, network_port) +} + + +/// A sender of NetworkMsg that notifies a task when a message has been sent. +#[derive(Clone)] +pub struct NetworkChan { + sender: Sender, + task_notify: Arc, +} + +impl NetworkChan { + /// Create a new network chan. + pub fn new(sender: Sender, task_notify: Arc) -> Self { + NetworkChan { + sender, + task_notify, + } + } + + /// Send a messaging, to be handled on a stream. Notify the task handling the stream. + pub fn send(&self, msg: NetworkMsg) { + let _ = self.sender.send(msg); + self.task_notify.notify(); + } +} + +impl Drop for NetworkChan { + /// Notifying the task when a sender is dropped(when all are dropped, the stream is finished). + fn drop(&mut self) { + self.task_notify.notify(); + } +} + + +/// A receiver of NetworkMsg that makes the protocol-id available with each message. +pub struct NetworkPort { + receiver: Receiver, + protocol_id: ProtocolId, + task_notify: Arc, +} + +impl NetworkPort { + /// Create a new network port for a given protocol-id. + pub fn new(receiver: Receiver, protocol_id: ProtocolId, task_notify: Arc) -> Self { + Self { + receiver, + protocol_id, + task_notify, + } + } + + /// Receive a message, if any is currently-enqueued. + /// Register the current tokio task for notification when a new message is available. + pub fn take_one_message(&self) -> Result, ()> { + self.task_notify.register(); + match self.receiver.try_recv() { + Ok(msg) => Ok(Some((self.protocol_id.clone(), msg))), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(()), + } + } + + /// Get a reference to the underlying crossbeam receiver. + #[cfg(any(test, feature = "test-helpers"))] + pub fn receiver(&self) -> &Receiver { + &self.receiver + } +} + +/// Messages to be handled by NetworkService. +#[derive(Debug)] +pub enum NetworkMsg { + /// Ask network to convert a list of nodes, to a list of peers. + PeerIds(Vec, Sender)>>), + /// Send an outgoing custom message. + Outgoing(NodeIndex, Vec), + /// Report a peer. + ReportPeer(NodeIndex, Severity), + /// Get a peer id. + GetPeerId(NodeIndex, Sender>), +} + /// Starts the background thread that handles the networking. -fn start_thread, H: ExHashT>( +fn start_thread>( + protocol_sender: Sender>, + network_port: NetworkPort, + network_sender: NetworkChan, config: NetworkConfiguration, - protocol: Arc>, registered: RegisteredProtocol, ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>), Error> { let protocol_id = registered.id(); @@ -344,7 +446,7 @@ fn start_thread, H: ExHashT>( let service_clone = service.clone(); let mut runtime = Runtime::new()?; let thread = thread::Builder::new().name("network".to_string()).spawn(move || { - let fut = run_thread(service_clone, protocol, protocol_id) + let fut = run_thread(protocol_sender, service_clone, network_sender, network_port, protocol_id) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); @@ -361,82 +463,111 @@ fn start_thread, H: ExHashT>( } /// Runs the background thread that handles the networking. -fn run_thread, H: ExHashT>( +fn run_thread>( + protocol_sender: Sender>, network_service: Arc>, - protocol: Arc>, + network_sender: NetworkChan, + network_port: NetworkPort, protocol_id: ProtocolId, ) -> impl Future { - // Interval for performing maintenance on the protocol handler. - let tick = Interval::new_interval(TICK_TIMEOUT) - .for_each({ - let protocol = protocol.clone(); - let network_service = network_service.clone(); - move |_| { - protocol.tick(&mut NetSyncIo::new(&network_service, protocol_id)); - Ok(()) - } - }) - .then(|res| { - match res { - Ok(()) => (), - Err(err) => error!("Error in the propagation timer: {:?}", err), - }; - Ok(()) - }); - // Interval at which we gossip extrinsics over the network. - let propagate = Interval::new_interval(PROPAGATE_TIMEOUT) - .for_each({ - let protocol = protocol.clone(); - let network_service = network_service.clone(); - move |_| { - protocol.propagate_extrinsics(&mut NetSyncIo::new(&network_service, protocol_id)); - Ok(()) + let network_service_2 = network_service.clone(); + + // Protocol produces a stream of messages about what happens in sync. + let protocol = stream::poll_fn(move || { + match network_port.take_one_message() { + Ok(Some(message)) => Ok(Async::Ready(Some(message))), + Ok(None) => Ok(Async::NotReady), + Err(_) => Err(()) + } + }).for_each(move |(protocol_id, msg)| { + // Handle message from Protocol. + match msg { + NetworkMsg::PeerIds(node_idxs, sender) => { + let reply = node_idxs.into_iter().map(|idx| { + (idx, network_service_2.lock().peer_id_of_node(idx).map(|p| p.clone())) + }).collect::>(); + let _ = sender.send(reply); } - }) - .then(|res| { - match res { - Ok(()) => (), - Err(err) => error!("Error in the propagation timer: {:?}", err), - }; - Ok(()) - }); + NetworkMsg::Outgoing(who, outgoing_message) => { + network_service_2 + .lock() + .send_custom_message(who, protocol_id, outgoing_message); + }, + NetworkMsg::ReportPeer(who, severity) => { + match severity { + Severity::Bad(_) => network_service_2.lock().ban_node(who), + Severity::Useless(_) => network_service_2.lock().drop_node(who), + Severity::Timeout => network_service_2.lock().drop_node(who), + } + }, + NetworkMsg::GetPeerId(who, sender) => { + let node_id = network_service_2 + .lock() + .peer_id_of_node(who) + .cloned() + .map(|id| id.to_base58()); + let _ = sender.send(node_id); + }, + } + Ok(()) + }) + .then(|res| { + match res { + Ok(()) => (), + Err(_) => error!("Protocol disconnected"), + }; + Ok(()) + }); // The network service produces events about what happens on the network. Let's process them. - let network_service2 = network_service.clone(); - let network = stream::poll_fn(move || network_service2.lock().poll()).for_each(move |event| { - let mut net_sync = NetSyncIo::new(&network_service, protocol_id); - + let network = stream::poll_fn(move || network_service.lock().poll()).for_each(move |event| { match event { - NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols } => { + NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols, debug_info } => { if !protocols.is_empty() { debug_assert_eq!(protocols, &[protocol_id]); - protocol.on_peer_disconnected(&mut net_sync, node_index); + let _ = protocol_sender.send( + ProtocolMsg::PeerDisconnected(node_index, debug_info)); } } - NetworkServiceEvent::OpenedCustomProtocol { node_index, version, .. } => { + NetworkServiceEvent::OpenedCustomProtocol { node_index, version, debug_info, .. } => { debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); - protocol.on_peer_connected(&mut net_sync, node_index); + let _ = protocol_sender.send(ProtocolMsg::PeerConnected(node_index, debug_info)); } - NetworkServiceEvent::ClosedCustomProtocol { node_index, .. } => { - protocol.on_peer_disconnected(&mut net_sync, node_index); + NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. } => { + let _ = protocol_sender.send(ProtocolMsg::PeerDisconnected(node_index, debug_info)); } NetworkServiceEvent::CustomMessage { node_index, data, .. } => { - protocol.handle_packet(&mut net_sync, node_index, &data); + if let Some(m) = Decode::decode(&mut (&data as &[u8])) { + let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, m)); + return Ok(()) + } + let _ = network_sender.send( + NetworkMsg::ReportPeer( + node_index, + Severity::Bad("Peer sent us a packet with invalid format".to_string()) + ) + ); } NetworkServiceEvent::Clogged { node_index, messages, .. } => { - protocol.on_clogged_peer(&mut net_sync, node_index, - messages.iter().map(|d| d.as_ref())); + debug!(target: "sync", "{} clogging messages:", messages.len()); + for msg_bytes in messages.iter().take(5) { + if let Some(msg) = Decode::decode(&mut (&msg_bytes as &[u8])) { + debug!(target: "sync", "{:?}", msg); + let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg))); + } else { + debug!(target: "sync", "{:?}", msg_bytes); + let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, None)); + } + } } }; - Ok(()) }); // Merge all futures into one. let futures: Vec + Send>> = vec![ - Box::new(tick) as Box<_>, - Box::new(propagate) as Box<_>, + Box::new(protocol) as Box<_>, Box::new(network) as Box<_> ]; diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index b9fd2d289c..9b2719b38e 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -25,7 +25,7 @@ use consensus::import_queue::{ImportQueue, IncomingBlock}; use client::error::Error as ClientError; use blocks::BlockCollection; use runtime_primitives::Justification; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; use config::Roles; @@ -211,7 +211,7 @@ impl PendingJustifications { } else { protocol.report_peer( who, - Severity::Bad(&format!("Invalid justification provided for #{}", request.0)), + Severity::Bad(format!("Invalid justification provided for #{}", request.0)), ); } } else { @@ -332,13 +332,16 @@ impl ChainSync { match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); - protocol.report_peer(who, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e))); + let reason = format!("Error legimimately reading blockchain status: {:?}", e); + protocol.report_peer(who, Severity::Useless(reason)); }, (Ok(BlockStatus::KnownBad), _) => { - protocol.report_peer(who, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number))); + let reason = format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number); + protocol.report_peer(who, Severity::Bad(reason)); }, - (Ok(BlockStatus::Unknown), b) if b.is_zero() => { - protocol.report_peer(who, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number))); + (Ok(BlockStatus::Unknown), b) if b == As::sa(0) => { + let reason = format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); + protocol.report_peer(who, Severity::Bad(reason)); }, (Ok(BlockStatus::Unknown), _) if self.import_queue.status().importing_count > MAJOR_SYNC_BLOCKS => { // when actively syncing the common point moves too fast. @@ -457,18 +460,19 @@ impl ChainSync { }, Ok(_) => { // genesis mismatch trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); - protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer")); + protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer".to_string())); return None; }, Err(e) => { - protocol.report_peer(who, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e))); + let reason = format!("Error answering legitimate blockchain query: {:?}", e); + protocol.report_peer(who, Severity::Useless(reason)); return None; } } }, None => { trace!(target:"sync", "Invalid response when searching for ancestor from {}", who); - protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor")); + protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor".to_string())); return None; } } @@ -517,7 +521,7 @@ impl ChainSync { response.hash, ); - protocol.report_peer(who, Severity::Bad(&msg)); + protocol.report_peer(who, Severity::Bad(msg)); return; } @@ -534,7 +538,7 @@ impl ChainSync { hash, ); - protocol.report_peer(who, Severity::Useless(&msg)); + protocol.report_peer(who, Severity::Useless(msg)); return; }, } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index e0887b0df0..941fd3b5b1 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -16,37 +16,41 @@ #![allow(missing_docs)] -#[cfg(test)] -mod sync; #[cfg(test)] mod block_import; +#[cfg(test)] +mod sync; -use std::collections::{VecDeque, HashSet, HashMap}; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::thread; +use std::time::Duration; -use parking_lot::RwLock; use client; use client::block_builder::BlockBuilder; -use primitives::{H256, Ed25519AuthorityId}; -use runtime_primitives::Justification; -use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor, Zero}; -use io::SyncIo; -use protocol::{Context, Protocol, ProtocolContext}; +use codec::{Decode, Encode}; use config::ProtocolConfig; -use service::{NetworkLink, TransactionPool}; -use network_libp2p::{NodeIndex, PeerId, Severity}; -use keyring::Keyring; -use codec::Encode; -use consensus::{BlockOrigin, ImportBlock, JustificationImport, ForkChoiceStrategy, Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock}; use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier}; +use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; +use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; +use consensus_gossip::{ConsensusGossip, ConsensusMessage}; +use crossbeam_channel::{self as channel, Sender}; +use futures::Future; +use futures::sync::{mpsc, oneshot}; +use keyring::Keyring; +use network_libp2p::{NodeIndex, ProtocolId, Severity}; +use parking_lot::Mutex; +use primitives::{H256, Ed25519AuthorityId}; +use protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus}; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, Zero, NumberFor}; +use runtime_primitives::Justification; +use service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, TransactionPool}; use specialization::NetworkSpecialization; -use consensus_gossip::ConsensusGossip; -use service::ExecuteInContext; use test_client; -pub use test_client::runtime::{Block, Hash, Transfer, Extrinsic}; +pub use test_client::runtime::{Block, Extrinsic, Hash, Transfer}; pub use test_client::TestClient; #[cfg(any(test, feature = "test-helpers"))] @@ -61,7 +65,7 @@ impl ImportCB { ImportCB(RefCell::new(None)) } fn set(&self, cb: Box) - where F: 'static + Fn(BlockOrigin, Vec>) -> bool + where F: 'static + Fn(BlockOrigin, Vec>) -> bool, { *self.0.borrow_mut() = Some(cb); } @@ -168,7 +172,7 @@ impl> ImportQueue for SyncImpor &link, None, (origin, new_blocks), - verifier, + verifier ) })); Ok(()) @@ -204,23 +208,13 @@ impl> ImportQueue for SyncImpor } } -struct DummyContextExecutor(Arc>, Arc>>); -unsafe impl Send for DummyContextExecutor {} -unsafe impl Sync for DummyContextExecutor {} - -impl ExecuteInContext for DummyContextExecutor { - fn execute_in_context)>(&self, closure: F) { - let mut io = TestIo::new(&self.1, None); - let mut context = ProtocolContext::new(&self.0.context_data(), &mut io); - closure(&mut context); - } -} - /// The test specialization. pub struct DummySpecialization { } impl NetworkSpecialization for DummySpecialization { - fn status(&self) -> Vec { vec![] } + fn status(&self) -> Vec { + vec![] + } fn on_connect(&mut self, _ctx: &mut Context, _peer_id: NodeIndex, _status: ::message::Status) { } @@ -232,184 +226,198 @@ impl NetworkSpecialization for DummySpecialization { &mut self, _ctx: &mut Context, _peer_id: NodeIndex, - _message: &mut Option<::message::Message> + _message: &mut Option<::message::Message>, ) { } } -pub struct TestIo<'p> { - queue: &'p RwLock>, - pub to_disconnect: HashSet, - packets: Vec, - _sender: Option, -} - -impl<'p> TestIo<'p> where { - pub fn new(queue: &'p RwLock>, sender: Option) -> TestIo<'p> { - TestIo { - queue: queue, - _sender: sender, - to_disconnect: HashSet::new(), - packets: Vec::new(), - } - } -} - -impl<'p> Drop for TestIo<'p> { - fn drop(&mut self) { - self.queue.write().extend(self.packets.drain(..)); - } -} - -impl<'p> SyncIo for TestIo<'p> { - fn report_peer(&mut self, who: NodeIndex, _reason: Severity) { - self.to_disconnect.insert(who); - } - - fn send(&mut self, who: NodeIndex, data: Vec) { - self.packets.push(TestPacket { - data: data, - recipient: who, - }); - } - - fn peer_debug_info(&self, _who: NodeIndex) -> String { - "unknown".to_string() - } - - fn peer_id(&self, _peer_id: NodeIndex) -> Option { - None - } -} - -/// Mocked subprotocol packet -pub struct TestPacket { - data: Vec, - recipient: NodeIndex, -} - pub type PeersClient = client::Client; -pub struct Peer, D> { +pub struct Peer, D> { client: Arc, - pub sync: Arc>, - pub queue: Arc>>, + pub protocol_sender: Sender>, + network_port: Mutex, import_queue: Arc>, - executor: Arc, - /// Some custom data set up at initialization time. + network_sender: NetworkChan, pub data: D, } impl, D> Peer { fn new( client: Arc, - sync: Arc>, - queue: Arc>>, import_queue: Arc>, + protocol_sender: Sender>, + network_sender: NetworkChan, + network_port: NetworkPort, data: D, ) -> Self { - let executor = Arc::new(DummyContextExecutor(sync.clone(), queue.clone())); - Peer { client, sync, queue, import_queue, executor, data } + let network_port = Mutex::new(network_port); + Peer { + client, + protocol_sender, + import_queue, + network_sender, + network_port, + data, + } } /// Called after blockchain has been populated to updated current state. fn start(&self) { // Update the sync state to the latest chain state. let info = self.client.info().expect("In-mem client does not fail"); - let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); + let header = self + .client + .header(&BlockId::Hash(info.chain.best_hash)) + .unwrap() + .unwrap(); let network_link = NetworkLink { - sync: Arc::downgrade(self.sync.sync()), - context: Arc::downgrade(&self.executor), + protocol_sender: self.protocol_sender.clone(), + network_sender: self.network_sender.clone(), }; self.import_queue.start(network_link).expect("Test ImportQueue always starts"); - self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); + let _ = self + .protocol_sender + .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); + } + + pub fn on_block_imported( + &self, + hash: ::Hash, + header: &::Header, + ) { + let _ = self + .protocol_sender + .send(ProtocolMsg::BlockImported(hash, header.clone())); } /// Called on connection to other indicated peer. fn on_connect(&self, other: NodeIndex) { - self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other); - } - - pub fn consensus_gossip(&self) -> &RwLock> { - self.sync.consensus_gossip() + let _ = self.protocol_sender.send(ProtocolMsg::PeerConnected(other, String::new())); } /// Called on disconnect from other indicated peer. fn on_disconnect(&self, other: NodeIndex) { - let mut io = TestIo::new(&self.queue, Some(other)); - self.sync.on_peer_disconnected(&mut io, other); + let _ = self + .protocol_sender + .send(ProtocolMsg::PeerDisconnected(other, String::new())); } /// Receive a message from another peer. Return a set of peers to disconnect. - fn receive_message(&self, from: NodeIndex, msg: TestPacket) -> HashSet { - let mut io = TestIo::new(&self.queue, Some(from)); - self.sync.handle_packet(&mut io, from, &msg.data); - self.flush(); - io.to_disconnect.clone() - } - - #[cfg(test)] - fn with_io<'a, F, U>(&'a self, f: F) -> U where F: FnOnce(&mut TestIo<'a>) -> U { - let mut io = TestIo::new(&self.queue, None); - f(&mut io) + fn receive_message(&self, from: NodeIndex, msg: Vec) { + match Decode::decode(&mut (&msg as &[u8])) { + Some(m) => { + let _ = self + .protocol_sender + .send(ProtocolMsg::CustomMessage(from, m)); + } + None => { + let _ = self.network_sender.send(NetworkMsg::ReportPeer( + from, + Severity::Bad("Peer sent us a packet with invalid format".to_string()), + )); + } + } } /// Produce the next pending message to send to another peer. - fn pending_message(&self) -> Option { - self.flush(); - self.queue.write().pop_front() + fn pending_message(&self) -> Option { + select! { + recv(self.network_port.lock().receiver()) -> msg => return msg.ok(), + // If there are no messages ready, give protocol a change to send one. + recv(channel::after(Duration::from_millis(100))) -> _ => return None, + } + } + + /// Produce the next pending message to send to another peer, without waiting. + fn pending_message_fast(&self) -> Option { + self.network_port.lock().receiver().try_recv().ok() } /// Whether this peer is done syncing (has no messages to send). fn is_done(&self) -> bool { - self.queue.read().is_empty() + self.network_port.lock().receiver().is_empty() } /// Execute a "sync step". This is called for each peer after it sends a packet. fn sync_step(&self) { - self.flush(); - self.sync.tick(&mut TestIo::new(&self.queue, None)); + let _ = self.protocol_sender.send(ProtocolMsg::Tick); } /// Send block import notifications. fn send_import_notifications(&self) { let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); + let _ = self + .protocol_sender + .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); } /// Send block finalization notifications. pub fn send_finality_notifications(&self) { let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap(); - self.sync.on_block_finalized(&mut TestIo::new(&self.queue, None), info.chain.finalized_hash, &header); + let _ = self + .protocol_sender + .send(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone())); } /// Restart sync for a peer. fn restart_sync(&self) { - self.sync.abort(); + let _ = self.protocol_sender.send(ProtocolMsg::Abort); } - fn flush(&self) { + pub fn status(&self) -> ProtocolStatus { + let (sender, port) = channel::unbounded(); + let _ = self.protocol_sender.send(ProtocolMsg::Status(sender)); + port.recv().unwrap() } /// Push a message into the gossip network and relay to peers. /// `TestNet::sync_step` needs to be called to ensure it's propagated. - pub fn gossip_message(&self, topic: Hash, data: Vec, broadcast: bool) { - self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast); + pub fn gossip_message(&self, topic: ::Hash, data: Vec, broadcast: bool) { + let _ = self + .protocol_sender + .send(ProtocolMsg::GossipConsensusMessage(topic, data, broadcast)); + } + + pub fn consensus_gossip_collect_garbage_for(&self, topic: ::Hash) { + self.with_gossip(move |gossip, _| gossip.collect_garbage(|t| t == &topic)) + } + + /// access the underlying consensus gossip handler + pub fn consensus_gossip_messages_for( + &self, + topic: ::Hash, + ) -> mpsc::UnboundedReceiver { + let (tx, rx) = oneshot::channel(); + self.with_gossip(move |gossip, _| { + let inner_rx = gossip.messages_for(topic); + let _ = tx.send(inner_rx); + }); + rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") + } + + /// Execute a closure with the consensus gossip. + pub fn with_gossip(&self, f: F) + where F: FnOnce(&mut ConsensusGossip, &mut Context) + Send + 'static + { + let _ = self + .protocol_sender + .send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); } /// Announce a block to peers. pub fn announce_block(&self, block: Hash) { - self.sync.announce_block(&mut TestIo::new(&self.queue, None), block); + let _ = self.protocol_sender.send(ProtocolMsg::AnnounceBlock(block)); } /// Request a justification for the given block. #[cfg(test)] fn request_justification(&self, hash: &::primitives::H256, number: NumberFor) { - self.executor.execute_in_context(|context| { - self.sync.sync().write().request_justification(hash, number, context); - }) + let _ = self + .protocol_sender + .send(ProtocolMsg::RequestJustification(hash.clone(), number)); } /// Add blocks to the peer -- edit the block before adding @@ -429,23 +437,28 @@ impl, D> Peer { let builder = self.client.new_block_at(&at).unwrap(); let block = edit_block(builder); let hash = block.header.hash(); - trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash); + trace!( + "Generating {}, (#{}, parent={})", + hash, + block.header.number, + block.header.parent_hash + ); let header = block.header.clone(); at = BlockId::Hash(hash); // NOTE: if we use a non-synchronous queue in the test-net in the future, // this may not work. - self.import_queue.import_blocks(origin, vec![ - IncomingBlock { - origin: None, + self.import_queue.import_blocks( + origin, + vec![IncomingBlock { + origin: None, hash, header: Some(header), body: Some(block.extrinsics), justification: None, - }, + } ]); } - } /// Push blocks to the peer (simplified: with or without a TX) @@ -483,13 +496,6 @@ impl, D> Peer { }); } - /// Execute a function with specialization for this peer. - pub fn with_spec(&self, f: F) -> U - where F: FnOnce(&mut DummySpecialization, &mut Context) -> U - { - self.sync.with_spec(&mut TestIo::new(&self.queue, None), f) - } - /// Get a reference to the client. pub fn client(&self) -> &Arc { &self.client @@ -554,23 +560,26 @@ pub trait TestNetFactory: Sized { let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(client.clone(), config); let (block_import, justification_import, data) = self.make_block_import(client.clone()); + let (network_sender, network_port) = network_channel(ProtocolId::default()); let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import, justification_import)); let specialization = DummySpecialization { }; - let sync = Protocol::new( + let protocol_sender = Protocol::new( + network_sender.clone(), config.clone(), client.clone(), import_queue.clone(), None, tx_pool, - specialization + specialization, ).unwrap(); let peer = Arc::new(Peer::new( client, - Arc::new(sync), - Arc::new(RwLock::new(VecDeque::new())), import_queue, + protocol_sender, + network_sender, + network_port, data, )); @@ -594,44 +603,58 @@ pub trait TestNetFactory: Sized { } } }); + self.route(None); self.set_started(true); } /// Do one step of routing. - fn route(&mut self) { + fn route(&mut self, disconnected: Option>) { self.mut_peers(move |peers| { + let mut to_disconnect = HashSet::new(); for peer in 0..peers.len() { let packet = peers[peer].pending_message(); - if let Some(packet) = packet { - let disconnecting = { - let recipient = packet.recipient; - trace!(target: "sync", "--- {} -> {} ---", peer, recipient); - let to_disconnect = peers[recipient].receive_message(peer as NodeIndex, packet); - for d in &to_disconnect { - // notify this that disconnecting peers are disconnecting - peers[recipient].on_disconnect(*d as NodeIndex); + match packet { + None => continue, + Some(NetworkMsg::Outgoing(recipient, packet)) => { + if let Some(disconnected) = disconnected.as_ref() { + let mut current = HashSet::new(); + current.insert(peer); + current.insert(recipient); + // Not routing message between "disconnected" nodes. + if disconnected.is_subset(¤t) { + continue; + } } - to_disconnect - }; - for d in &disconnecting { - // notify other peers that this peer is disconnecting - peers[*d].on_disconnect(peer as NodeIndex); + peers[recipient].receive_message(peer as NodeIndex, packet) } + Some(NetworkMsg::ReportPeer(who, _)) => { + to_disconnect.insert(who); + } + Some(_msg) => continue, + } + } + for d in to_disconnect { + for peer in 0..peers.len() { + peers[peer].on_disconnect(d); } } }); } - /// Route messages between peers until all queues are empty. - fn route_until_complete(&mut self) { - while !self.done() { - self.route() - } + /// Route all pending outgoing messages, without waiting or disconnecting. + fn route_fast(&mut self) { + self.mut_peers(move |peers| { + for peer in 0..peers.len() { + while let Some(NetworkMsg::Outgoing(recipient, packet)) = peers[peer].pending_message_fast() { + peers[recipient].receive_message(peer as NodeIndex, packet) + } + } + }); } /// Do a step of synchronization. fn sync_step(&mut self) { - self.route(); + self.route(None); self.mut_peers(|peers| { for peer in peers { @@ -667,10 +690,26 @@ pub trait TestNetFactory: Sized { fn sync(&mut self) -> u32 { self.start(); let mut total_steps = 0; + self.sync_step(); + self.route(None); + while !self.done() { + total_steps += 1; + self.route(None); + } + total_steps + } + + /// Perform synchronization until complete, + /// excluding sync between certain nodes. + fn sync_with_disconnected(&mut self, disconnected: HashSet) -> u32 { + self.start(); + let mut total_steps = 0; + self.sync_step(); + self.route(Some(disconnected.clone())); while !self.done() { self.sync_step(); total_steps += 1; - self.route(); + self.route(Some(disconnected.clone())); } total_steps } @@ -685,6 +724,16 @@ pub trait TestNetFactory: Sized { /// Whether all peers have synced. fn done(&self) -> bool { + for _ in 0..10 { + if self.peers().iter().all(|p| p.is_done()) { + // If all peers are done, wait a little bit + // in case one is still about to send a message. + thread::sleep(Duration::from_millis(1000)); + continue; + } + // Do another round of routing. + return false + } self.peers().iter().all(|p| p.is_done()) } } diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 1313bf4c2c..2f9d5512ce 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -18,7 +18,10 @@ use client::backend::Backend; use client::blockchain::HeaderBackend as BlockchainHeaderBackend; use config::Roles; use consensus::BlockOrigin; +use network_libp2p::NodeIndex; use sync::SyncState; +use std::{thread, time}; +use std::collections::HashSet; use super::*; #[test] @@ -29,7 +32,7 @@ fn sync_from_two_peers_works() { net.peer(2).push_blocks(100, false); net.sync(); assert!(net.peer(0).client.backend().blockchain().equals_to(net.peer(1).client.backend().blockchain())); - let status = net.peer(0).sync.status(); + let status = net.peer(0).status(); assert_eq!(status.sync.state, SyncState::Idle); } @@ -49,9 +52,12 @@ fn sync_from_two_peers_with_ancestry_search_works() { fn sync_long_chain_works() { let mut net = TestNet::new(2); net.peer(1).push_blocks(500, false); - net.sync_steps(3); - assert_eq!(net.peer(0).sync.status().sync.state, SyncState::Downloading); net.sync(); + // Wait for peer 0 to import blocks received over the network. + thread::sleep(time::Duration::from_millis(1000)); + net.sync(); + // Wait for peers to get up to speed. + thread::sleep(time::Duration::from_millis(1000)); assert!(net.peer(0).client.backend().blockchain().equals_to(net.peer(1).client.backend().blockchain())); } @@ -137,7 +143,7 @@ fn own_blocks_are_announced() { net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.bake().unwrap()); let header = net.peer(0).client().header(&BlockId::Number(1)).unwrap().unwrap(); - net.peer(0).with_io(|io| net.peer(0).sync.on_block_imported(io, header.hash(), &header)); + net.peer(0).on_block_imported(header.hash(), &header); net.sync(); assert_eq!(net.peer(0).client.backend().blockchain().info().unwrap().best_number, 1); assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1); @@ -166,10 +172,11 @@ fn blocks_are_not_announced_by_light_nodes() { net.peer(0).on_connect(1); net.peer(1).on_connect(2); - // generate block at peer0 && run sync - while !net.done() { - net.sync_step(); - } + // Only sync between 0 -> 1, and 1 -> 2 + let mut disconnected = HashSet::new(); + disconnected.insert(0 as NodeIndex); + disconnected.insert(2 as NodeIndex); + net.sync_with_disconnected(disconnected); // peer 0 has the best chain // peer 1 has the best chain diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index c1a9c63fe9..b83fc38975 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -38,11 +38,7 @@ use parking_lot::Mutex; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. /// Network service type for a factory. -pub type NetworkService = network::Service< - ::Block, - ::NetworkProtocol, - <::Block as BlockT>::Hash, ->; +pub type NetworkService = network::Service<::Block, ::NetworkProtocol>; /// Code executor type for a factory. pub type CodeExecutor = NativeExecutor<::RuntimeDispatch>; @@ -59,7 +55,7 @@ pub type FullExecutor = client::LocalCallExecutor< /// Light client backend type for a factory. pub type LightBackend = client::light::backend::Backend< client_db::light::LightStorage<::Block>, - network::OnDemand<::Block, NetworkService>, + network::OnDemand<::Block>, Blake2Hasher, >; @@ -68,20 +64,20 @@ pub type LightExecutor = client::light::call_executor::RemoteOrLocalCallExecu ::Block, client::light::backend::Backend< client_db::light::LightStorage<::Block>, - network::OnDemand<::Block, NetworkService>, + network::OnDemand<::Block>, Blake2Hasher >, client::light::call_executor::RemoteCallExecutor< client::light::blockchain::Blockchain< client_db::light::LightStorage<::Block>, - network::OnDemand<::Block, NetworkService> + network::OnDemand<::Block> >, - network::OnDemand<::Block, NetworkService> + network::OnDemand<::Block> >, client::LocalCallExecutor< client::light::backend::Backend< client_db::light::LightStorage<::Block>, - network::OnDemand<::Block, NetworkService>, + network::OnDemand<::Block>, Blake2Hasher >, CodeExecutor @@ -363,7 +359,7 @@ pub trait Components: Sized + 'static { ) -> Result< ( Arc>, - Option, NetworkService>>> + Option>>> ), error::Error >; @@ -429,7 +425,7 @@ impl Components for FullComponents { ) -> Result<( Arc>, - Option, NetworkService>>> + Option>>> ), error::Error> { let db_settings = client_db::DatabaseSettings { @@ -505,7 +501,7 @@ impl Components for LightComponents { -> Result< ( Arc>, - Option, NetworkService>>> + Option>>> ), error::Error> { let db_settings = client_db::DatabaseSettings { diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index f4c9463ab3..e8b2581daa 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -192,12 +192,12 @@ impl Service { }; let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); - let network = network::Service::new( + let (network, network_chan) = network::Service::new( network_params, protocol_id, import_queue )?; - on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network))); + on_demand.map(|on_demand| on_demand.set_network_sender(network_chan)); { // block notifications @@ -208,7 +208,7 @@ impl Service { let events = client.import_notification_stream() .for_each(move |notification| { if let Some(network) = network.upgrade() { - network.on_block_imported(notification.hash, ¬ification.header); + network.on_block_imported(notification.hash, notification.header); } if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) { Components::TransactionPool::on_block_imported( @@ -260,7 +260,7 @@ impl Service { let events = MostRecentNotification(client.finality_notification_stream().fuse()) .for_each(move |notification| { if let Some(network) = network.upgrade() { - network.on_block_finalized(notification.hash, ¬ification.header); + network.on_block_finalized(notification.hash, notification.header); } Ok(()) }) diff --git a/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm b/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm index 96e58fb7eb..22611f75a4 100644 Binary files a/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm and b/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm differ diff --git a/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm b/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm index 4c583a3897..51ed6f7dde 100644 Binary files a/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm and b/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm differ