diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 5115bf4d42..39ec3e8c26 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -7535,6 +7535,7 @@ version = "0.9.0" dependencies = [ "ansi_term 0.12.1", "futures 0.3.13", + "futures-timer 3.0.2", "log", "parity-util-mem", "sc-client-api", @@ -7542,7 +7543,6 @@ dependencies = [ "sp-blockchain", "sp-runtime", "sp-transaction-pool", - "sp-utils", "wasm-timer", ] diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index ed0a046335..f504904100 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -149,7 +149,7 @@ pub fn new_full(mut config: Configuration) -> Result config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config()); - let (network, network_status_sinks, system_rpc_tx, network_starter) = + let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -199,7 +199,6 @@ pub fn new_full(mut config: Configuration) -> Result on_demand: None, remote_blockchain: None, backend, - network_status_sinks, system_rpc_tx, config, telemetry: telemetry.as_mut(), @@ -370,7 +369,7 @@ pub fn new_light(mut config: Configuration) -> Result }, )?; - let (network, network_status_sinks, system_rpc_tx, network_starter) = + let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -418,7 +417,6 @@ pub fn new_light(mut config: Configuration) -> Result keystore: keystore_container.sync_keystore(), backend, network, - network_status_sinks, system_rpc_tx, telemetry: telemetry.as_mut(), })?; diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 6781402c94..42020e6668 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -204,7 +204,6 @@ pub struct NewFullBase { pub task_manager: TaskManager, pub client: Arc, pub network: Arc::Hash>>, - pub network_status_sinks: sc_service::NetworkStatusSinks, pub transaction_pool: Arc>, } @@ -242,7 +241,7 @@ pub fn new_full_base( ) ); - let (network, network_status_sinks, system_rpc_tx, network_starter) = + let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -279,7 +278,6 @@ pub fn new_full_base( task_manager: &mut task_manager, on_demand: None, remote_blockchain: None, - network_status_sinks: network_status_sinks.clone(), system_rpc_tx, telemetry: telemetry.as_mut(), }, @@ -415,7 +413,6 @@ pub fn new_full_base( task_manager, client, network, - network_status_sinks, transaction_pool, }) } @@ -519,7 +516,7 @@ pub fn new_light_base( telemetry.as_ref().map(|x| x.handle()), )?; - let (network, network_status_sinks, system_rpc_tx, network_starter) = + let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -576,7 +573,7 @@ pub fn new_light_base( client: client.clone(), transaction_pool: transaction_pool.clone(), keystore: keystore_container.sync_keystore(), - config, backend, network_status_sinks, system_rpc_tx, + config, backend, system_rpc_tx, network: network.clone(), task_manager: &mut task_manager, telemetry: telemetry.as_mut(), diff --git a/substrate/client/informant/Cargo.toml b/substrate/client/informant/Cargo.toml index d552a123c3..139a5ce19a 100644 --- a/substrate/client/informant/Cargo.toml +++ b/substrate/client/informant/Cargo.toml @@ -15,12 +15,12 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] ansi_term = "0.12.1" futures = "0.3.9" +futures-timer = "3.0.1" log = "0.4.8" parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] } sc-client-api = { version = "3.0.0", path = "../api" } sc-network = { version = "0.9.0", path = "../network" } sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" } sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" } -sp-utils = { version = "3.0.0", path = "../../primitives/utils" } sp-transaction-pool = { version = "3.0.0", path = "../../primitives/transaction-pool" } wasm-timer = "0.2" diff --git a/substrate/client/informant/src/lib.rs b/substrate/client/informant/src/lib.rs index c955834c0f..ef1533fb49 100644 --- a/substrate/client/informant/src/lib.rs +++ b/substrate/client/informant/src/lib.rs @@ -20,18 +20,23 @@ use ansi_term::Colour; use futures::prelude::*; +use futures_timer::Delay; use log::{info, trace, warn}; use parity_util_mem::MallocSizeOf; use sc_client_api::{BlockchainEvents, UsageProvider}; -use sc_network::NetworkStatus; +use sc_network::NetworkService; use sp_blockchain::HeaderMetadata; use sp_runtime::traits::{Block as BlockT, Header}; use sp_transaction_pool::TransactionPool; -use sp_utils::{status_sinks, mpsc::tracing_unbounded}; use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque}; mod display; +/// Creates a stream that returns a new value every `duration`. +fn interval(duration: Duration) -> impl Stream + Unpin { + futures::stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop) +} + /// The format to print telemetry output in. #[derive(Clone, Debug)] pub struct OutputFormat { @@ -64,12 +69,12 @@ impl TransactionPoolAndMaybeMallogSizeOf for T {} impl TransactionPoolAndMaybeMallogSizeOf for T {} /// Builds the informant and returns a `Future` that drives the informant. -pub fn build( +pub async fn build( client: Arc, - network_status_sinks: Arc>>, + network: Arc::Hash>>, pool: Arc, format: OutputFormat, -) -> impl futures::Future +) where C: UsageProvider + HeaderMetadata + BlockchainEvents, >::Error: Display, @@ -77,10 +82,12 @@ where let mut display = display::InformantDisplay::new(format.clone()); let client_1 = client.clone(); - let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status"); - network_status_sinks.push(Duration::from_millis(5000), network_status_sink); - let display_notifications = network_status_stream + let display_notifications = interval(Duration::from_millis(5000)) + .filter_map(|_| async { + let status = network.status().await; + status.ok() + }) .for_each(move |net_status| { let info = client_1.usage_info(); if let Some(ref usage) = info.usage { @@ -101,10 +108,10 @@ where future::ready(()) }); - future::join( - display_notifications, - display_block_import(client), - ).map(|_| ()) + futures::select! { + () = display_notifications.fuse() => (), + () = display_block_import(client).fuse() => (), + }; } fn display_block_import(client: Arc) -> impl Future diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 9bcde11e4b..5dc550254f 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -888,6 +888,43 @@ impl NetworkService { }); } + /// High-level network status information. + /// + /// Returns an error if the `NetworkWorker` is no longer running. + pub async fn status(&self) -> Result, ()> { + let (tx, rx) = oneshot::channel(); + + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::NetworkStatus { + pending_response: tx, + }); + + match rx.await { + Ok(v) => v.map_err(|_| ()), + // The channel can only be closed if the network worker no longer exists. + Err(_) => Err(()), + } + } + + /// Get network state. + /// + /// **Note**: Use this only for debugging. This API is unstable. There are warnings literally + /// everywhere about this. Please don't use this function to retrieve actual information. + /// + /// Returns an error if the `NetworkWorker` is no longer running. + pub async fn network_state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::NetworkState { + pending_response: tx, + }); + + match rx.await { + Ok(v) => v.map_err(|_| ()), + // The channel can only be closed if the network worker no longer exists. + Err(_) => Err(()), + } + } + /// You may call this when new transactons are imported by the transaction pool. /// /// All transactions will be fetched from the `TransactionPool` that was passed at @@ -1307,6 +1344,12 @@ enum ServiceToWorkerMsg { pending_response: oneshot::Sender, RequestFailure>>, connect: IfDisconnected, }, + NetworkStatus { + pending_response: oneshot::Sender, RequestFailure>>, + }, + NetworkState { + pending_response: oneshot::Sender>, + }, DisconnectPeer(PeerId, Cow<'static, str>), NewBestBlockImported(B::Hash, NumberFor), } @@ -1434,6 +1477,12 @@ impl Future for NetworkWorker { ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => { this.network_service.behaviour_mut().send_request(&target, &protocol, request, pending_response, connect); }, + ServiceToWorkerMsg::NetworkStatus { pending_response } => { + let _ = pending_response.send(Ok(this.status())); + }, + ServiceToWorkerMsg::NetworkState { pending_response } => { + let _ = pending_response.send(Ok(this.network_state())); + }, ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => this.network_service.behaviour_mut().user_protocol_mut().disconnect_peer(&who, &protocol_name), ServiceToWorkerMsg::NewBestBlockImported(hash, number) => diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 45652524d4..ba56625274 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use crate::{ - error::Error, MallocSizeOfWasm, RpcHandlers, NetworkStatusSinks, + error::Error, MallocSizeOfWasm, RpcHandlers, start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle, metrics::MetricsService, client::{light, Client, ClientConfig}, @@ -519,8 +519,6 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { pub remote_blockchain: Option>>, /// A shared network instance. pub network: Arc::Hash>>, - /// Sinks to propagate network status updates. - pub network_status_sinks: NetworkStatusSinks, /// A Sender for RPC requests. pub system_rpc_tx: TracingUnboundedSender>, /// Telemetry instance for this node. @@ -590,7 +588,6 @@ pub fn spawn_tasks( rpc_extensions_builder, remote_blockchain, network, - network_status_sinks, system_rpc_tx, telemetry, } = params; @@ -654,7 +651,7 @@ pub fn spawn_tasks( metrics_service.run( client.clone(), transaction_pool.clone(), - network_status_sinks.clone() + network.clone(), ) ); @@ -679,7 +676,7 @@ pub fn spawn_tasks( // Spawn informant task spawn_handle.spawn("informant", sc_informant::build( client.clone(), - network_status_sinks.status.clone(), + network.clone(), transaction_pool.clone(), config.informant_output_format, )); @@ -865,7 +862,6 @@ pub fn build_network( ) -> Result< ( Arc::Hash>>, - NetworkStatusSinks, TracingUnboundedSender>, NetworkStarter, ), @@ -959,7 +955,6 @@ pub fn build_network( let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; let network = network_mut.service().clone(); - let network_status_sinks = NetworkStatusSinks::new(); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); @@ -967,7 +962,6 @@ pub fn build_network( config.role.clone(), network_mut, client, - network_status_sinks.clone(), system_rpc_rx, has_bootnodes, config.announce_block, @@ -1010,7 +1004,7 @@ pub fn build_network( future.await }); - Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx))) + Ok((network, system_rpc_tx, NetworkStarter(network_start_tx))) } /// Object used to start the network. diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 0e47b775e4..ae2cfbc8b8 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -37,17 +37,16 @@ mod task_manager; use std::{io, pin::Pin}; use std::net::SocketAddr; use std::collections::HashMap; -use std::time::Duration; use std::task::Poll; use futures::{Future, FutureExt, Stream, StreamExt, stream, compat::*}; -use sc_network::{NetworkStatus, network_state::NetworkState, PeerId}; +use sc_network::PeerId; use log::{warn, debug, error}; use codec::{Encode, Decode}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use parity_util_mem::MallocSizeOf; -use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver}}; +use sp_utils::mpsc::TracingUnboundedReceiver; pub use self::error::Error; pub use self::builder::{ @@ -124,42 +123,6 @@ impl RpcHandlers { } } -/// Sinks to propagate network status updates. -/// For each element, every time the `Interval` fires we push an element on the sender. -#[derive(Clone)] -pub struct NetworkStatusSinks { - status: Arc>>, - state: Arc>, -} - -impl NetworkStatusSinks { - fn new() -> Self { - Self { - status: Arc::new(status_sinks::StatusSinks::new()), - state: Arc::new(status_sinks::StatusSinks::new()), - } - } - - /// Returns a receiver that periodically yields a [`NetworkStatus`]. - pub fn status_stream(&self, interval: Duration) - -> TracingUnboundedReceiver> - { - let (sink, stream) = tracing_unbounded("mpsc_network_status"); - self.status.push(interval, sink); - stream - } - - /// Returns a receiver that periodically yields a [`NetworkState`]. - pub fn state_stream(&self, interval: Duration) - -> TracingUnboundedReceiver - { - let (sink, stream) = tracing_unbounded("mpsc_network_state"); - self.state.push(interval, sink); - stream - } - -} - /// An incomplete set of chain components, but enough to run the chain ops subcommands. pub struct PartialComponents { /// A shared client instance. @@ -191,7 +154,6 @@ async fn build_network_future< role: Role, mut network: sc_network::NetworkWorker, client: Arc, - status_sinks: NetworkStatusSinks, mut rpc_rx: TracingUnboundedReceiver>, should_have_peers: bool, announce_imported_blocks: bool, @@ -335,18 +297,6 @@ async fn build_network_future< // used in the future to perform actions in response of things that happened on // the network. _ = (&mut network).fuse() => {} - - // At a regular interval, we send high-level status as well as - // detailed state information of the network on what are called - // "status sinks". - - status_sink = status_sinks.status.next().fuse() => { - status_sink.send(network.status()); - } - - state_sink = status_sinks.state.next().fuse() => { - state_sink.send(network.network_state()); - } } } } diff --git a/substrate/client/service/src/metrics.rs b/substrate/client/service/src/metrics.rs index 43e5b8eaad..516fb24355 100644 --- a/substrate/client/service/src/metrics.rs +++ b/substrate/client/service/src/metrics.rs @@ -18,7 +18,7 @@ use std::{convert::TryFrom, time::SystemTime}; -use crate::{NetworkStatus, NetworkState, NetworkStatusSinks, config::Configuration}; +use crate::config::Configuration; use futures_timer::Delay; use prometheus_endpoint::{register, Gauge, U64, Registry, PrometheusError, Opts, GaugeVec}; use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO}; @@ -26,9 +26,8 @@ use sp_api::ProvideRuntimeApi; use sp_runtime::traits::{NumberFor, Block, SaturatedConversion, UniqueSaturatedInto}; use sp_transaction_pool::{PoolStatus, MaintainedTransactionPool}; use sp_utils::metrics::register_globals; -use sp_utils::mpsc::TracingUnboundedReceiver; use sc_client_api::{ClientInfo, UsageProvider}; -use sc_network::config::Role; +use sc_network::{config::Role, NetworkStatus, NetworkService, network_state::NetworkState}; use std::sync::Arc; use std::time::Duration; use wasm_timer::Instant; @@ -163,7 +162,7 @@ impl MetricsService { mut self, client: Arc, transactions: Arc, - network: NetworkStatusSinks, + network: Arc::Hash>>, ) where TBl: Block, TCl: ProvideRuntimeApi + UsageProvider, @@ -172,33 +171,23 @@ impl MetricsService { let mut timer = Delay::new(Duration::from_secs(0)); let timer_interval = Duration::from_secs(5); - // Metric and telemetry update interval. - let net_status_interval = timer_interval; - let net_state_interval = Duration::from_secs(30); - - // Source of network information. - let mut net_status_rx = Some(network.status_stream(net_status_interval)); - let mut net_state_rx = Some(network.state_stream(net_state_interval)); + let net_state_duration = Duration::from_secs(30); + let mut last_net_state = Instant::now(); loop { // Wait for the next tick of the timer. (&mut timer).await; + let now = Instant::now(); + let from_net_state = now.duration_since(last_net_state); // Try to get the latest network information. - let mut net_status = None; - let mut net_state = None; - if let Some(rx) = net_status_rx.as_mut() { - match Self::latest(rx) { - Ok(status) => { net_status = status; } - Err(()) => { net_status_rx = None; } - } - } - if let Some(rx) = net_state_rx.as_mut() { - match Self::latest(rx) { - Ok(state) => { net_state = state; } - Err(()) => { net_state_rx = None; } - } - } + let net_status = network.status().await.ok(); + let net_state = if from_net_state >= net_state_duration { + last_net_state = now; + network.network_state().await.ok() + } else { + None + }; // Update / Send the metrics. self.update( @@ -213,25 +202,6 @@ impl MetricsService { } } - // Try to get the latest value from a receiver, dropping intermediate values. - fn latest(rx: &mut TracingUnboundedReceiver) -> Result, ()> { - let mut value = None; - - while let Ok(next) = rx.try_next() { - match next { - Some(v) => { - value = Some(v) - } - None => { - log::error!("Receiver closed unexpectedly."); - return Err(()) - } - } - } - - Ok(value) - } - fn update( &mut self, info: &ClientInfo, diff --git a/substrate/test-utils/test-runner/src/node.rs b/substrate/test-utils/test-runner/src/node.rs index 50c9c54ea1..ce41e5b5b5 100644 --- a/substrate/test-utils/test-runner/src/node.rs +++ b/substrate/test-utils/test-runner/src/node.rs @@ -138,7 +138,7 @@ impl Node { client.clone(), ); - let (network, network_status_sinks, system_rpc_tx, network_starter) = { + let (network, system_rpc_tx, network_starter) = { let params = BuildNetworkParams { config: &config, client: client.clone(), @@ -182,7 +182,6 @@ impl Node { rpc_extensions_builder: Box::new(move |_, _| jsonrpc_core::IoHandler::default()), remote_blockchain: None, network, - network_status_sinks, system_rpc_tx, telemetry: None };