Remove the NetworkChan from the OnDemandService (#2561)

This commit is contained in:
Pierre Krieger
2019-05-13 20:55:27 +02:00
committed by Gavin Wood
parent 7fb3dc5f68
commit 2724cdac33
3 changed files with 142 additions and 72 deletions
+118 -63
View File
@@ -17,7 +17,7 @@
//! On-demand requests service.
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use log::{trace, info};
use futures::{Async, Future, Poll};
@@ -32,7 +32,8 @@ use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
use crate::message;
use network_libp2p::PeerId;
use crate::config::Roles;
use crate::service::{NetworkChan, NetworkMsg};
use crate::service::Service as NetworkService;
use crate::specialization::NetworkSpecialization;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
/// Remote request timeout.
@@ -77,11 +78,43 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
);
}
/// Trait used by the `OnDemand` service to communicate messages back to the network.
pub trait OnDemandNetwork<B: BlockT> {
/// Adjusts the reputation of the given peer.
fn report_peer(&self, who: &PeerId, reputation_change: i32);
/// Disconnect from the given peer. Used in case of misbehaviour.
fn disconnect_peer(&self, who: &PeerId);
/// Send a request to a peer.
fn send_request(&self, who: &PeerId, message: message::Message<B>);
}
impl<B: BlockT, S: NetworkSpecialization<B>> OnDemandNetwork<B> for Weak<NetworkService<B, S>> {
fn report_peer(&self, who: &PeerId, reputation_change: i32) {
if let Some(service) = self.upgrade() {
service.report_peer(who.clone(), reputation_change)
}
}
fn disconnect_peer(&self, who: &PeerId) {
if let Some(service) = self.upgrade() {
service.disconnect_peer(who.clone())
}
}
fn send_request(&self, who: &PeerId, message: message::Message<B>) {
if let Some(service) = self.upgrade() {
service.send_request(who.clone(), message)
}
}
}
/// On-demand requests service. Dispatches requests to appropriate peers.
pub struct OnDemand<B: BlockT> {
core: Mutex<OnDemandCore<B>>,
checker: Arc<FetchChecker<B>>,
network_sender: Mutex<Option<NetworkChan<B>>>,
network_interface: Mutex<Option<Box<dyn OnDemandNetwork<B> + Send + Sync + 'static>>>,
}
/// On-demand remote call response.
@@ -144,7 +177,7 @@ impl<B: BlockT> OnDemand<B> where
pub fn new(checker: Arc<FetchChecker<B>>) -> Self {
OnDemand {
checker,
network_sender: Mutex::new(None),
network_interface: Mutex::new(None),
core: Mutex::new(OnDemandCore {
next_request_id: 0,
pending_requests: VecDeque::new(),
@@ -161,16 +194,32 @@ impl<B: BlockT> OnDemand<B> where
}
/// Sets weak reference to network service.
pub fn set_network_sender(&self, network_sender: NetworkChan<B>) {
self.network_sender.lock().replace(network_sender);
pub fn set_network_interface(&self, network_interface: Box<dyn OnDemandNetwork<B> + Send + Sync + 'static>) {
self.network_interface.lock().replace(network_interface);
}
fn send(&self, msg: NetworkMsg<B>) {
let _ = self.network_sender
fn report_peer(&self, who: &PeerId, reputation_change: i32) {
self.network_interface
.lock()
.as_ref()
.expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now")
.send(msg);
.report_peer(who, reputation_change);
}
fn disconnect_peer(&self, who: &PeerId) {
self.network_interface
.lock()
.as_ref()
.expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now")
.disconnect_peer(who);
}
fn send_request(&self, who: &PeerId, msg: message::Message<B>) {
self.network_interface
.lock()
.as_ref()
.expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now")
.send_request(who, msg);
}
/// Schedule && dispatch all scheduled requests.
@@ -188,8 +237,8 @@ impl<B: BlockT> OnDemand<B> where
Some(request) => request,
None => {
info!("Invalid remote {} response from peer {}", rtype, peer);
self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value()));
self.send(NetworkMsg::DisconnectPeer(peer.clone()));
self.report_peer(&peer, i32::min_value());
self.disconnect_peer(&peer);
core.remove_peer(peer);
return;
},
@@ -200,8 +249,8 @@ impl<B: BlockT> OnDemand<B> where
Accept::Ok => (retry_count, None),
Accept::CheckFailed(error, retry_request_data) => {
info!("Failed to check remote {} response from peer {}: {}", rtype, peer, error);
self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value()));
self.send(NetworkMsg::DisconnectPeer(peer.clone()));
self.report_peer(&peer, i32::min_value());
self.disconnect_peer(&peer);
core.remove_peer(peer);
if retry_count > 0 {
@@ -214,8 +263,8 @@ impl<B: BlockT> OnDemand<B> where
},
Accept::Unexpected(retry_request_data) => {
info!("Unexpected response to remote {} from peer", rtype);
self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value()));
self.send(NetworkMsg::DisconnectPeer(peer.clone()));
self.report_peer(&peer, i32::min_value());
self.disconnect_peer(&peer);
core.remove_peer(peer);
(retry_count, Some(retry_request_data))
@@ -259,8 +308,8 @@ impl<B> OnDemandService<B> for OnDemand<B> where
fn maintain_peers(&self) {
let mut core = self.core.lock();
for bad_peer in core.maintain_peers() {
self.send(NetworkMsg::ReportPeer(bad_peer.clone(), TIMEOUT_REPUTATION_CHANGE));
self.send(NetworkMsg::DisconnectPeer(bad_peer));
self.report_peer(&bad_peer, TIMEOUT_REPUTATION_CHANGE);
self.disconnect_peer(&bad_peer);
}
core.dispatch(self);
}
@@ -505,7 +554,7 @@ impl<B> OnDemandCore<B> where
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
on_demand.send(NetworkMsg::Outgoing(peer.clone(), request.message()));
on_demand.send_request(&peer, request.message());
self.active_peers.insert(peer, request);
}
@@ -580,10 +629,11 @@ impl<Block: BlockT> RequestData<Block> {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use futures::Future;
use runtime_primitives::traits::NumberFor;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use client::{error::{Error as ClientError, Result as ClientResult}};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
ChangesProof, RemoteCallRequest, RemoteReadRequest,
@@ -591,8 +641,7 @@ pub mod tests {
use crate::config::Roles;
use crate::message;
use network_libp2p::PeerId;
use crate::service::{network_channel, NetworkPort, NetworkMsg};
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandNetwork, OnDemandService};
use test_client::runtime::{changes_trie_config, Block, Header};
pub struct DummyExecutor;
@@ -672,15 +721,21 @@ pub mod tests {
}
}
fn assert_disconnected_peer(network_port: NetworkPort<Block>) {
let mut disconnect_count = 0;
while let Ok(msg) = network_port.receiver().try_recv() {
match msg {
NetworkMsg::DisconnectPeer(_) => disconnect_count = disconnect_count + 1,
_ => {},
}
#[derive(Default)]
struct DummyNetwork {
disconnected_peers: Mutex<HashSet<PeerId>>,
}
impl<B: BlockT> OnDemandNetwork<B> for Arc<DummyNetwork> {
fn report_peer(&self, _: &PeerId, _: i32) {}
fn disconnect_peer(&self, who: &PeerId) {
self.disconnected_peers.lock().unwrap().insert(who.clone());
}
assert_eq!(disconnect_count, 1);
fn send_request(&self, _: &PeerId, _: message::Message<B>) {}
}
fn assert_disconnected_peer(dummy: Arc<DummyNetwork>) {
assert_eq!(dummy.disconnected_peers.lock().unwrap().len(), 1);
}
#[test]
@@ -714,10 +769,10 @@ pub mod tests {
#[test]
fn disconnects_from_timeouted_peer() {
let (_x, on_demand) = dummy(true);
let (network_sender, network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
let peer1 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
on_demand.on_connect(peer1.clone(), Roles::FULL, 1000);
assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
@@ -737,15 +792,15 @@ pub mod tests {
on_demand.maintain_peers();
assert!(on_demand.core.lock().idle_peers.is_empty());
assert_eq!(vec![peer1.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
assert_disconnected_peer(network_port);
assert_disconnected_peer(network_interface);
}
#[test]
fn disconnects_from_peer_on_response_with_wrong_id() {
let (_x, on_demand) = dummy(true);
let peer0 = PeerId::random();
let (network_sender, network_port) = network_channel();
on_demand.set_network_sender(network_sender.clone());
let network_interface = Arc::new(DummyNetwork::default());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
on_demand.remote_call(RemoteCallRequest {
@@ -756,16 +811,16 @@ pub mod tests {
retry_count: None,
});
receive_call_response(&*on_demand, peer0, 1);
assert_disconnected_peer(network_port);
assert_disconnected_peer(network_interface);
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn disconnects_from_peer_on_incorrect_response() {
let (_x, on_demand) = dummy(false);
let (network_sender, network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
@@ -776,28 +831,28 @@ pub mod tests {
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
receive_call_response(&*on_demand, peer0.clone(), 0);
assert_disconnected_peer(network_port);
assert_disconnected_peer(network_interface);
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn disconnects_from_peer_on_unexpected_response() {
let (_x, on_demand) = dummy(true);
let (network_sender, network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
receive_call_response(&*on_demand, peer0, 0);
assert_disconnected_peer(network_port);
assert_disconnected_peer(network_interface);
}
#[test]
fn disconnects_from_peer_on_wrong_response_type() {
let (_x, on_demand) = dummy(false);
let peer0 = PeerId::random();
let (network_sender, network_port) = network_channel();
on_demand.set_network_sender(network_sender.clone());
let network_interface = Arc::new(DummyNetwork::default());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
on_demand.remote_call(RemoteCallRequest {
@@ -812,7 +867,7 @@ pub mod tests {
id: 0,
proof: vec![vec![2]],
});
assert_disconnected_peer(network_port);
assert_disconnected_peer(network_interface);
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
@@ -823,8 +878,8 @@ pub mod tests {
let retry_count = 2;
let peer_ids = (0 .. retry_count + 1).map(|_| PeerId::random()).collect::<Vec<_>>();
let (_x, on_demand) = dummy(false);
let (network_sender, _network_port) = network_channel();
on_demand.set_network_sender(network_sender.clone());
let network_interface = Arc::new(DummyNetwork::default());
on_demand.set_network_interface(Box::new(network_interface.clone()));
for i in 0..retry_count+1 {
on_demand.on_connect(peer_ids[i].clone(), Roles::FULL, 1000);
}
@@ -863,9 +918,9 @@ pub mod tests {
#[test]
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
let response = on_demand.remote_call(RemoteCallRequest {
@@ -887,9 +942,9 @@ pub mod tests {
#[test]
fn receives_remote_read_response() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
let response = on_demand.remote_read(RemoteReadRequest {
@@ -913,9 +968,9 @@ pub mod tests {
#[test]
fn receives_remote_read_child_response() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
let response = on_demand.remote_read_child(RemoteReadChildRequest {
@@ -941,9 +996,9 @@ pub mod tests {
#[test]
fn receives_remote_header_response() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
let response = on_demand.remote_header(RemoteHeaderRequest {
@@ -977,9 +1032,9 @@ pub mod tests {
#[test]
fn receives_remote_changes_response() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer0 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer0.clone(), Roles::FULL, 1000);
let response = on_demand.remote_changes(RemoteChangesRequest {
@@ -1009,10 +1064,10 @@ pub mod tests {
#[test]
fn does_not_sends_request_to_peer_who_has_no_required_block() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer1 = PeerId::random();
let peer2 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.on_connect(peer1.clone(), Roles::FULL, 100);
@@ -1063,11 +1118,11 @@ pub mod tests {
// loop forever after dispatching a request to the last peer, since the
// last peer was not updated
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let peer3 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
@@ -1091,9 +1146,9 @@ pub mod tests {
#[test]
fn tries_to_send_all_pending_requests() {
let (_x, on_demand) = dummy(true);
let (network_sender, _network_port) = network_channel();
let network_interface = Arc::new(DummyNetwork::default());
let peer1 = PeerId::random();
on_demand.set_network_sender(network_sender.clone());
on_demand.set_network_interface(Box::new(network_interface.clone()));
on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
+20 -3
View File
@@ -177,6 +177,8 @@ pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
is_major_syncing: Arc<AtomicBool>,
/// Peers whom we are connected with.
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
/// Channel for networking messages processed by the background thread.
network_chan: NetworkChan<B>,
/// Network service
network: Arc<Mutex<NetworkService<Message<B>>>>,
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
@@ -196,7 +198,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
params: Params<B, S, H>,
protocol_id: ProtocolId,
import_queue: Box<ImportQueue<B>>,
) -> Result<(Arc<Service<B, S>>, NetworkChan<B>), Error> {
) -> Result<Arc<Service<B, S>>, Error> {
let (network_chan, network_port) = network_channel();
let status_sinks = Arc::new(Mutex::new(Vec::new()));
// Start in off-line mode, since we're not connected to any nodes yet.
@@ -230,6 +232,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
status_sinks,
is_offline,
is_major_syncing,
network_chan: network_chan.clone(),
peers,
peerset,
network,
@@ -240,12 +243,12 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
// connect the import-queue to the network service.
let link = NetworkLink {
protocol_sender,
network_sender: network_chan.clone(),
network_sender: network_chan,
};
import_queue.start(Box::new(link))?;
Ok((service, network_chan))
Ok(service)
}
/// Returns the downloaded bytes per second averaged over the past few seconds.
@@ -313,6 +316,20 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
self.peerset.report_peer(who, cost_benefit);
}
/// Send a message to the given peer. Has no effect if we're not connected to this peer.
///
/// This method is extremely poor in terms of API and should be eventually removed.
pub fn disconnect_peer(&self, who: PeerId) {
let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(who));
}
/// Send a message to the given peer. Has no effect if we're not connected to this peer.
///
/// This method is extremely poor in terms of API and should be eventually removed.
pub fn send_request(&self, who: PeerId, message: Message<B>) {
let _ = self.network_chan.send(NetworkMsg::Outgoing(who, message));
}
/// Execute a closure with the chain-specific network specialization.
pub fn with_spec<F>(&self, f: F)
where F: FnOnce(&mut S, &mut Context<B>) + Send + 'static