Simplify the network background thread future (#2419)

This commit is contained in:
Pierre Krieger
2019-04-29 18:57:12 +02:00
committed by Gavin Wood
parent bad3ce4e17
commit 8b7258f850
+63 -84
View File
@@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::{io, thread};
use log::{warn, debug, error, trace, info};
use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc};
use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use network_libp2p::{ProtocolId, NetworkConfiguration, Severity};
use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
@@ -525,92 +525,71 @@ fn run_thread<B: BlockT + 'static>(
peerset: PeersetHandle,
) -> impl Future<Item = (), Error = io::Error> {
let network_service_2 = network_service.clone();
futures::future::poll_fn(move || {
loop {
match network_port.take_one_message() {
Ok(None) => break,
Ok(Some(NetworkMsg::Outgoing(who, outgoing_message))) => {
network_service
.lock()
.send_custom_message(&who, outgoing_message);
},
Ok(Some(NetworkMsg::ReportPeer(who, severity))) => {
match severity {
Severity::Bad(message) => {
info!(target: "sync", "Banning {:?} because {:?}", who, message);
network_service.lock().drop_node(&who);
// temporary: make sure the peer gets dropped from the peerset
peerset.report_peer(who, i32::min_value());
},
Severity::Useless(message) => {
debug!(target: "sync", "Dropping {:?} because {:?}", who, message);
network_service.lock().drop_node(&who)
},
Severity::Timeout => {
debug!(target: "sync", "Dropping {:?} because it timed out", who);
network_service.lock().drop_node(&who)
},
}
},
#[cfg(any(test, feature = "test-helpers"))]
Ok(Some(NetworkMsg::Synchronized)) => {}
// Protocol produces a stream of messages about what happens in sync.
let protocol = stream::poll_fn(move || {
match network_port.take_one_message() {
Ok(Some(message)) => Ok(Async::Ready(Some(message))),
Ok(None) => Ok(Async::NotReady),
Err(_) => Err(())
Err(_) => return Ok(Async::Ready(())),
}
}
}).for_each(move |msg| {
// Handle message from Protocol.
match msg {
NetworkMsg::Outgoing(who, outgoing_message) => {
network_service_2
.lock()
.send_custom_message(&who, outgoing_message);
},
NetworkMsg::ReportPeer(who, severity) => {
match severity {
Severity::Bad(message) => {
info!(target: "sync", "Banning {:?} because {:?}", who, message);
network_service_2.lock().drop_node(&who);
// temporary: make sure the peer gets dropped from the peerset
peerset.report_peer(who, i32::min_value());
},
Severity::Useless(message) => {
debug!(target: "sync", "Dropping {:?} because {:?}", who, message);
network_service_2.lock().drop_node(&who)
},
Severity::Timeout => {
debug!(target: "sync", "Dropping {:?} because it timed out", who);
network_service_2.lock().drop_node(&who)
},
loop {
match network_service.lock().poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => {
debug_assert!(
version <= protocol::CURRENT_VERSION as u8
&& version >= protocol::MIN_VERSION as u8
);
let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info));
}
},
#[cfg(any(test, feature = "test-helpers"))]
NetworkMsg::Synchronized => (),
Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info));
}
Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => {
let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message));
}
Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
debug!(target: "sync", "{:?}", msg);
let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg)));
}
}
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Err(err) => {
error!(target: "sync", "Error in the network: {:?}", err);
return Err(err)
}
}
}
Ok(())
Ok(Async::NotReady)
})
.then(|res| {
match res {
Ok(()) => (),
Err(_) => error!("Protocol disconnected"),
};
Ok(())
});
// The network service produces events about what happens on the network. Let's process them.
let network = stream::poll_fn(move || network_service.lock().poll()).for_each(move |event| {
match event {
NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. } => {
debug_assert!(
version <= protocol::CURRENT_VERSION as u8
&& version >= protocol::MIN_VERSION as u8
);
let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info));
}
NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. } => {
let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info));
}
NetworkServiceEvent::CustomMessage { peer_id, message, .. } => {
let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message));
return Ok(())
}
NetworkServiceEvent::Clogged { peer_id, messages, .. } => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
debug!(target: "sync", "{:?}", msg);
let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg)));
}
}
};
Ok(())
});
// Merge all futures into one.
let futures: Vec<Box<Future<Item = (), Error = io::Error> + Send>> = vec![
Box::new(protocol) as Box<_>,
Box::new(network) as Box<_>
];
futures::select_all(futures)
.and_then(move |_| {
debug!("Networking ended");
Ok(())
})
.map_err(|(r, _, _)| r)
}