diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index 504cf23385..0d7538936d 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -17,7 +17,7 @@ //! On-demand requests service. use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; use log::{trace, info}; use futures::{Async, Future, Poll}; @@ -32,7 +32,8 @@ use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, use crate::message; use network_libp2p::PeerId; use crate::config::Roles; -use crate::service::{NetworkChan, NetworkMsg}; +use crate::service::Service as NetworkService; +use crate::specialization::NetworkSpecialization; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Remote request timeout. @@ -77,11 +78,43 @@ pub trait OnDemandService: Send + Sync { ); } +/// Trait used by the `OnDemand` service to communicate messages back to the network. +pub trait OnDemandNetwork { + /// Adjusts the reputation of the given peer. + fn report_peer(&self, who: &PeerId, reputation_change: i32); + + /// Disconnect from the given peer. Used in case of misbehaviour. + fn disconnect_peer(&self, who: &PeerId); + + /// Send a request to a peer. + fn send_request(&self, who: &PeerId, message: message::Message); +} + +impl> OnDemandNetwork for Weak> { + fn report_peer(&self, who: &PeerId, reputation_change: i32) { + if let Some(service) = self.upgrade() { + service.report_peer(who.clone(), reputation_change) + } + } + + fn disconnect_peer(&self, who: &PeerId) { + if let Some(service) = self.upgrade() { + service.disconnect_peer(who.clone()) + } + } + + fn send_request(&self, who: &PeerId, message: message::Message) { + if let Some(service) = self.upgrade() { + service.send_request(who.clone(), message) + } + } +} + /// On-demand requests service. Dispatches requests to appropriate peers. pub struct OnDemand { core: Mutex>, checker: Arc>, - network_sender: Mutex>>, + network_interface: Mutex + Send + Sync + 'static>>>, } /// On-demand remote call response. @@ -144,7 +177,7 @@ impl OnDemand where pub fn new(checker: Arc>) -> Self { OnDemand { checker, - network_sender: Mutex::new(None), + network_interface: Mutex::new(None), core: Mutex::new(OnDemandCore { next_request_id: 0, pending_requests: VecDeque::new(), @@ -161,16 +194,32 @@ impl OnDemand where } /// Sets weak reference to network service. - pub fn set_network_sender(&self, network_sender: NetworkChan) { - self.network_sender.lock().replace(network_sender); + pub fn set_network_interface(&self, network_interface: Box + Send + Sync + 'static>) { + self.network_interface.lock().replace(network_interface); } - fn send(&self, msg: NetworkMsg) { - let _ = self.network_sender + fn report_peer(&self, who: &PeerId, reputation_change: i32) { + self.network_interface .lock() .as_ref() .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") - .send(msg); + .report_peer(who, reputation_change); + } + + fn disconnect_peer(&self, who: &PeerId) { + self.network_interface + .lock() + .as_ref() + .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") + .disconnect_peer(who); + } + + fn send_request(&self, who: &PeerId, msg: message::Message) { + self.network_interface + .lock() + .as_ref() + .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") + .send_request(who, msg); } /// Schedule && dispatch all scheduled requests. @@ -188,8 +237,8 @@ impl OnDemand where Some(request) => request, None => { info!("Invalid remote {} response from peer {}", rtype, peer); - self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value())); - self.send(NetworkMsg::DisconnectPeer(peer.clone())); + self.report_peer(&peer, i32::min_value()); + self.disconnect_peer(&peer); core.remove_peer(peer); return; }, @@ -200,8 +249,8 @@ impl OnDemand where Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { info!("Failed to check remote {} response from peer {}: {}", rtype, peer, error); - self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value())); - self.send(NetworkMsg::DisconnectPeer(peer.clone())); + self.report_peer(&peer, i32::min_value()); + self.disconnect_peer(&peer); core.remove_peer(peer); if retry_count > 0 { @@ -214,8 +263,8 @@ impl OnDemand where }, Accept::Unexpected(retry_request_data) => { info!("Unexpected response to remote {} from peer", rtype); - self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value())); - self.send(NetworkMsg::DisconnectPeer(peer.clone())); + self.report_peer(&peer, i32::min_value()); + self.disconnect_peer(&peer); core.remove_peer(peer); (retry_count, Some(retry_request_data)) @@ -259,8 +308,8 @@ impl OnDemandService for OnDemand where fn maintain_peers(&self) { let mut core = self.core.lock(); for bad_peer in core.maintain_peers() { - self.send(NetworkMsg::ReportPeer(bad_peer.clone(), TIMEOUT_REPUTATION_CHANGE)); - self.send(NetworkMsg::DisconnectPeer(bad_peer)); + self.report_peer(&bad_peer, TIMEOUT_REPUTATION_CHANGE); + self.disconnect_peer(&bad_peer); } core.dispatch(self); } @@ -505,7 +554,7 @@ impl OnDemandCore where let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); - on_demand.send(NetworkMsg::Outgoing(peer.clone(), request.message())); + on_demand.send_request(&peer, request.message()); self.active_peers.insert(peer, request); } @@ -580,10 +629,11 @@ impl RequestData { #[cfg(test)] pub mod tests { - use std::sync::Arc; + use std::collections::HashSet; + use std::sync::{Arc, Mutex}; use std::time::Instant; use futures::Future; - use runtime_primitives::traits::NumberFor; + use runtime_primitives::traits::{Block as BlockT, NumberFor}; use client::{error::{Error as ClientError, Result as ClientResult}}; use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, ChangesProof, RemoteCallRequest, RemoteReadRequest, @@ -591,8 +641,7 @@ pub mod tests { use crate::config::Roles; use crate::message; use network_libp2p::PeerId; - use crate::service::{network_channel, NetworkPort, NetworkMsg}; - use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; + use super::{REQUEST_TIMEOUT, OnDemand, OnDemandNetwork, OnDemandService}; use test_client::runtime::{changes_trie_config, Block, Header}; pub struct DummyExecutor; @@ -672,15 +721,21 @@ pub mod tests { } } - fn assert_disconnected_peer(network_port: NetworkPort) { - let mut disconnect_count = 0; - while let Ok(msg) = network_port.receiver().try_recv() { - match msg { - NetworkMsg::DisconnectPeer(_) => disconnect_count = disconnect_count + 1, - _ => {}, - } + #[derive(Default)] + struct DummyNetwork { + disconnected_peers: Mutex>, + } + + impl OnDemandNetwork for Arc { + fn report_peer(&self, _: &PeerId, _: i32) {} + fn disconnect_peer(&self, who: &PeerId) { + self.disconnected_peers.lock().unwrap().insert(who.clone()); } - assert_eq!(disconnect_count, 1); + fn send_request(&self, _: &PeerId, _: message::Message) {} + } + + fn assert_disconnected_peer(dummy: Arc) { + assert_eq!(dummy.disconnected_peers.lock().unwrap().len(), 1); } #[test] @@ -714,10 +769,10 @@ pub mod tests { #[test] fn disconnects_from_timeouted_peer() { let (_x, on_demand) = dummy(true); - let (network_sender, network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); let peer1 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); on_demand.on_connect(peer1.clone(), Roles::FULL, 1000); assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); @@ -737,15 +792,15 @@ pub mod tests { on_demand.maintain_peers(); assert!(on_demand.core.lock().idle_peers.is_empty()); assert_eq!(vec![peer1.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::>()); - assert_disconnected_peer(network_port); + assert_disconnected_peer(network_interface); } #[test] fn disconnects_from_peer_on_response_with_wrong_id() { let (_x, on_demand) = dummy(true); let peer0 = PeerId::random(); - let (network_sender, network_port) = network_channel(); - on_demand.set_network_sender(network_sender.clone()); + let network_interface = Arc::new(DummyNetwork::default()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { @@ -756,16 +811,16 @@ pub mod tests { retry_count: None, }); receive_call_response(&*on_demand, peer0, 1); - assert_disconnected_peer(network_port); + assert_disconnected_peer(network_interface); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_incorrect_response() { let (_x, on_demand) = dummy(false); - let (network_sender, network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.remote_call(RemoteCallRequest { block: Default::default(), header: dummy_header(), @@ -776,28 +831,28 @@ pub mod tests { on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); receive_call_response(&*on_demand, peer0.clone(), 0); - assert_disconnected_peer(network_port); + assert_disconnected_peer(network_interface); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_unexpected_response() { let (_x, on_demand) = dummy(true); - let (network_sender, network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); receive_call_response(&*on_demand, peer0, 0); - assert_disconnected_peer(network_port); + assert_disconnected_peer(network_interface); } #[test] fn disconnects_from_peer_on_wrong_response_type() { let (_x, on_demand) = dummy(false); let peer0 = PeerId::random(); - let (network_sender, network_port) = network_channel(); - on_demand.set_network_sender(network_sender.clone()); + let network_interface = Arc::new(DummyNetwork::default()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { @@ -812,7 +867,7 @@ pub mod tests { id: 0, proof: vec![vec![2]], }); - assert_disconnected_peer(network_port); + assert_disconnected_peer(network_interface); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -823,8 +878,8 @@ pub mod tests { let retry_count = 2; let peer_ids = (0 .. retry_count + 1).map(|_| PeerId::random()).collect::>(); let (_x, on_demand) = dummy(false); - let (network_sender, _network_port) = network_channel(); - on_demand.set_network_sender(network_sender.clone()); + let network_interface = Arc::new(DummyNetwork::default()); + on_demand.set_network_interface(Box::new(network_interface.clone())); for i in 0..retry_count+1 { on_demand.on_connect(peer_ids[i].clone(), Roles::FULL, 1000); } @@ -863,9 +918,9 @@ pub mod tests { #[test] fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_call(RemoteCallRequest { @@ -887,9 +942,9 @@ pub mod tests { #[test] fn receives_remote_read_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_read(RemoteReadRequest { @@ -913,9 +968,9 @@ pub mod tests { #[test] fn receives_remote_read_child_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_read_child(RemoteReadChildRequest { @@ -941,9 +996,9 @@ pub mod tests { #[test] fn receives_remote_header_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_header(RemoteHeaderRequest { @@ -977,9 +1032,9 @@ pub mod tests { #[test] fn receives_remote_changes_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer0 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_changes(RemoteChangesRequest { @@ -1009,10 +1064,10 @@ pub mod tests { #[test] fn does_not_sends_request_to_peer_who_has_no_required_block() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer1 = PeerId::random(); let peer2 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.on_connect(peer1.clone(), Roles::FULL, 100); @@ -1063,11 +1118,11 @@ pub mod tests { // loop forever after dispatching a request to the last peer, since the // last peer was not updated let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer1 = PeerId::random(); let peer2 = PeerId::random(); let peer3 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), @@ -1091,9 +1146,9 @@ pub mod tests { #[test] fn tries_to_send_all_pending_requests() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(); + let network_interface = Arc::new(DummyNetwork::default()); let peer1 = PeerId::random(); - on_demand.set_network_sender(network_sender.clone()); + on_demand.set_network_interface(Box::new(network_interface.clone())); on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 8a81524bf9..3b2e0ae9ff 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -177,6 +177,8 @@ pub struct Service> { is_major_syncing: Arc, /// Peers whom we are connected with. peers: Arc>>>, + /// Channel for networking messages processed by the background thread. + network_chan: NetworkChan, /// Network service network: Arc>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which @@ -196,7 +198,7 @@ impl> Service { params: Params, protocol_id: ProtocolId, import_queue: Box>, - ) -> Result<(Arc>, NetworkChan), Error> { + ) -> Result>, Error> { let (network_chan, network_port) = network_channel(); let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. @@ -230,6 +232,7 @@ impl> Service { status_sinks, is_offline, is_major_syncing, + network_chan: network_chan.clone(), peers, peerset, network, @@ -240,12 +243,12 @@ impl> Service { // connect the import-queue to the network service. let link = NetworkLink { protocol_sender, - network_sender: network_chan.clone(), + network_sender: network_chan, }; import_queue.start(Box::new(link))?; - Ok((service, network_chan)) + Ok(service) } /// Returns the downloaded bytes per second averaged over the past few seconds. @@ -313,6 +316,20 @@ impl> Service { self.peerset.report_peer(who, cost_benefit); } + /// Send a message to the given peer. Has no effect if we're not connected to this peer. + /// + /// This method is extremely poor in terms of API and should be eventually removed. + pub fn disconnect_peer(&self, who: PeerId) { + let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(who)); + } + + /// Send a message to the given peer. Has no effect if we're not connected to this peer. + /// + /// This method is extremely poor in terms of API and should be eventually removed. + pub fn send_request(&self, who: PeerId, message: Message) { + let _ = self.network_chan.send(NetworkMsg::Outgoing(who, message)); + } + /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) where F: FnOnce(&mut S, &mut Context) + Send + 'static diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 53aaff02cc..e99eeade7c 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -199,12 +199,10 @@ impl Service { }; let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); - let (network, network_chan) = network::Service::new( - network_params, - protocol_id, - import_queue - )?; - on_demand.map(|on_demand| on_demand.set_network_sender(network_chan)); + let network = network::Service::new(network_params, protocol_id, import_queue)?; + if let Some(on_demand) = on_demand.as_ref() { + on_demand.set_network_interface(Box::new(Arc::downgrade(&network))); + } let inherents_pool = Arc::new(InherentsPool::default()); let offchain_workers = if config.offchain_worker {