diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 6ca698df3b..c835b0761a 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4484,6 +4484,7 @@ dependencies = [ "sysinfo 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index 7c8ec3ac1e..3eb5a0d992 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -40,39 +40,36 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe /// Creates an informant in the form of a `Future` that must be polled regularly. pub fn build(service: &Service) -> impl Future where C: Components { - let network = service.network(); let client = service.client(); let mut last_number = None; let mut last_update = time::Instant::now(); - let display_notifications = network.status().for_each(move |sync_status| { + let display_notifications = service.network_status().for_each(move |net_status| { let info = client.info(); let best_number = info.chain.best_number.saturated_into::(); let best_hash = info.chain.best_hash; let speed = move || speed(best_number, last_number, last_update); last_update = time::Instant::now(); - let (status, target) = match (sync_status.sync.state, sync_status.sync.best_seen_block) { + let (status, target) = match (net_status.sync_state, net_status.best_seen_block) { (SyncState::Idle, _) => ("Idle".into(), "".into()), (SyncState::Downloading, None) => (format!("Syncing{}", speed()), "".into()), (SyncState::Downloading, Some(n)) => (format!("Syncing{}", speed()), format!(", target=#{}", n)), }; last_number = Some(best_number); let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); - let bandwidth_download = network.average_download_per_sec(); - let bandwidth_upload = network.average_upload_per_sec(); info!( target: "substrate", "{}{} ({} peers), best: #{} ({}), finalized #{} ({}), ⬇ {} ⬆ {}", Colour::White.bold().paint(&status), target, - Colour::White.bold().paint(format!("{}", sync_status.num_peers)), + Colour::White.bold().paint(format!("{}", net_status.num_connected_peers)), Colour::White.paint(format!("{}", best_number)), best_hash, Colour::White.paint(format!("{}", finalized_number)), info.chain.finalized_hash, - TransferRateFormat(bandwidth_download), - TransferRateFormat(bandwidth_upload), + TransferRateFormat(net_status.average_download_per_sec), + TransferRateFormat(net_status.average_upload_per_sec), ); Ok(()) diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 9c022cce44..88c252bb32 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -184,8 +184,8 @@ pub use service::{ NetworkMsg, ExHashT, ReportHandle, }; pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret}; -pub use protocol::{ProtocolStatus, PeerInfo, Context, consensus_gossip, message, specialization}; -pub use protocol::sync::{Status as SyncStatus, SyncState}; +pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, multiaddr, build_multiaddr}; pub use libp2p::{identity, PeerId, core::PublicKey}; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 13be819fae..3038898b09 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -33,7 +33,7 @@ use message::generic::{Message as GenericMessage, ConsensusMessage}; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; use specialization::NetworkSpecialization; -use sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; +use sync::{ChainSync, Context as SyncContext, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::Roles; use rustc_hex::ToHex; @@ -115,17 +115,6 @@ struct HandshakingPeer { timestamp: time::Instant, } -/// Syncing status and statistics -#[derive(Clone)] -pub struct ProtocolStatus { - /// Sync status. - pub sync: SyncStatus, - /// Total number of connected peers - pub num_peers: usize, - /// Total number of active peers. - pub num_active_peers: usize, -} - /// Peer information #[derive(Debug, Clone)] struct Peer { @@ -415,26 +404,33 @@ impl, H: ExHashT> Protocol { }) } - /// Returns an object representing the status of the protocol. - pub fn status(&self) -> ProtocolStatus { - ProtocolStatus { - sync: self.sync.status(), - num_peers: self.context_data.peers.values().count(), - num_active_peers: self - .context_data - .peers - .values() - .filter(|p| p.block_request.is_some()) - .count(), - } + /// Returns the number of peers we're connected to. + pub fn num_connected_peers(&self) -> usize { + self.context_data.peers.values().count() } - pub fn is_major_syncing(&self) -> bool { - self.sync.status().is_major_syncing() + /// Returns the number of peers we're connected to and that are being queried. + pub fn num_active_peers(&self) -> usize { + self.context_data + .peers + .values() + .filter(|p| p.block_request.is_some()) + .count() } - pub fn is_offline(&self) -> bool { - self.sync.status().is_offline() + /// Current global sync state. + pub fn sync_state(&self) -> SyncState { + self.sync.status().state + } + + /// Target sync block number. + pub fn best_seen_block(&self) -> Option> { + self.sync.status().best_seen_block + } + + /// Number of peers participating in syncing. + pub fn num_sync_peers(&self) -> u32 { + self.sync.status().num_peers } /// Starts a new data demand request. diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 591d5e4669..0478086222 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -194,22 +194,6 @@ pub struct Status { pub num_peers: u32, } -impl Status { - /// Whether the synchronization status is doing major downloading work or - /// is near the head of the chain. - pub fn is_major_syncing(&self) -> bool { - match self.state { - SyncState::Idle => false, - SyncState::Downloading => true, - } - } - - /// Are we all alone? - pub fn is_offline(&self) -> bool { - self.num_peers == 0 - } -} - impl ChainSync { /// Create a new instance. Pass the initial known state of the chain. pub(crate) fn new(role: Roles, info: &ClientInfo) -> Self { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 83ac5c9778..34cc9797f1 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -39,13 +39,12 @@ use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as Gos use crate::protocol::message::Message; use crate::protocol::on_demand::RequestData; use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; -use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut}; +use crate::protocol::{PeerInfo, NetworkOut}; +use crate::protocol::sync::SyncState; use crate::config::Params; use crate::error::Error; use crate::protocol::specialization::NetworkSpecialization; -/// Interval at which we send status updates on the status stream. -const STATUS_INTERVAL: Duration = Duration::from_millis(5000); /// Interval at which we update the `peers` field on the main thread. const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500); @@ -90,8 +89,6 @@ impl ReportHandle { /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService> { - /// Sinks to propagate status updates. - status_sinks: Arc>>>>, /// Are we connected to any peer? is_offline: Arc, /// Are we actively catching up with the chain? @@ -122,7 +119,6 @@ impl, H: ExHashT> NetworkWorker ) -> Result, Error> { let (network_chan, network_port) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); - let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); @@ -148,7 +144,6 @@ impl, H: ExHashT> NetworkWorker }; let service = Arc::new(NetworkService { - status_sinks: status_sinks.clone(), bandwidth, is_offline: is_offline.clone(), is_major_syncing: is_major_syncing.clone(), @@ -172,13 +167,46 @@ impl, H: ExHashT> NetworkWorker finality_proof_provider: params.finality_proof_provider, network_port, protocol_rx, - status_sinks, on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()), - status_interval: tokio_timer::Interval::new_interval(STATUS_INTERVAL), connected_peers_interval: tokio_timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL), }) } + /// Returns the downloaded bytes per second averaged over the past few seconds. + pub fn average_download_per_sec(&self) -> u64 { + self.service.bandwidth.average_download_per_sec() + } + + /// Returns the uploaded bytes per second averaged over the past few seconds. + pub fn average_upload_per_sec(&self) -> u64 { + self.service.bandwidth.average_upload_per_sec() + } + + /// Returns the number of peers we're connected to. + pub fn num_connected_peers(&self) -> usize { + self.protocol.num_connected_peers() + } + + /// Returns the number of peers we're connected to and that are being queried. + pub fn num_active_peers(&self) -> usize { + self.protocol.num_active_peers() + } + + /// Current global sync state. + pub fn sync_state(&self) -> SyncState { + self.protocol.sync_state() + } + + /// Target sync block number. + pub fn best_seen_block(&self) -> Option> { + self.protocol.best_seen_block() + } + + /// Number of peers participating in syncing. + pub fn num_sync_peers(&self) -> u32 { + self.protocol.num_sync_peers() + } + /// Return a `NetworkService` that can be shared through the code base and can be used to /// manipulate the worker. pub fn service(&self) -> &Arc> { @@ -187,16 +215,6 @@ impl, H: ExHashT> NetworkWorker } impl> NetworkService { - /// Returns the downloaded bytes per second averaged over the past few seconds. - pub fn average_download_per_sec(&self) -> u64 { - self.bandwidth.average_download_per_sec() - } - - /// Returns the uploaded bytes per second averaged over the past few seconds. - pub fn average_upload_per_sec(&self) -> u64 { - self.bandwidth.average_upload_per_sec() - } - /// Returns the network identity of the node. pub fn local_peer_id(&self) -> PeerId { Swarm::::local_peer_id(&*self.network.lock()).clone() @@ -287,13 +305,6 @@ impl> NetworkService { self.is_major_syncing.load(Ordering::Relaxed) } - /// Get sync status - pub fn status(&self) -> mpsc::UnboundedReceiver> { - let (sink, stream) = mpsc::unbounded(); - self.status_sinks.lock().push(sink); - stream - } - /// Get network state. pub fn network_state(&self) -> NetworkState { let mut swarm = self.network.lock(); @@ -497,12 +508,9 @@ pub struct NetworkWorker, H: Ex finality_proof_provider: Option>>, network_port: mpsc::UnboundedReceiver>, protocol_rx: mpsc::UnboundedReceiver>, - status_sinks: Arc>>>>, peerset: PeersetHandle, on_demand_in: Option>>, - /// Interval at which we send status updates on the `status_sinks`. - status_interval: tokio_timer::Interval, /// Interval at which we update the `connected_peers` Arc. connected_peers_interval: tokio_timer::Interval, } @@ -580,11 +588,6 @@ impl, H: ExHashT> Future for Ne } } - while let Ok(Async::Ready(_)) = self.status_interval.poll() { - let status = self.protocol.status(); - self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); - } - { let mut network_service = self.network_service.lock(); let mut link = NetworkLink { @@ -742,8 +745,11 @@ impl, H: ExHashT> Future for Ne } } - self.is_offline.store(self.protocol.is_offline(), Ordering::Relaxed); - self.is_major_syncing.store(self.protocol.is_major_syncing(), Ordering::Relaxed); + self.is_offline.store(self.protocol.num_connected_peers() == 0, Ordering::Relaxed); + self.is_major_syncing.store(match self.protocol.sync_state() { + SyncState::Idle => false, + SyncState::Downloading => true, + }, Ordering::Relaxed); Ok(Async::NotReady) } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 95646cd74b..9931826ae2 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -46,7 +46,8 @@ use crate::message::Message; use libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, Blake2Hasher}; -use crate::protocol::{Context, Protocol, ProtocolConfig, ProtocolStatus, CustomMessageOutcome, NetworkOut}; +use crate::SyncState; +use crate::protocol::{Context, Protocol, ProtocolConfig, CustomMessageOutcome, NetworkOut}; use runtime_primitives::generic::{BlockId, OpaqueDigestItemId}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; @@ -365,7 +366,8 @@ pub struct Peer> { /// instantiation paths or field names is too much hassle, hence /// we allow it to be unused. #[cfg_attr(not(test), allow(unused))] - protocol_status: Arc>>, + /// `(is_offline, is_major_syncing, num_peers)` + protocol_status: Arc>, import_queue: Arc>>>, pub data: D, best_hash: Mutex>, @@ -509,7 +511,7 @@ impl> ProtocolChannel { impl> Peer { fn new( - protocol_status: Arc>>, + protocol_status: Arc>, client: PeersClient, import_queue: Arc>>>, use_tokio: bool, @@ -560,19 +562,19 @@ impl> Peer { /// SyncOracle: are we connected to any peer? #[cfg(test)] fn is_offline(&self) -> bool { - self.protocol_status.read().sync.is_offline() + self.protocol_status.read().0 } /// SyncOracle: are we in the process of catching-up with the chain? #[cfg(test)] fn is_major_syncing(&self) -> bool { - self.protocol_status.read().sync.is_major_syncing() + self.protocol_status.read().1 } /// Get protocol status. #[cfg(test)] - fn protocol_status(&self) -> ProtocolStatus { - self.protocol_status.read().clone() + fn num_peers(&self) -> usize { + self.protocol_status.read().2 } /// Called on connection to other indicated peer. @@ -873,7 +875,7 @@ pub trait TestNetFactory: Sized { /// Add created peer. fn add_peer( &mut self, - protocol_status: Arc>>, + protocol_status: Arc>, import_queue: Arc>>>, tx_pool: EmptyTransactionPool, finality_proof_provider: Option>>, @@ -1009,7 +1011,11 @@ pub trait TestNetFactory: Sized { return Ok(Async::Ready(())) } - *protocol_status.write() = protocol.status(); + *protocol_status.write() = ( + protocol.num_connected_peers() == 0, + protocol.sync_state() == SyncState::Downloading, + protocol.num_connected_peers() + ); Ok(Async::NotReady) })); }); @@ -1054,7 +1060,7 @@ pub trait TestNetFactory: Sized { specialization, ).unwrap(); - let protocol_status = Arc::new(RwLock::new(protocol.status())); + let protocol_status = Arc::new(RwLock::new((true, false, 0))); self.add_peer( protocol_status.clone(), import_queue.clone(), @@ -1110,7 +1116,7 @@ pub trait TestNetFactory: Sized { specialization, ).unwrap(); - let protocol_status = Arc::new(RwLock::new(protocol.status())); + let protocol_status = Arc::new(RwLock::new((true, false, 0))); self.add_peer( protocol_status.clone(), import_queue.clone(), diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 1e9e2948f3..15b866c168 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -45,7 +45,7 @@ fn sync_peers_works() { net.sync(); for peer in 0..3 { // Assert peers is up to date. - assert_eq!(net.peer(peer).protocol_status.read().num_peers, 2); + assert_eq!(net.peer(peer).num_peers(), 2); // And then disconnect. for other in 0..3 { if other != peer { @@ -56,8 +56,7 @@ fn sync_peers_works() { net.sync(); // Now peers are disconnected. for peer in 0..3 { - let status = net.peer(peer).protocol_status.read(); - assert_eq!(status.num_peers, 0); + assert_eq!(net.peer(peer).num_peers(), 0); } } @@ -433,7 +432,7 @@ fn can_not_sync_from_light_peer() { assert_eq!(net.peer(2).client.info().chain.best_number, 0); // and that the #1 is still connected to #2 // (because #2 has not tried to fetch block data from the #1 light node) - assert_eq!(net.peer(1).protocol_status().num_peers, 2); + assert_eq!(net.peer(1).num_peers(), 2); // and now try to fetch block data from light peer #1 // (this should result in disconnect) @@ -450,7 +449,7 @@ fn can_not_sync_from_light_peer() { ); net.sync(); // check that light #1 has disconnected from #2 - assert_eq!(net.peer(1).protocol_status().num_peers, 1); + assert_eq!(net.peer(1).num_peers(), 1); } #[test] diff --git a/substrate/core/service/Cargo.toml b/substrate/core/service/Cargo.toml index 21501286d4..5a81022362 100644 --- a/substrate/core/service/Cargo.toml +++ b/substrate/core/service/Cargo.toml @@ -12,6 +12,7 @@ lazy_static = "1.0" log = "0.4" slog = {version = "^2", features = ["nested-values"]} tokio = "0.1.7" +tokio-timer = "0.2" exit-future = "0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 580bee21dd..85a39cf6c2 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -28,6 +28,7 @@ pub mod error; use std::io; use std::net::SocketAddr; use std::collections::HashMap; +use std::time::Duration; use futures::sync::mpsc; use parking_lot::Mutex; @@ -39,7 +40,7 @@ use log::{info, warn, debug}; use parity_codec::{Encode, Decode}; use primitives::Pair; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Header, SaturatedConversion}; +use runtime_primitives::traits::{Header, NumberFor, SaturatedConversion}; use substrate_executor::NativeExecutor; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; @@ -75,6 +76,8 @@ pub struct Service { client: Arc>, select_chain: Option<::SelectChain>, network: Arc>, + /// Sinks to propagate network status updates. + network_status_sinks: Arc>>>>>, transaction_pool: Arc>, keystore: Keystore, exit: ::exit_future::Exit, @@ -204,8 +207,9 @@ impl Service { let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = network::NetworkWorker::new(network_params)?; let network = network_mut.service().clone(); + let network_status_sinks = Arc::new(Mutex::new(Vec::new())); - task_executor.spawn(network_mut + task_executor.spawn(build_network_future(network_mut, network_status_sinks.clone()) .map_err(|_| ()) .select(exit.clone()) .then(|_| Ok(()))); @@ -331,15 +335,17 @@ impl Service { let network_ = network.clone(); let mut sys = System::new(); let self_pid = get_current_pid(); - task_executor.spawn(network.status().for_each(move |sync_status| { + let (netstat_tx, netstat_rx) = mpsc::unbounded(); + network_status_sinks.lock().push(netstat_tx); + task_executor.spawn(netstat_rx.for_each(move |net_status| { let info = client_.info(); let best_number = info.chain.best_number.saturated_into::(); let best_hash = info.chain.best_hash; - let num_peers = sync_status.num_peers; + let num_peers = net_status.num_connected_peers; let txpool_status = transaction_pool_.status(); let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); - let bandwidth_download = network_.average_download_per_sec(); - let bandwidth_upload = network_.average_upload_per_sec(); + let bandwidth_download = net_status.average_download_per_sec; + let bandwidth_upload = net_status.average_upload_per_sec; #[allow(deprecated)] let backend = (*client_).backend(); @@ -447,6 +453,7 @@ impl Service { Ok(Service { client, network, + network_status_sinks, select_chain, transaction_pool, signal: Some(signal), @@ -495,6 +502,13 @@ impl Service where Components: components::Components { self.network.clone() } + /// Returns a receiver that periodically receives a status of the network. + pub fn network_status(&self) -> mpsc::UnboundedReceiver>> { + let (sink, stream) = mpsc::unbounded(); + self.network_status_sinks.lock().push(sink); + stream + } + /// Get shared transaction pool instance. pub fn transaction_pool(&self) -> Arc> { self.transaction_pool.clone() @@ -511,6 +525,57 @@ impl Service where Components: components::Components { } } +/// Builds a never-ending future that continuously polls the network. +/// +/// The `status_sink` contain a list of senders to send a periodic network status to. +fn build_network_future, H: network::ExHashT>( + mut network: network::NetworkWorker, + status_sinks: Arc>>>>, +) -> impl Future { + // Interval at which we send status updates on the status stream. + const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); + + futures::future::poll_fn(move || { + while let Ok(Async::Ready(_)) = status_interval.poll() { + let status = NetworkStatus { + sync_state: network.sync_state(), + best_seen_block: network.best_seen_block(), + num_sync_peers: network.num_sync_peers(), + num_connected_peers: network.num_connected_peers(), + num_active_peers: network.num_active_peers(), + average_download_per_sec: network.average_download_per_sec(), + average_upload_per_sec: network.average_upload_per_sec(), + }; + + status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); + } + + network.poll() + .map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) + }) +} + +/// Overview status of the network. +#[derive(Clone)] +pub struct NetworkStatus { + /// Current global sync state. + pub sync_state: network::SyncState, + /// Target sync block number. + pub best_seen_block: Option>, + /// Number of peers participating in syncing. + pub num_sync_peers: u32, + /// Total number of connected peers + pub num_connected_peers: usize, + /// Total number of active peers. + pub num_active_peers: usize, + /// Downloaded bytes per second averaged over the past few seconds. + pub average_download_per_sec: u64, + /// Uploaded bytes per second averaged over the past few seconds. + pub average_upload_per_sec: u64, +} impl Drop for Service where Components: components::Components { fn drop(&mut self) {