* `NetworkStatusSinks`
* `sc_service::SpawnTasksParams::network_status_sinks`

Also:
* `sc_service::build_network()` does not return `network_status_sinks`
This commit is contained in:
Roman Proskuryakov
2021-05-27 12:54:37 +00:00
committed by GitHub
parent fa89414bba
commit 8dfb8cd978
10 changed files with 96 additions and 132 deletions
+4 -10
View File
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
error::Error, MallocSizeOfWasm, RpcHandlers, NetworkStatusSinks,
error::Error, MallocSizeOfWasm, RpcHandlers,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
metrics::MetricsService,
client::{light, Client, ClientConfig},
@@ -519,8 +519,6 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
/// A shared network instance.
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
/// Sinks to propagate network status updates.
pub network_status_sinks: NetworkStatusSinks<TBl>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Telemetry instance for this node.
@@ -590,7 +588,6 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
rpc_extensions_builder,
remote_blockchain,
network,
network_status_sinks,
system_rpc_tx,
telemetry,
} = params;
@@ -654,7 +651,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
metrics_service.run(
client.clone(),
transaction_pool.clone(),
network_status_sinks.clone()
network.clone(),
)
);
@@ -679,7 +676,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
// Spawn informant task
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network_status_sinks.status.clone(),
network.clone(),
transaction_pool.clone(),
config.informant_output_format,
));
@@ -865,7 +862,6 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
NetworkStatusSinks<TBl>,
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
NetworkStarter,
),
@@ -959,7 +955,6 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = NetworkStatusSinks::new();
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
@@ -967,7 +962,6 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
config.role.clone(),
network_mut,
client,
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
config.announce_block,
@@ -1010,7 +1004,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
future.await
});
Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx)))
Ok((network, system_rpc_tx, NetworkStarter(network_start_tx)))
}
/// Object used to start the network.
+2 -52
View File
@@ -37,17 +37,16 @@ mod task_manager;
use std::{io, pin::Pin};
use std::net::SocketAddr;
use std::collections::HashMap;
use std::time::Duration;
use std::task::Poll;
use futures::{Future, FutureExt, Stream, StreamExt, stream, compat::*};
use sc_network::{NetworkStatus, network_state::NetworkState, PeerId};
use sc_network::PeerId;
use log::{warn, debug, error};
use codec::{Encode, Decode};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use parity_util_mem::MallocSizeOf;
use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver}};
use sp_utils::mpsc::TracingUnboundedReceiver;
pub use self::error::Error;
pub use self::builder::{
@@ -124,42 +123,6 @@ impl RpcHandlers {
}
}
/// Sinks to propagate network status updates.
/// For each element, every time the `Interval` fires we push an element on the sender.
#[derive(Clone)]
pub struct NetworkStatusSinks<Block: BlockT> {
status: Arc<status_sinks::StatusSinks<NetworkStatus<Block>>>,
state: Arc<status_sinks::StatusSinks<NetworkState>>,
}
impl<Block: BlockT> NetworkStatusSinks<Block> {
fn new() -> Self {
Self {
status: Arc::new(status_sinks::StatusSinks::new()),
state: Arc::new(status_sinks::StatusSinks::new()),
}
}
/// Returns a receiver that periodically yields a [`NetworkStatus`].
pub fn status_stream(&self, interval: Duration)
-> TracingUnboundedReceiver<NetworkStatus<Block>>
{
let (sink, stream) = tracing_unbounded("mpsc_network_status");
self.status.push(interval, sink);
stream
}
/// Returns a receiver that periodically yields a [`NetworkState`].
pub fn state_stream(&self, interval: Duration)
-> TracingUnboundedReceiver<NetworkState>
{
let (sink, stream) = tracing_unbounded("mpsc_network_state");
self.state.push(interval, sink);
stream
}
}
/// An incomplete set of chain components, but enough to run the chain ops subcommands.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
/// A shared client instance.
@@ -191,7 +154,6 @@ async fn build_network_future<
role: Role,
mut network: sc_network::NetworkWorker<B, H>,
client: Arc<C>,
status_sinks: NetworkStatusSinks<B>,
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
should_have_peers: bool,
announce_imported_blocks: bool,
@@ -335,18 +297,6 @@ async fn build_network_future<
// used in the future to perform actions in response of things that happened on
// the network.
_ = (&mut network).fuse() => {}
// At a regular interval, we send high-level status as well as
// detailed state information of the network on what are called
// "status sinks".
status_sink = status_sinks.status.next().fuse() => {
status_sink.send(network.status());
}
state_sink = status_sinks.state.next().fuse() => {
state_sink.send(network.network_state());
}
}
}
}
+14 -44
View File
@@ -18,7 +18,7 @@
use std::{convert::TryFrom, time::SystemTime};
use crate::{NetworkStatus, NetworkState, NetworkStatusSinks, config::Configuration};
use crate::config::Configuration;
use futures_timer::Delay;
use prometheus_endpoint::{register, Gauge, U64, Registry, PrometheusError, Opts, GaugeVec};
use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO};
@@ -26,9 +26,8 @@ use sp_api::ProvideRuntimeApi;
use sp_runtime::traits::{NumberFor, Block, SaturatedConversion, UniqueSaturatedInto};
use sp_transaction_pool::{PoolStatus, MaintainedTransactionPool};
use sp_utils::metrics::register_globals;
use sp_utils::mpsc::TracingUnboundedReceiver;
use sc_client_api::{ClientInfo, UsageProvider};
use sc_network::config::Role;
use sc_network::{config::Role, NetworkStatus, NetworkService, network_state::NetworkState};
use std::sync::Arc;
use std::time::Duration;
use wasm_timer::Instant;
@@ -163,7 +162,7 @@ impl MetricsService {
mut self,
client: Arc<TCl>,
transactions: Arc<TExPool>,
network: NetworkStatusSinks<TBl>,
network: Arc<NetworkService<TBl, <TBl as Block>::Hash>>,
) where
TBl: Block,
TCl: ProvideRuntimeApi<TBl> + UsageProvider<TBl>,
@@ -172,33 +171,23 @@ impl MetricsService {
let mut timer = Delay::new(Duration::from_secs(0));
let timer_interval = Duration::from_secs(5);
// Metric and telemetry update interval.
let net_status_interval = timer_interval;
let net_state_interval = Duration::from_secs(30);
// Source of network information.
let mut net_status_rx = Some(network.status_stream(net_status_interval));
let mut net_state_rx = Some(network.state_stream(net_state_interval));
let net_state_duration = Duration::from_secs(30);
let mut last_net_state = Instant::now();
loop {
// Wait for the next tick of the timer.
(&mut timer).await;
let now = Instant::now();
let from_net_state = now.duration_since(last_net_state);
// Try to get the latest network information.
let mut net_status = None;
let mut net_state = None;
if let Some(rx) = net_status_rx.as_mut() {
match Self::latest(rx) {
Ok(status) => { net_status = status; }
Err(()) => { net_status_rx = None; }
}
}
if let Some(rx) = net_state_rx.as_mut() {
match Self::latest(rx) {
Ok(state) => { net_state = state; }
Err(()) => { net_state_rx = None; }
}
}
let net_status = network.status().await.ok();
let net_state = if from_net_state >= net_state_duration {
last_net_state = now;
network.network_state().await.ok()
} else {
None
};
// Update / Send the metrics.
self.update(
@@ -213,25 +202,6 @@ impl MetricsService {
}
}
// Try to get the latest value from a receiver, dropping intermediate values.
fn latest<T>(rx: &mut TracingUnboundedReceiver<T>) -> Result<Option<T>, ()> {
let mut value = None;
while let Ok(next) = rx.try_next() {
match next {
Some(v) => {
value = Some(v)
}
None => {
log::error!("Receiver closed unexpectedly.");
return Err(())
}
}
}
Ok(value)
}
fn update<T: Block>(
&mut self,
info: &ClientInfo<T>,