diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index f9e827b4ab..28416fe7f4 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::{io, thread}; use log::{warn, debug, error, trace, info}; -use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc}; +use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; use network_libp2p::{ProtocolId, NetworkConfiguration, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; @@ -525,92 +525,71 @@ fn run_thread( peerset: PeersetHandle, ) -> impl Future { - let network_service_2 = network_service.clone(); + futures::future::poll_fn(move || { + loop { + match network_port.take_one_message() { + Ok(None) => break, + Ok(Some(NetworkMsg::Outgoing(who, outgoing_message))) => { + network_service + .lock() + .send_custom_message(&who, outgoing_message); + }, + Ok(Some(NetworkMsg::ReportPeer(who, severity))) => { + match severity { + Severity::Bad(message) => { + info!(target: "sync", "Banning {:?} because {:?}", who, message); + network_service.lock().drop_node(&who); + // temporary: make sure the peer gets dropped from the peerset + peerset.report_peer(who, i32::min_value()); + }, + Severity::Useless(message) => { + debug!(target: "sync", "Dropping {:?} because {:?}", who, message); + network_service.lock().drop_node(&who) + }, + Severity::Timeout => { + debug!(target: "sync", "Dropping {:?} because it timed out", who); + network_service.lock().drop_node(&who) + }, + } + }, + #[cfg(any(test, feature = "test-helpers"))] + Ok(Some(NetworkMsg::Synchronized)) => {} - // Protocol produces a stream of messages about what happens in sync. - let protocol = stream::poll_fn(move || { - match network_port.take_one_message() { - Ok(Some(message)) => Ok(Async::Ready(Some(message))), - Ok(None) => Ok(Async::NotReady), - Err(_) => Err(()) + Err(_) => return Ok(Async::Ready(())), + } } - }).for_each(move |msg| { - // Handle message from Protocol. - match msg { - NetworkMsg::Outgoing(who, outgoing_message) => { - network_service_2 - .lock() - .send_custom_message(&who, outgoing_message); - }, - NetworkMsg::ReportPeer(who, severity) => { - match severity { - Severity::Bad(message) => { - info!(target: "sync", "Banning {:?} because {:?}", who, message); - network_service_2.lock().drop_node(&who); - // temporary: make sure the peer gets dropped from the peerset - peerset.report_peer(who, i32::min_value()); - }, - Severity::Useless(message) => { - debug!(target: "sync", "Dropping {:?} because {:?}", who, message); - network_service_2.lock().drop_node(&who) - }, - Severity::Timeout => { - debug!(target: "sync", "Dropping {:?} because it timed out", who); - network_service_2.lock().drop_node(&who) - }, + + loop { + match network_service.lock().poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => { + debug_assert!( + version <= protocol::CURRENT_VERSION as u8 + && version >= protocol::MIN_VERSION as u8 + ); + let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info)); } - }, - #[cfg(any(test, feature = "test-helpers"))] - NetworkMsg::Synchronized => (), + Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { + let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info)); + } + Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => { + let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message)); + } + Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => { + debug!(target: "sync", "{} clogging messages:", messages.len()); + for msg in messages.into_iter().take(5) { + debug!(target: "sync", "{:?}", msg); + let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg))); + } + } + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Err(err) => { + error!(target: "sync", "Error in the network: {:?}", err); + return Err(err) + } + } } - Ok(()) + + Ok(Async::NotReady) }) - .then(|res| { - match res { - Ok(()) => (), - Err(_) => error!("Protocol disconnected"), - }; - Ok(()) - }); - - // The network service produces events about what happens on the network. Let's process them. - let network = stream::poll_fn(move || network_service.lock().poll()).for_each(move |event| { - match event { - NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. } => { - debug_assert!( - version <= protocol::CURRENT_VERSION as u8 - && version >= protocol::MIN_VERSION as u8 - ); - let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info)); - } - NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. } => { - let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info)); - } - NetworkServiceEvent::CustomMessage { peer_id, message, .. } => { - let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message)); - return Ok(()) - } - NetworkServiceEvent::Clogged { peer_id, messages, .. } => { - debug!(target: "sync", "{} clogging messages:", messages.len()); - for msg in messages.into_iter().take(5) { - debug!(target: "sync", "{:?}", msg); - let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg))); - } - } - }; - Ok(()) - }); - - // Merge all futures into one. - let futures: Vec + Send>> = vec![ - Box::new(protocol) as Box<_>, - Box::new(network) as Box<_> - ]; - - futures::select_all(futures) - .and_then(move |_| { - debug!("Networking ended"); - Ok(()) - }) - .map_err(|(r, _, _)| r) }