mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 14:11:09 +00:00
Additional Metrics collected and exposed via prometheus (#5414)
This PR refactors the metrics measuring and Prometheus exposing entity in sc-service into its own submodule and extends the parameters it exposes by: - system load average (over one, five and 15min) - the TCP connection state of the process (lsof), refs #5304 - number of tokio threads - number of known forks - counter for items in each unbounded queue (with internal unbounded channels) - number of file descriptors opened by this process (*nix only at this point) - number of system threads (*nix only at this point) refs #4679 Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: Ashley <ashley.ruglys@gmail.com>
This commit is contained in:
committed by
GitHub
parent
6847f8452e
commit
247822bb33
Generated
+366
-237
File diff suppressed because it is too large
Load Diff
@@ -157,6 +157,7 @@ members = [
|
||||
"primitives/test-primitives",
|
||||
"primitives/transaction-pool",
|
||||
"primitives/trie",
|
||||
"primitives/utils",
|
||||
"primitives/wasm-interface",
|
||||
"test-utils/client",
|
||||
"test-utils/runtime",
|
||||
|
||||
@@ -30,6 +30,7 @@ sp-std = { version = "2.0.0-alpha.5", path = "../primitives/std" }
|
||||
sp-version = { version = "2.0.0-alpha.5", path = "../primitives/version" }
|
||||
sp-api = { version = "2.0.0-alpha.5", path = "../primitives/api" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../primitives/utils" }
|
||||
sp-blockchain = { version = "2.0.0-alpha.5", path = "../primitives/blockchain" }
|
||||
sp-state-machine = { version = "0.8.0-alpha.5", path = "../primitives/state-machine" }
|
||||
sc-telemetry = { version = "2.0.0-alpha.5", path = "telemetry" }
|
||||
|
||||
@@ -26,10 +26,12 @@ sp-keyring = { version = "2.0.0-alpha.5", path = "../../primitives/keyring" }
|
||||
kvdb = "0.5.0"
|
||||
log = { version = "0.4.8" }
|
||||
parking_lot = "0.10.0"
|
||||
lazy_static = "1.4.0"
|
||||
sp-core = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/core" }
|
||||
sp-std = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/std" }
|
||||
sp-version = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/version" }
|
||||
sp-api = { version = "2.0.0-alpha.5", path = "../../primitives/api" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/runtime" }
|
||||
sp-state-machine = { version = "0.8.0-alpha.5", path = "../../primitives/state-machine" }
|
||||
sc-telemetry = { version = "2.0.0-alpha.5", path = "../telemetry" }
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
//! A set of APIs supported by the client along with their primitives.
|
||||
|
||||
use std::{fmt, collections::HashSet};
|
||||
use futures::channel::mpsc;
|
||||
use sp_core::storage::StorageKey;
|
||||
use sp_runtime::{
|
||||
traits::{Block as BlockT, NumberFor},
|
||||
@@ -28,13 +27,14 @@ use sp_consensus::BlockOrigin;
|
||||
|
||||
use crate::blockchain::Info;
|
||||
use crate::notifications::StorageEventStream;
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
use sp_blockchain;
|
||||
|
||||
/// Type that implements `futures::Stream` of block import events.
|
||||
pub type ImportNotifications<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
|
||||
pub type ImportNotifications<Block> = TracingUnboundedReceiver<BlockImportNotification<Block>>;
|
||||
|
||||
/// A stream of block finality notifications.
|
||||
pub type FinalityNotifications<Block> = mpsc::UnboundedReceiver<FinalityNotification<Block>>;
|
||||
pub type FinalityNotifications<Block> = TracingUnboundedReceiver<FinalityNotification<Block>>;
|
||||
|
||||
/// Expected hashes of blocks at given heights.
|
||||
///
|
||||
|
||||
@@ -22,9 +22,9 @@ use std::{
|
||||
};
|
||||
|
||||
use fnv::{FnvHashSet, FnvHashMap};
|
||||
use futures::channel::mpsc;
|
||||
use sp_core::storage::{StorageKey, StorageData};
|
||||
use sp_runtime::traits::Block as BlockT;
|
||||
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
|
||||
|
||||
/// Storage change set
|
||||
#[derive(Debug)]
|
||||
@@ -67,7 +67,7 @@ impl StorageChangeSet {
|
||||
}
|
||||
|
||||
/// Type that implements `futures::Stream` of storage change events.
|
||||
pub type StorageEventStream<H> = mpsc::UnboundedReceiver<(H, StorageChangeSet)>;
|
||||
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;
|
||||
|
||||
type SubscriberId = u64;
|
||||
|
||||
@@ -82,7 +82,7 @@ pub struct StorageNotifications<Block: BlockT> {
|
||||
FnvHashSet<SubscriberId>
|
||||
)>,
|
||||
sinks: FnvHashMap<SubscriberId, (
|
||||
mpsc::UnboundedSender<(Block::Hash, StorageChangeSet)>,
|
||||
TracingUnboundedSender<(Block::Hash, StorageChangeSet)>,
|
||||
Option<HashSet<StorageKey>>,
|
||||
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
|
||||
)>,
|
||||
@@ -299,7 +299,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
|
||||
|
||||
// insert sink
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items");
|
||||
self.sinks.insert(current_id, (tx, keys, child_keys));
|
||||
rx
|
||||
}
|
||||
|
||||
@@ -119,6 +119,7 @@ impl<Block: BlockT> HeaderBackend<Block> for TestApi {
|
||||
finalized_hash: Default::default(),
|
||||
finalized_number: Zero::zero(),
|
||||
genesis_hash: Default::default(),
|
||||
number_leaves: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ sc-client-api = { version = "2.0.0-alpha.5", path = "../api" }
|
||||
sp-blockchain = { version = "2.0.0-alpha.5", path = "../../primitives/blockchain" }
|
||||
sc-network = { version = "0.8.0-alpha.5", path = "../network" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
|
||||
sc-service = { version = "0.8.0-alpha.5", default-features = false, path = "../service" }
|
||||
sp-state-machine = { version = "0.8.0-alpha.5", path = "../../primitives/state-machine" }
|
||||
|
||||
@@ -20,6 +20,7 @@ use futures::{Future, future, future::FutureExt};
|
||||
use futures::select;
|
||||
use futures::pin_mut;
|
||||
use sc_service::{AbstractService, Configuration};
|
||||
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
|
||||
use crate::error;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
@@ -73,6 +74,13 @@ fn build_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
|
||||
tokio::runtime::Builder::new()
|
||||
.thread_name("main-tokio-")
|
||||
.threaded_scheduler()
|
||||
.on_thread_start(||{
|
||||
TOKIO_THREADS_ALIVE.inc();
|
||||
TOKIO_THREADS_TOTAL.inc();
|
||||
})
|
||||
.on_thread_stop(||{
|
||||
TOKIO_THREADS_ALIVE.dec();
|
||||
})
|
||||
.enable_all()
|
||||
.build()
|
||||
}
|
||||
|
||||
@@ -425,6 +425,7 @@ impl<Block: BlockT> sc_client::blockchain::HeaderBackend<Block> for BlockchainDb
|
||||
genesis_hash: meta.genesis_hash,
|
||||
finalized_hash: meta.finalized_hash,
|
||||
finalized_number: meta.finalized_number,
|
||||
number_leaves: self.leaves.read().count(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -153,6 +153,7 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
|
||||
genesis_hash: meta.genesis_hash,
|
||||
finalized_hash: meta.finalized_hash,
|
||||
finalized_number: meta.finalized_number,
|
||||
number_leaves: 1,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ assert_matches = "1.3.0"
|
||||
parity-scale-codec = { version = "1.3.0", features = ["derive"] }
|
||||
sp-arithmetic = { version = "2.0.0-alpha.5", path = "../../primitives/arithmetic" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sp-consensus = { version = "0.8.0-alpha.5", path = "../../primitives/consensus/common" }
|
||||
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
|
||||
sp-api = { version = "2.0.0-alpha.5", path = "../../primitives/api" }
|
||||
|
||||
@@ -90,7 +90,7 @@ use sp_finality_grandpa::AuthorityId;
|
||||
|
||||
use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
|
||||
use log::{trace, debug};
|
||||
use futures::channel::mpsc;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
use prometheus_endpoint::{CounterVec, Opts, PrometheusError, register, Registry, U64};
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
@@ -1254,7 +1254,7 @@ impl Metrics {
|
||||
pub(super) struct GossipValidator<Block: BlockT> {
|
||||
inner: parking_lot::RwLock<Inner<Block>>,
|
||||
set_state: environment::SharedVoterSetState<Block>,
|
||||
report_sender: mpsc::UnboundedSender<PeerReport>,
|
||||
report_sender: TracingUnboundedSender<PeerReport>,
|
||||
metrics: Option<Metrics>,
|
||||
}
|
||||
|
||||
@@ -1266,7 +1266,7 @@ impl<Block: BlockT> GossipValidator<Block> {
|
||||
config: crate::Config,
|
||||
set_state: environment::SharedVoterSetState<Block>,
|
||||
prometheus_registry: Option<&Registry>,
|
||||
) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) {
|
||||
) -> (GossipValidator<Block>, TracingUnboundedReceiver<PeerReport>) {
|
||||
let metrics = match prometheus_registry.map(Metrics::register) {
|
||||
Some(Ok(metrics)) => Some(metrics),
|
||||
Some(Err(e)) => {
|
||||
@@ -1276,7 +1276,7 @@ impl<Block: BlockT> GossipValidator<Block> {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator");
|
||||
let val = GossipValidator {
|
||||
inner: parking_lot::RwLock::new(Inner::new(config)),
|
||||
set_state,
|
||||
|
||||
@@ -58,6 +58,7 @@ use gossip::{
|
||||
use sp_finality_grandpa::{
|
||||
AuthorityPair, AuthorityId, AuthoritySignature, SetId as SetIdNumber, RoundNumber,
|
||||
};
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
|
||||
pub mod gossip;
|
||||
mod periodic;
|
||||
@@ -165,7 +166,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
|
||||
// thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is
|
||||
// just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer
|
||||
// channel implementation.
|
||||
gossip_validator_report_stream: Arc<Mutex<mpsc::UnboundedReceiver<PeerReport>>>,
|
||||
gossip_validator_report_stream: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
|
||||
|
||||
@@ -17,9 +17,10 @@
|
||||
//! Periodic rebroadcast of neighbor packets.
|
||||
|
||||
use futures_timer::Delay;
|
||||
use futures::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
|
||||
use futures::{future::{FutureExt as _}, prelude::*, ready, stream::Stream};
|
||||
use log::debug;
|
||||
use std::{pin::Pin, task::{Context, Poll}, time::Duration};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
|
||||
use sc_network::PeerId;
|
||||
use sp_runtime::traits::{NumberFor, Block as BlockT};
|
||||
@@ -31,7 +32,7 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
|
||||
/// A sender used to send neighbor packets to a background job.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct NeighborPacketSender<B: BlockT>(
|
||||
mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
|
||||
TracingUnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
|
||||
);
|
||||
|
||||
impl<B: BlockT> NeighborPacketSender<B> {
|
||||
@@ -54,14 +55,15 @@ impl<B: BlockT> NeighborPacketSender<B> {
|
||||
pub(super) struct NeighborPacketWorker<B: BlockT> {
|
||||
last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
|
||||
delay: Delay,
|
||||
rx: mpsc::UnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
|
||||
rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
|
||||
|
||||
impl<B: BlockT> NeighborPacketWorker<B> {
|
||||
pub(super) fn new() -> (Self, NeighborPacketSender<B>){
|
||||
let (tx, rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
|
||||
let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
|
||||
("mpsc_grandpa_neighbor_packet_worker");
|
||||
let delay = Delay::new(REBROADCAST_AFTER);
|
||||
|
||||
(NeighborPacketWorker {
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
//! Tests for the communication portion of the GRANDPA crate.
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
use futures::prelude::*;
|
||||
use sc_network::{Event as NetworkEvent, ObservedRole, PeerId};
|
||||
use sc_network_test::{Block, Hash};
|
||||
@@ -33,7 +33,7 @@ use super::{AuthorityId, VoterSet, Round, SetId};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Event {
|
||||
EventStream(mpsc::UnboundedSender<NetworkEvent>),
|
||||
EventStream(TracingUnboundedSender<NetworkEvent>),
|
||||
WriteNotification(sc_network::PeerId, Vec<u8>),
|
||||
Report(sc_network::PeerId, sc_network::ReputationChange),
|
||||
Announce(Hash),
|
||||
@@ -41,12 +41,12 @@ pub(crate) enum Event {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct TestNetwork {
|
||||
sender: mpsc::UnboundedSender<Event>,
|
||||
sender: TracingUnboundedSender<Event>,
|
||||
}
|
||||
|
||||
impl sc_network_gossip::Network<Block> for TestNetwork {
|
||||
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("test");
|
||||
let _ = self.sender.unbounded_send(Event::EventStream(tx));
|
||||
Box::pin(rx)
|
||||
}
|
||||
@@ -97,7 +97,7 @@ impl sc_network_gossip::ValidatorContext<Block> for TestNetwork {
|
||||
pub(crate) struct Tester {
|
||||
pub(crate) net_handle: super::NetworkBridge<Block, TestNetwork>,
|
||||
gossip_validator: Arc<GossipValidator<Block>>,
|
||||
pub(crate) events: mpsc::UnboundedReceiver<Event>,
|
||||
pub(crate) events: TracingUnboundedReceiver<Event>,
|
||||
}
|
||||
|
||||
impl Tester {
|
||||
@@ -161,7 +161,7 @@ pub(crate) fn make_test_network() -> (
|
||||
impl Future<Output = Tester>,
|
||||
TestNetwork,
|
||||
) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("test");
|
||||
let net = TestNetwork { sender: tx };
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -18,11 +18,11 @@ use std::{sync::Arc, collections::HashMap};
|
||||
|
||||
use log::{debug, trace, info};
|
||||
use parity_scale_codec::Encode;
|
||||
use futures::channel::mpsc;
|
||||
use parking_lot::RwLockWriteGuard;
|
||||
|
||||
use sp_blockchain::{BlockStatus, well_known_cache_keys};
|
||||
use sc_client_api::{backend::Backend, utils::is_descendent_of};
|
||||
use sp_utils::mpsc::TracingUnboundedSender;
|
||||
use sp_api::{TransactionFor};
|
||||
|
||||
use sp_consensus::{
|
||||
@@ -57,7 +57,7 @@ pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
|
||||
inner: Arc<Client>,
|
||||
select_chain: SC,
|
||||
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
|
||||
send_voter_commands: mpsc::UnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
|
||||
authority_set_hard_forks: HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>,
|
||||
_phantom: PhantomData<Backend>,
|
||||
@@ -536,7 +536,7 @@ impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Clie
|
||||
inner: Arc<Client>,
|
||||
select_chain: SC,
|
||||
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
|
||||
send_voter_commands: mpsc::UnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
|
||||
authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
|
||||
) -> GrandpaBlockImport<Backend, Block, Client, SC> {
|
||||
|
||||
@@ -55,7 +55,6 @@
|
||||
use futures::prelude::*;
|
||||
use futures::StreamExt;
|
||||
use log::{debug, info};
|
||||
use futures::channel::mpsc;
|
||||
use sc_client_api::{
|
||||
backend::{AuxStore, Backend},
|
||||
LockImportRun, BlockchainEvents, CallExecutor,
|
||||
@@ -70,6 +69,7 @@ use sc_keystore::KeyStorePtr;
|
||||
use sp_inherents::InherentDataProviders;
|
||||
use sp_consensus::{SelectChain, BlockImport};
|
||||
use sp_core::Pair;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
|
||||
use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG};
|
||||
use serde_json;
|
||||
|
||||
@@ -379,7 +379,7 @@ pub struct LinkHalf<Block: BlockT, C, SC> {
|
||||
client: Arc<C>,
|
||||
select_chain: SC,
|
||||
persistent_data: PersistentData<Block>,
|
||||
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
}
|
||||
|
||||
/// Provider for the Grandpa authority set configured on the genesis block.
|
||||
@@ -476,7 +476,7 @@ where
|
||||
}
|
||||
)?;
|
||||
|
||||
let (voter_commands_tx, voter_commands_rx) = mpsc::unbounded();
|
||||
let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command");
|
||||
|
||||
// create pending change objects with 0 delay and enacted on finality
|
||||
// (i.e. standard changes) for each authority set hard fork.
|
||||
@@ -598,7 +598,7 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
|
||||
/// The inherent data providers.
|
||||
pub inherent_data_providers: InherentDataProviders,
|
||||
/// If supplied, can be used to hook on telemetry connection established events.
|
||||
pub telemetry_on_connect: Option<futures::channel::mpsc::UnboundedReceiver<()>>,
|
||||
pub telemetry_on_connect: Option<TracingUnboundedReceiver<()>>,
|
||||
/// A voting rule used to potentially restrict target votes.
|
||||
pub voting_rule: VR,
|
||||
/// The prometheus metrics registry.
|
||||
@@ -718,7 +718,7 @@ impl Metrics {
|
||||
struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, SC, VR> {
|
||||
voter: Pin<Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>>,
|
||||
env: Arc<Environment<B, Block, C, N, SC, VR>>,
|
||||
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
network: NetworkBridge<Block, N>,
|
||||
|
||||
/// Prometheus metrics.
|
||||
@@ -742,7 +742,7 @@ where
|
||||
select_chain: SC,
|
||||
voting_rule: VR,
|
||||
persistent_data: PersistentData<Block>,
|
||||
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
prometheus_registry: Option<prometheus_endpoint::Registry>,
|
||||
) -> Self {
|
||||
let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::prelude::*;
|
||||
|
||||
use finality_grandpa::{
|
||||
BlockNumberOps, Error as GrandpaError, voter, voter_set::VoterSet
|
||||
@@ -27,8 +27,10 @@ use log::{debug, info, warn};
|
||||
|
||||
use sp_consensus::SelectChain;
|
||||
use sc_client_api::backend::Backend;
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
use sp_runtime::traits::{NumberFor, Block as BlockT};
|
||||
use sp_blockchain::HeaderMetadata;
|
||||
|
||||
use crate::{
|
||||
global_communication, CommandOrError, CommunicationIn, Config, environment,
|
||||
LinkHalf, Error, aux_schema::PersistentData, VoterCommand, VoterSetState,
|
||||
@@ -206,7 +208,7 @@ struct ObserverWork<B: BlockT, BE, Client, N: NetworkT<B>> {
|
||||
network: NetworkBridge<B, N>,
|
||||
persistent_data: PersistentData<B>,
|
||||
keystore: Option<sc_keystore::KeyStorePtr>,
|
||||
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
|
||||
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
|
||||
_phantom: PhantomData<BE>,
|
||||
}
|
||||
|
||||
@@ -223,7 +225,7 @@ where
|
||||
network: NetworkBridge<B, Network>,
|
||||
persistent_data: PersistentData<B>,
|
||||
keystore: Option<sc_keystore::KeyStorePtr>,
|
||||
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
|
||||
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
|
||||
) -> Self {
|
||||
|
||||
let mut work = ObserverWork {
|
||||
@@ -376,6 +378,7 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use sp_utils::mpsc::tracing_unbounded;
|
||||
use crate::{aux_schema, communication::tests::{Event, make_test_network}};
|
||||
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
|
||||
use sc_network::PeerId;
|
||||
@@ -412,7 +415,7 @@ mod tests {
|
||||
|| Ok(vec![]),
|
||||
).unwrap();
|
||||
|
||||
let (_tx, voter_command_rx) = mpsc::unbounded();
|
||||
let (_tx, voter_command_rx) = tracing_unbounded("");
|
||||
let observer = ObserverWork::new(
|
||||
client,
|
||||
tester.net_handle.clone(),
|
||||
|
||||
@@ -993,7 +993,7 @@ fn voter_persists_its_votes() {
|
||||
use std::iter::FromIterator;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use futures::future;
|
||||
use futures::channel::mpsc;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
|
||||
|
||||
let _ = env_logger::try_init();
|
||||
let mut runtime = Runtime::new().unwrap();
|
||||
@@ -1018,7 +1018,7 @@ fn voter_persists_its_votes() {
|
||||
|
||||
// channel between the voter and the main controller.
|
||||
// sending a message on the `voter_tx` restarts the voter.
|
||||
let (voter_tx, voter_rx) = mpsc::unbounded::<()>();
|
||||
let (voter_tx, voter_rx) = tracing_unbounded::<()>("");
|
||||
|
||||
let mut keystore_paths = Vec::new();
|
||||
|
||||
@@ -1031,7 +1031,7 @@ fn voter_persists_its_votes() {
|
||||
|
||||
struct ResettableVoter {
|
||||
voter: Pin<Box<dyn Future<Output = ()> + Send + Unpin>>,
|
||||
voter_rx: mpsc::UnboundedReceiver<()>,
|
||||
voter_rx: TracingUnboundedReceiver<()>,
|
||||
net: Arc<Mutex<GrandpaTestNet>>,
|
||||
client: PeersClient,
|
||||
keystore: KeyStorePtr,
|
||||
|
||||
@@ -29,10 +29,10 @@ use super::{
|
||||
};
|
||||
|
||||
use log::{debug, warn};
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
use futures::prelude::*;
|
||||
use futures::stream::Fuse;
|
||||
use futures_timer::Delay;
|
||||
use futures::channel::mpsc::UnboundedReceiver;
|
||||
use finality_grandpa::voter;
|
||||
use parking_lot::Mutex;
|
||||
use prometheus_endpoint::{
|
||||
@@ -140,7 +140,7 @@ impl Drop for Metrics {
|
||||
/// Buffering imported messages until blocks with given hashes are imported.
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
|
||||
import_notifications: Fuse<UnboundedReceiver<BlockImportNotification<Block>>>,
|
||||
import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
|
||||
block_sync_requester: BlockSyncRequester,
|
||||
status_check: BlockStatus,
|
||||
#[pin]
|
||||
@@ -541,18 +541,18 @@ mod tests {
|
||||
use sc_client_api::BlockImportNotification;
|
||||
use futures::future::Either;
|
||||
use futures_timer::Delay;
|
||||
use futures::channel::mpsc;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
|
||||
use finality_grandpa::Precommit;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestChainState {
|
||||
sender: mpsc::UnboundedSender<BlockImportNotification<Block>>,
|
||||
sender: TracingUnboundedSender<BlockImportNotification<Block>>,
|
||||
known_blocks: Arc<Mutex<HashMap<Hash, u64>>>,
|
||||
}
|
||||
|
||||
impl TestChainState {
|
||||
fn new() -> (Self, ImportNotifications<Block>) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("test");
|
||||
let state = TestChainState {
|
||||
sender: tx,
|
||||
known_blocks: Arc::new(Mutex::new(HashMap::new())),
|
||||
@@ -649,7 +649,7 @@ mod tests {
|
||||
// enact all dependencies before importing the message
|
||||
enact_dependencies(&chain_state);
|
||||
|
||||
let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
|
||||
let (global_tx, global_rx) = tracing_unbounded("test");
|
||||
|
||||
let until_imported = UntilGlobalMessageBlocksImported::new(
|
||||
import_notifications,
|
||||
@@ -676,7 +676,7 @@ mod tests {
|
||||
let (chain_state, import_notifications) = TestChainState::new();
|
||||
let block_status = chain_state.block_status();
|
||||
|
||||
let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
|
||||
let (global_tx, global_rx) = tracing_unbounded("test");
|
||||
|
||||
let until_imported = UntilGlobalMessageBlocksImported::new(
|
||||
import_notifications,
|
||||
@@ -929,7 +929,7 @@ mod tests {
|
||||
let (chain_state, import_notifications) = TestChainState::new();
|
||||
let block_status = chain_state.block_status();
|
||||
|
||||
let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
|
||||
let (global_tx, global_rx) = tracing_unbounded("test");
|
||||
|
||||
let block_sync_requester = TestBlockSyncRequester::default();
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ log = "0.4.8"
|
||||
lru = "0.4.3"
|
||||
sc-network = { version = "0.8.0-alpha.5", path = "../network" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
wasm-timer = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -19,10 +19,11 @@ use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENA
|
||||
|
||||
use sc_network::{Event, ReputationChange};
|
||||
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::prelude::*;
|
||||
use libp2p::PeerId;
|
||||
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
|
||||
use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}};
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
|
||||
/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
|
||||
/// top of it.
|
||||
@@ -86,7 +87,7 @@ impl<B: BlockT> GossipEngine<B> {
|
||||
|
||||
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
|
||||
pub fn messages_for(&mut self, topic: B::Hash)
|
||||
-> mpsc::UnboundedReceiver<TopicNotification>
|
||||
-> TracingUnboundedReceiver<TopicNotification>
|
||||
{
|
||||
self.state_machine.messages_for(self.engine_id, topic)
|
||||
}
|
||||
|
||||
@@ -21,10 +21,10 @@ use std::sync::Arc;
|
||||
use std::iter;
|
||||
use std::time;
|
||||
use log::trace;
|
||||
use futures::channel::mpsc;
|
||||
use lru::LruCache;
|
||||
use libp2p::PeerId;
|
||||
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
use sp_runtime::ConsensusEngineId;
|
||||
use sc_network::ObservedRole;
|
||||
use wasm_timer::Instant;
|
||||
@@ -164,7 +164,7 @@ fn propagate<'a, B: BlockT, I>(
|
||||
/// Consensus network protocol handler. Manages statements and candidate requests.
|
||||
pub struct ConsensusGossip<B: BlockT> {
|
||||
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
|
||||
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<mpsc::UnboundedSender<TopicNotification>>>,
|
||||
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<TracingUnboundedSender<TopicNotification>>>,
|
||||
messages: Vec<MessageEntry<B>>,
|
||||
known_messages: LruCache<B::Hash, ()>,
|
||||
validators: HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
|
||||
@@ -333,9 +333,9 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
|
||||
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
|
||||
pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash)
|
||||
-> mpsc::UnboundedReceiver<TopicNotification>
|
||||
-> TracingUnboundedReceiver<TopicNotification>
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
|
||||
for entry in self.messages.iter_mut()
|
||||
.filter(|e| e.topic == topic && e.engine_id == engine_id)
|
||||
{
|
||||
|
||||
@@ -47,6 +47,7 @@ slog = { version = "2.5.2", features = ["nested-values"] }
|
||||
slog_derive = "0.2.0"
|
||||
smallvec = "0.6.10"
|
||||
sp-arithmetic = { version = "2.0.0-alpha.5", path = "../../primitives/arithmetic" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sp-blockchain = { version = "2.0.0-alpha.5", path = "../../primitives/blockchain" }
|
||||
sp-consensus = { version = "0.8.0-alpha.5", path = "../../primitives/consensus/common" }
|
||||
sp-consensus-babe = { version = "0.8.0-alpha.5", path = "../../primitives/consensus/babe" }
|
||||
|
||||
@@ -18,12 +18,13 @@
|
||||
|
||||
use crate::protocol::light_client_handler;
|
||||
|
||||
use futures::{channel::mpsc, channel::oneshot, prelude::*};
|
||||
use futures::{channel::oneshot, prelude::*};
|
||||
use parking_lot::Mutex;
|
||||
use sc_client_api::{
|
||||
FetchChecker, Fetcher, RemoteBodyRequest, RemoteCallRequest, RemoteChangesRequest,
|
||||
RemoteHeaderRequest, RemoteReadChildRequest, RemoteReadRequest, StorageProof, ChangesProof,
|
||||
};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
use sp_blockchain::Error as ClientError;
|
||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
||||
use std::{collections::HashMap, pin::Pin, sync::Arc, task::Context, task::Poll};
|
||||
@@ -42,10 +43,10 @@ pub struct OnDemand<B: BlockT> {
|
||||
/// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method
|
||||
/// from the `OnDemand`. However there exists no popular implementation of MPMC channels in
|
||||
/// asynchronous Rust at the moment
|
||||
requests_queue: Mutex<Option<mpsc::UnboundedReceiver<light_client_handler::Request<B>>>>,
|
||||
requests_queue: Mutex<Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>>,
|
||||
|
||||
/// Sending side of `requests_queue`.
|
||||
requests_send: mpsc::UnboundedSender<light_client_handler::Request<B>>,
|
||||
requests_send: TracingUnboundedSender<light_client_handler::Request<B>>,
|
||||
}
|
||||
|
||||
/// Dummy implementation of `FetchChecker` that always assumes that responses are bad.
|
||||
@@ -112,7 +113,7 @@ where
|
||||
{
|
||||
/// Creates new on-demand service.
|
||||
pub fn new(checker: Arc<dyn FetchChecker<B>>) -> Self {
|
||||
let (requests_send, requests_queue) = mpsc::unbounded();
|
||||
let (requests_send, requests_queue) = tracing_unbounded("mpsc_ondemand");
|
||||
let requests_queue = Mutex::new(Some(requests_queue));
|
||||
|
||||
OnDemand {
|
||||
@@ -134,9 +135,9 @@ where
|
||||
///
|
||||
/// If this function returns `None`, that means that the receiver has already been extracted in
|
||||
/// the past, and therefore that something already handles the requests.
|
||||
pub(crate) fn extract_receiver(
|
||||
&self,
|
||||
) -> Option<mpsc::UnboundedReceiver<light_client_handler::Request<B>>> {
|
||||
pub(crate) fn extract_receiver(&self)
|
||||
-> Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>
|
||||
{
|
||||
self.requests_queue.lock().take()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +36,8 @@ use crate::{
|
||||
protocol::{self, event::Event, light_client_handler, sync::SyncState, PeerInfo, Protocol},
|
||||
transport, ReputationChange,
|
||||
};
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::prelude::*;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
|
||||
use libp2p::{kad::record, Multiaddr, PeerId};
|
||||
use log::{error, info, trace, warn};
|
||||
@@ -159,7 +160,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
|
||||
/// nodes it should be connected to or not.
|
||||
peerset: PeersetHandle,
|
||||
/// Channel that sends messages to the actual worker.
|
||||
to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, H>>,
|
||||
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B, H>>,
|
||||
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
|
||||
/// compatibility.
|
||||
_marker: PhantomData<H>,
|
||||
@@ -172,7 +173,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
/// for the network processing to advance. From it, you can extract a `NetworkService` using
|
||||
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
|
||||
pub fn new(params: Params<B, H>) -> Result<NetworkWorker<B, H>, Error> {
|
||||
let (to_worker, from_worker) = mpsc::unbounded();
|
||||
let (to_worker, from_worker) = tracing_unbounded("mpsc_network_worker");
|
||||
|
||||
if let Some(ref path) = params.network_config.net_config_path {
|
||||
fs::create_dir_all(Path::new(path))?;
|
||||
@@ -550,7 +551,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
|
||||
/// The stream never ends (unless the `NetworkWorker` gets shut down).
|
||||
pub fn event_stream(&self) -> impl Stream<Item = Event> {
|
||||
// Note: when transitioning to stable futures, remove the `Error` entirely
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("mpsc_network_event_stream");
|
||||
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
|
||||
rx
|
||||
}
|
||||
@@ -770,7 +771,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
|
||||
PutValue(record::Key, Vec<u8>),
|
||||
AddKnownAddress(PeerId, Multiaddr),
|
||||
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
|
||||
EventStream(mpsc::UnboundedSender<Event>),
|
||||
EventStream(TracingUnboundedSender<Event>),
|
||||
WriteNotification {
|
||||
message: Vec<u8>,
|
||||
engine_id: ConsensusEngineId,
|
||||
@@ -801,11 +802,11 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
|
||||
/// The import queue that was passed as initialization.
|
||||
import_queue: Box<dyn ImportQueue<B>>,
|
||||
/// Messages from the `NetworkService` and that must be processed.
|
||||
from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, H>>,
|
||||
from_worker: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
|
||||
/// Receiver for queries from the light client that must be processed.
|
||||
light_client_rqs: Option<mpsc::UnboundedReceiver<light_client_handler::Request<B>>>,
|
||||
light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>,
|
||||
/// Senders for events that happen on the network.
|
||||
event_streams: Vec<mpsc::UnboundedSender<Event>>,
|
||||
event_streams: Vec<TracingUnboundedSender<Event>>,
|
||||
/// Prometheus network metrics.
|
||||
metrics: Option<Metrics>,
|
||||
/// The `PeerId`'s of all boot nodes.
|
||||
|
||||
@@ -24,6 +24,7 @@ parking_lot = "0.10.0"
|
||||
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
|
||||
rand = "0.7.2"
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sc-network = { version = "0.8.0-alpha.5", path = "../network" }
|
||||
sc-keystore = { version = "2.0.0-alpha.5", path = "../keystore" }
|
||||
|
||||
|
||||
@@ -32,11 +32,12 @@ use futures::{prelude::*, future, channel::mpsc};
|
||||
use log::error;
|
||||
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
||||
use std::{fmt, io::Read as _, mem, pin::Pin, task::Context, task::Poll};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
|
||||
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
||||
pub fn http() -> (HttpApi, HttpWorker) {
|
||||
let (to_worker, from_api) = mpsc::unbounded();
|
||||
let (to_api, from_worker) = mpsc::unbounded();
|
||||
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
|
||||
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");
|
||||
|
||||
let api = HttpApi {
|
||||
to_worker,
|
||||
@@ -63,10 +64,10 @@ pub fn http() -> (HttpApi, HttpWorker) {
|
||||
/// to offchain workers.
|
||||
pub struct HttpApi {
|
||||
/// Used to sends messages to the worker.
|
||||
to_worker: mpsc::UnboundedSender<ApiToWorker>,
|
||||
to_worker: TracingUnboundedSender<ApiToWorker>,
|
||||
/// Used to receive messages from the worker.
|
||||
/// We use a `Fuse` in order to have an extra protection against panicking.
|
||||
from_worker: stream::Fuse<mpsc::UnboundedReceiver<WorkerToApi>>,
|
||||
from_worker: stream::Fuse<TracingUnboundedReceiver<WorkerToApi>>,
|
||||
/// Id to assign to the next HTTP request that is started.
|
||||
next_id: HttpRequestId,
|
||||
/// List of HTTP requests in preparation or in progress.
|
||||
@@ -546,9 +547,9 @@ enum WorkerToApi {
|
||||
/// Must be continuously polled for the [`HttpApi`] to properly work.
|
||||
pub struct HttpWorker {
|
||||
/// Used to sends messages to the `HttpApi`.
|
||||
to_api: mpsc::UnboundedSender<WorkerToApi>,
|
||||
to_api: TracingUnboundedSender<WorkerToApi>,
|
||||
/// Used to receive messages from the `HttpApi`.
|
||||
from_api: mpsc::UnboundedReceiver<ApiToWorker>,
|
||||
from_api: TracingUnboundedReceiver<ApiToWorker>,
|
||||
/// The engine that runs HTTP requests.
|
||||
http_client: hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
|
||||
/// HTTP requests that are being worked on by the engine.
|
||||
|
||||
@@ -13,6 +13,7 @@ documentation = "https://docs.rs/sc-peerset"
|
||||
[dependencies]
|
||||
futures = "0.3.4"
|
||||
libp2p = { version = "0.16.2", default-features = false }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils"}
|
||||
log = "0.4.8"
|
||||
serde_json = "1.0.41"
|
||||
wasm-timer = "0.2"
|
||||
|
||||
@@ -20,11 +20,12 @@
|
||||
mod peersstate;
|
||||
|
||||
use std::{collections::{HashSet, HashMap}, collections::VecDeque};
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::prelude::*;
|
||||
use log::{debug, error, trace};
|
||||
use serde_json::json;
|
||||
use std::{pin::Pin, task::{Context, Poll}, time::Duration};
|
||||
use wasm_timer::Instant;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
|
||||
pub use libp2p::PeerId;
|
||||
|
||||
@@ -73,7 +74,7 @@ impl ReputationChange {
|
||||
/// Shared handle to the peer set manager (PSM). Distributed around the code.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PeersetHandle {
|
||||
tx: mpsc::UnboundedSender<Action>,
|
||||
tx: TracingUnboundedSender<Action>,
|
||||
}
|
||||
|
||||
impl PeersetHandle {
|
||||
@@ -183,9 +184,9 @@ pub struct Peerset {
|
||||
/// If true, we only accept reserved nodes.
|
||||
reserved_only: bool,
|
||||
/// Receiver for messages from the `PeersetHandle` and from `tx`.
|
||||
rx: mpsc::UnboundedReceiver<Action>,
|
||||
rx: TracingUnboundedReceiver<Action>,
|
||||
/// Sending side of `rx`.
|
||||
tx: mpsc::UnboundedSender<Action>,
|
||||
tx: TracingUnboundedSender<Action>,
|
||||
/// Queue of messages to be emitted when the `Peerset` is polled.
|
||||
message_queue: VecDeque<Message>,
|
||||
/// When the `Peerset` was created.
|
||||
@@ -197,7 +198,7 @@ pub struct Peerset {
|
||||
impl Peerset {
|
||||
/// Builds a new peerset from the given configuration.
|
||||
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("mpsc_peerset_messages");
|
||||
|
||||
let handle = PeersetHandle {
|
||||
tx: tx.clone(),
|
||||
|
||||
@@ -24,6 +24,7 @@ serde_json = "1.0.41"
|
||||
sp-session = { version = "2.0.0-alpha.5", path = "../../primitives/session" }
|
||||
sp-offchain = { version = "2.0.0-alpha.5", path = "../../primitives/offchain" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sp-rpc = { version = "2.0.0-alpha.5", path = "../../primitives/rpc" }
|
||||
sp-state-machine = { version = "0.8.0-alpha.5", path = "../../primitives/state-machine" }
|
||||
sc-executor = { version = "0.8.0-alpha.5", path = "../executor" }
|
||||
|
||||
@@ -20,8 +20,9 @@
|
||||
mod tests;
|
||||
|
||||
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
|
||||
use futures::{channel::{mpsc, oneshot}, compat::Compat};
|
||||
use futures::{channel::oneshot, compat::Compat};
|
||||
use sc_rpc_api::Receiver;
|
||||
use sp_utils::mpsc::TracingUnboundedSender;
|
||||
use sp_runtime::traits::{self, Header as HeaderT};
|
||||
|
||||
use self::error::Result;
|
||||
@@ -33,7 +34,7 @@ pub use self::gen_client::Client as SystemClient;
|
||||
/// System API implementation
|
||||
pub struct System<B: traits::Block> {
|
||||
info: SystemInfo,
|
||||
send_back: mpsc::UnboundedSender<Request<B>>,
|
||||
send_back: TracingUnboundedSender<Request<B>>,
|
||||
}
|
||||
|
||||
/// Request to be processed.
|
||||
@@ -59,7 +60,7 @@ impl<B: traits::Block> System<B> {
|
||||
/// reading from that channel and answering the requests.
|
||||
pub fn new(
|
||||
info: SystemInfo,
|
||||
send_back: mpsc::UnboundedSender<Request<B>>,
|
||||
send_back: TracingUnboundedSender<Request<B>>,
|
||||
) -> Self {
|
||||
System {
|
||||
info,
|
||||
|
||||
@@ -36,6 +36,7 @@ target_info = "0.1.0"
|
||||
sc-keystore = { version = "2.0.0-alpha.5", path = "../keystore" }
|
||||
sp-io = { version = "2.0.0-alpha.5", path = "../../primitives/io" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sp-blockchain = { version = "2.0.0-alpha.5", path = "../../primitives/blockchain" }
|
||||
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
|
||||
sp-session = { version = "2.0.0-alpha.5", path = "../../primitives/session" }
|
||||
@@ -61,6 +62,14 @@ sc-tracing = { version = "2.0.0-alpha.5", path = "../tracing" }
|
||||
tracing = "0.1.10"
|
||||
parity-util-mem = { version = "0.6.0", default-features = false, features = ["primitive-types"] }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
procfs = '0.7.8'
|
||||
netstat2 = "0.8.1"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
netstat2 = "0.8.1"
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" }
|
||||
sp-consensus-babe = { version = "0.8.0-alpha.5", path = "../../primitives/consensus/babe" }
|
||||
|
||||
@@ -18,6 +18,7 @@ use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL
|
||||
use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter};
|
||||
use crate::status_sinks;
|
||||
use crate::config::{Configuration, DatabaseConfig, KeystoreConfig, PrometheusConfig};
|
||||
use crate::metrics::MetricsService;
|
||||
use sc_client_api::{
|
||||
self,
|
||||
BlockchainEvents,
|
||||
@@ -25,12 +26,12 @@ use sc_client_api::{
|
||||
execution_extensions::ExtensionsFactory,
|
||||
ExecutorProvider, CallExecutor
|
||||
};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
|
||||
use sc_client::Client;
|
||||
use sc_chain_spec::get_extension;
|
||||
use sp_consensus::import_queue::ImportQueue;
|
||||
use futures::{
|
||||
Future, FutureExt, StreamExt,
|
||||
channel::mpsc,
|
||||
future::ready,
|
||||
};
|
||||
use sc_keystore::{Store as Keystore};
|
||||
@@ -40,7 +41,7 @@ use sc_network::{NetworkService, NetworkStateInfo};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use sp_runtime::generic::BlockId;
|
||||
use sp_runtime::traits::{
|
||||
Block as BlockT, NumberFor, SaturatedConversion, HashFor, UniqueSaturatedInto,
|
||||
Block as BlockT, NumberFor, SaturatedConversion, HashFor,
|
||||
};
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sc_executor::{NativeExecutor, NativeExecutionDispatch};
|
||||
@@ -49,56 +50,9 @@ use std::{
|
||||
marker::PhantomData, sync::Arc, pin::Pin
|
||||
};
|
||||
use wasm_timer::SystemTime;
|
||||
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
|
||||
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
|
||||
use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent};
|
||||
use sp_blockchain;
|
||||
use prometheus_endpoint::{register, Gauge, U64, F64, Registry, PrometheusError, Opts, GaugeVec};
|
||||
|
||||
struct ServiceMetrics {
|
||||
block_height_number: GaugeVec<U64>,
|
||||
ready_transactions_number: Gauge<U64>,
|
||||
memory_usage_bytes: Gauge<U64>,
|
||||
cpu_usage_percentage: Gauge<F64>,
|
||||
network_per_sec_bytes: GaugeVec<U64>,
|
||||
database_cache: Gauge<U64>,
|
||||
state_cache: Gauge<U64>,
|
||||
state_db: GaugeVec<U64>,
|
||||
}
|
||||
|
||||
impl ServiceMetrics {
|
||||
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
|
||||
Ok(Self {
|
||||
block_height_number: register(GaugeVec::new(
|
||||
Opts::new("block_height_number", "Height of the chain"),
|
||||
&["status"]
|
||||
)?, registry)?,
|
||||
ready_transactions_number: register(Gauge::new(
|
||||
"ready_transactions_number", "Number of transactions in the ready queue",
|
||||
)?, registry)?,
|
||||
memory_usage_bytes: register(Gauge::new(
|
||||
"memory_usage_bytes", "Node memory (resident set size) usage",
|
||||
)?, registry)?,
|
||||
cpu_usage_percentage: register(Gauge::new(
|
||||
"cpu_usage_percentage", "Node CPU usage",
|
||||
)?, registry)?,
|
||||
network_per_sec_bytes: register(GaugeVec::new(
|
||||
Opts::new("network_per_sec_bytes", "Networking bytes per second"),
|
||||
&["direction"]
|
||||
)?, registry)?,
|
||||
database_cache: register(Gauge::new(
|
||||
"database_cache_bytes", "RocksDB cache size in bytes",
|
||||
)?, registry)?,
|
||||
state_cache: register(Gauge::new(
|
||||
"state_cache_bytes", "State cache size in bytes",
|
||||
)?, registry)?,
|
||||
state_db: register(GaugeVec::new(
|
||||
Opts::new("state_db_cache_bytes", "State DB cache in bytes"),
|
||||
&["subtype"]
|
||||
)?, registry)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type BackgroundTask = Pin<Box<dyn Future<Output=()> + Send>>;
|
||||
|
||||
@@ -820,7 +774,7 @@ ServiceBuilder<
|
||||
)?;
|
||||
|
||||
// A side-channel for essential tasks to communicate shutdown.
|
||||
let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded();
|
||||
let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks");
|
||||
|
||||
let import_queue = Box::new(import_queue);
|
||||
let chain_info = client.chain_info();
|
||||
@@ -992,122 +946,44 @@ ServiceBuilder<
|
||||
}
|
||||
|
||||
// Prometheus metrics.
|
||||
let metrics = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
|
||||
let mut metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
|
||||
// Set static metrics.
|
||||
register(Gauge::<U64>::with_opts(
|
||||
Opts::new(
|
||||
"build_info",
|
||||
"A metric with a constant '1' value labeled by name, version, and commit."
|
||||
)
|
||||
.const_label("name", config.impl_name)
|
||||
.const_label("version", config.impl_version)
|
||||
.const_label("commit", config.impl_commit),
|
||||
)?, ®istry)?.set(1);
|
||||
|
||||
|
||||
let role_bits = match config.role {
|
||||
Role::Full => 1,
|
||||
Role::Light => 2,
|
||||
Role::Sentry { .. } => 3,
|
||||
Role::Authority { .. } => 4,
|
||||
Role::Full => 1u64,
|
||||
Role::Light => 2u64,
|
||||
Role::Sentry { .. } => 3u64,
|
||||
Role::Authority { .. } => 4u64,
|
||||
};
|
||||
register(Gauge::<U64>::new(
|
||||
"node_role", "The role the node is running as",
|
||||
)?, ®istry)?.set(role_bits);
|
||||
|
||||
let metrics = ServiceMetrics::register(®istry)?;
|
||||
|
||||
let metrics = MetricsService::with_prometheus(
|
||||
®istry,
|
||||
&config.name,
|
||||
&config.impl_version,
|
||||
role_bits,
|
||||
)?;
|
||||
spawn_handle.spawn(
|
||||
"prometheus-endpoint",
|
||||
prometheus_endpoint::init_prometheus(port, registry).map(drop)
|
||||
);
|
||||
|
||||
Some(metrics)
|
||||
metrics
|
||||
} else {
|
||||
None
|
||||
MetricsService::new()
|
||||
};
|
||||
|
||||
// Periodically notify the telemetry.
|
||||
let transaction_pool_ = transaction_pool.clone();
|
||||
let client_ = client.clone();
|
||||
let mut sys = System::new();
|
||||
let self_pid = get_current_pid().ok();
|
||||
let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
|
||||
let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
|
||||
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
|
||||
let tel_task = state_rx.for_each(move |(net_status, _)| {
|
||||
let info = client_.usage_info();
|
||||
let best_number = info.chain.best_number.saturated_into::<u64>();
|
||||
let best_hash = info.chain.best_hash;
|
||||
let num_peers = net_status.num_connected_peers;
|
||||
let txpool_status = transaction_pool_.status();
|
||||
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
|
||||
let bandwidth_download = net_status.average_download_per_sec;
|
||||
let bandwidth_upload = net_status.average_upload_per_sec;
|
||||
let best_seen_block = net_status.best_seen_block
|
||||
.map(|num: NumberFor<TBl>| num.unique_saturated_into() as u64);
|
||||
|
||||
// get cpu usage and memory usage of this process
|
||||
let (cpu_usage, memory) = if let Some(self_pid) = self_pid {
|
||||
if sys.refresh_process(self_pid) {
|
||||
let proc = sys.get_process(self_pid)
|
||||
.expect("Above refresh_process succeeds, this should be Some(), qed");
|
||||
(proc.cpu_usage(), proc.memory())
|
||||
} else { (0.0, 0) }
|
||||
} else { (0.0, 0) };
|
||||
|
||||
telemetry!(
|
||||
SUBSTRATE_INFO;
|
||||
"system.interval";
|
||||
"peers" => num_peers,
|
||||
"height" => best_number,
|
||||
"best" => ?best_hash,
|
||||
"txcount" => txpool_status.ready,
|
||||
"cpu" => cpu_usage,
|
||||
"memory" => memory,
|
||||
"finalized_height" => finalized_number,
|
||||
"finalized_hash" => ?info.chain.finalized_hash,
|
||||
"bandwidth_download" => bandwidth_download,
|
||||
"bandwidth_upload" => bandwidth_upload,
|
||||
"used_state_cache_size" => info.usage.as_ref()
|
||||
.map(|usage| usage.memory.state_cache.as_bytes())
|
||||
.unwrap_or(0),
|
||||
"used_db_cache_size" => info.usage.as_ref()
|
||||
.map(|usage| usage.memory.database_cache.as_bytes())
|
||||
.unwrap_or(0),
|
||||
"disk_read_per_sec" => info.usage.as_ref()
|
||||
.map(|usage| usage.io.bytes_read)
|
||||
.unwrap_or(0),
|
||||
"disk_write_per_sec" => info.usage.as_ref()
|
||||
.map(|usage| usage.io.bytes_written)
|
||||
.unwrap_or(0),
|
||||
metrics_service.tick(
|
||||
&info,
|
||||
&transaction_pool_.status(),
|
||||
&net_status,
|
||||
);
|
||||
if let Some(metrics) = metrics.as_ref() {
|
||||
// `sysinfo::Process::memory` returns memory usage in KiB and not bytes.
|
||||
metrics.memory_usage_bytes.set(memory * 1024);
|
||||
metrics.cpu_usage_percentage.set(f64::from(cpu_usage));
|
||||
metrics.ready_transactions_number.set(txpool_status.ready as u64);
|
||||
|
||||
metrics.network_per_sec_bytes.with_label_values(&["download"]).set(net_status.average_download_per_sec);
|
||||
metrics.network_per_sec_bytes.with_label_values(&["upload"]).set(net_status.average_upload_per_sec);
|
||||
|
||||
metrics.block_height_number.with_label_values(&["finalized"]).set(finalized_number);
|
||||
metrics.block_height_number.with_label_values(&["best"]).set(best_number);
|
||||
|
||||
if let Some(best_seen_block) = best_seen_block {
|
||||
metrics.block_height_number.with_label_values(&["sync_target"]).set(best_seen_block);
|
||||
}
|
||||
|
||||
if let Some(info) = info.usage.as_ref() {
|
||||
metrics.database_cache.set(info.memory.database_cache.as_bytes() as u64);
|
||||
metrics.state_cache.set(info.memory.state_cache.as_bytes() as u64);
|
||||
|
||||
metrics.state_db.with_label_values(&["non_canonical"]).set(info.memory.state_db.non_canonical.as_bytes() as u64);
|
||||
if let Some(pruning) = info.memory.state_db.pruning {
|
||||
metrics.state_db.with_label_values(&["pruning"]).set(pruning.as_bytes() as u64);
|
||||
}
|
||||
metrics.state_db.with_label_values(&["pinned"]).set(info.memory.state_db.pinned.as_bytes() as u64);
|
||||
}
|
||||
}
|
||||
|
||||
ready(())
|
||||
});
|
||||
|
||||
@@ -1117,7 +993,7 @@ ServiceBuilder<
|
||||
);
|
||||
|
||||
// Periodically send the network state to the telemetry.
|
||||
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
|
||||
let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
|
||||
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
|
||||
let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
|
||||
telemetry!(
|
||||
@@ -1133,7 +1009,7 @@ ServiceBuilder<
|
||||
);
|
||||
|
||||
// RPC
|
||||
let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded();
|
||||
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
|
||||
let gen_handler = || {
|
||||
use sc_rpc::{chain, state, author, system, offchain};
|
||||
|
||||
@@ -1215,7 +1091,7 @@ ServiceBuilder<
|
||||
),
|
||||
);
|
||||
|
||||
let telemetry_connection_sinks: Arc<Mutex<Vec<futures::channel::mpsc::UnboundedSender<()>>>> = Default::default();
|
||||
let telemetry_connection_sinks: Arc<Mutex<Vec<TracingUnboundedSender<()>>>> = Default::default();
|
||||
|
||||
// Telemetry
|
||||
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
|
||||
|
||||
@@ -24,6 +24,7 @@ pub mod config;
|
||||
pub mod chain_ops;
|
||||
pub mod error;
|
||||
|
||||
mod metrics;
|
||||
mod builder;
|
||||
mod status_sinks;
|
||||
mod task_manager;
|
||||
@@ -40,7 +41,6 @@ use parking_lot::Mutex;
|
||||
use sc_client::Client;
|
||||
use futures::{
|
||||
Future, FutureExt, Stream, StreamExt,
|
||||
channel::mpsc,
|
||||
compat::*,
|
||||
sink::SinkExt,
|
||||
task::{Spawn, FutureObj, SpawnError},
|
||||
@@ -51,6 +51,7 @@ use codec::{Encode, Decode};
|
||||
use sp_runtime::generic::BlockId;
|
||||
use sp_runtime::traits::{NumberFor, Block as BlockT};
|
||||
use parity_util_mem::MallocSizeOf;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
|
||||
pub use self::error::Error;
|
||||
pub use self::builder::{
|
||||
@@ -98,13 +99,13 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
|
||||
transaction_pool: Arc<TTxPool>,
|
||||
/// Send a signal when a spawned essential task has concluded. The next time
|
||||
/// the service future is polled it should complete with an error.
|
||||
essential_failed_tx: mpsc::UnboundedSender<()>,
|
||||
essential_failed_tx: TracingUnboundedSender<()>,
|
||||
/// A receiver for spawned essential-tasks concluding.
|
||||
essential_failed_rx: mpsc::UnboundedReceiver<()>,
|
||||
essential_failed_rx: TracingUnboundedReceiver<()>,
|
||||
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
|
||||
_rpc: Box<dyn std::any::Any + Send + Sync>,
|
||||
_telemetry: Option<sc_telemetry::Telemetry>,
|
||||
_telemetry_on_connect_sinks: Arc<Mutex<Vec<futures::channel::mpsc::UnboundedSender<()>>>>,
|
||||
_telemetry_on_connect_sinks: Arc<Mutex<Vec<TracingUnboundedSender<()>>>>,
|
||||
_offchain_workers: Option<Arc<TOc>>,
|
||||
keystore: sc_keystore::KeyStorePtr,
|
||||
marker: PhantomData<TBl>,
|
||||
@@ -130,7 +131,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
|
||||
type TransactionPool: TransactionPool<Block = Self::Block> + MallocSizeOfWasm;
|
||||
|
||||
/// Get event stream for telemetry connection established events.
|
||||
fn telemetry_on_connect_stream(&self) -> futures::channel::mpsc::UnboundedReceiver<()>;
|
||||
fn telemetry_on_connect_stream(&self) -> TracingUnboundedReceiver<()>;
|
||||
|
||||
/// return a shared instance of Telemetry (if enabled)
|
||||
fn telemetry(&self) -> Option<sc_telemetry::Telemetry>;
|
||||
@@ -171,7 +172,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
|
||||
-> Arc<NetworkService<Self::Block, <Self::Block as BlockT>::Hash>>;
|
||||
|
||||
/// Returns a receiver that periodically receives a status of the network.
|
||||
fn network_status(&self, interval: Duration) -> mpsc::UnboundedReceiver<(NetworkStatus<Self::Block>, NetworkState)>;
|
||||
fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus<Self::Block>, NetworkState)>;
|
||||
|
||||
/// Get shared transaction pool instance.
|
||||
fn transaction_pool(&self) -> Arc<Self::TransactionPool>;
|
||||
@@ -203,8 +204,8 @@ where
|
||||
type SelectChain = TSc;
|
||||
type TransactionPool = TExPool;
|
||||
|
||||
fn telemetry_on_connect_stream(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
|
||||
let (sink, stream) = futures::channel::mpsc::unbounded();
|
||||
fn telemetry_on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
|
||||
let (sink, stream) = tracing_unbounded("mpsc_telemetry_on_connect");
|
||||
self._telemetry_on_connect_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
@@ -259,8 +260,8 @@ where
|
||||
self.network.clone()
|
||||
}
|
||||
|
||||
fn network_status(&self, interval: Duration) -> mpsc::UnboundedReceiver<(NetworkStatus<Self::Block>, NetworkState)> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus<Self::Block>, NetworkState)> {
|
||||
let (sink, stream) = tracing_unbounded("mpsc_network_status");
|
||||
self.network_status_sinks.lock().push(interval, sink);
|
||||
stream
|
||||
}
|
||||
@@ -326,7 +327,7 @@ fn build_network_future<
|
||||
mut network: sc_network::NetworkWorker<B, H>,
|
||||
client: Arc<C>,
|
||||
status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>>,
|
||||
mut rpc_rx: mpsc::UnboundedReceiver<sc_rpc::system::Request<B>>,
|
||||
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
|
||||
should_have_peers: bool,
|
||||
announce_imported_blocks: bool,
|
||||
) -> impl Future<Output = ()> {
|
||||
|
||||
@@ -0,0 +1,428 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::NetworkStatus;
|
||||
use prometheus_endpoint::{register, Gauge, U64, F64, Registry, PrometheusError, Opts, GaugeVec};
|
||||
use sc_client::ClientInfo;
|
||||
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
|
||||
use std::convert::TryFrom;
|
||||
use sp_runtime::traits::{NumberFor, Block, SaturatedConversion, UniqueSaturatedInto};
|
||||
use sp_transaction_pool::PoolStatus;
|
||||
use sp_utils::metrics::register_globals;
|
||||
|
||||
#[cfg(any(windows, unix))]
|
||||
use sysinfo::{ProcessExt, System, SystemExt};
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
use netstat2::{TcpState, ProtocolSocketInfo, iterate_sockets_info, AddressFamilyFlags, ProtocolFlags};
|
||||
|
||||
#[cfg(not(unix))]
|
||||
use sysinfo::get_current_pid;
|
||||
|
||||
#[cfg(unix)]
|
||||
use procfs;
|
||||
|
||||
struct PrometheusMetrics {
|
||||
// system
|
||||
#[cfg(any(unix, windows))]
|
||||
load_avg: GaugeVec<F64>,
|
||||
|
||||
// process
|
||||
cpu_usage_percentage: Gauge<F64>,
|
||||
memory_usage_bytes: Gauge<U64>,
|
||||
threads: Gauge<U64>,
|
||||
open_files: GaugeVec<U64>,
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
netstat: GaugeVec<U64>,
|
||||
|
||||
// -- inner counters
|
||||
// generic info
|
||||
block_height: GaugeVec<U64>,
|
||||
number_leaves: Gauge<U64>,
|
||||
ready_transactions_number: Gauge<U64>,
|
||||
|
||||
// I/O
|
||||
network_per_sec_bytes: GaugeVec<U64>,
|
||||
database_cache: Gauge<U64>,
|
||||
state_cache: Gauge<U64>,
|
||||
state_db: GaugeVec<U64>,
|
||||
}
|
||||
|
||||
impl PrometheusMetrics {
|
||||
fn setup(registry: &Registry, name: &str, version: &str, roles: u64)
|
||||
-> Result<Self, PrometheusError>
|
||||
{
|
||||
register(Gauge::<U64>::with_opts(
|
||||
Opts::new(
|
||||
"build_info",
|
||||
"A metric with a constant '1' value labeled by name, version"
|
||||
)
|
||||
.const_label("name", name)
|
||||
.const_label("version", version)
|
||||
)?, ®istry)?.set(1);
|
||||
|
||||
register(Gauge::<U64>::new(
|
||||
"node_roles", "The roles the node is running as",
|
||||
)?, ®istry)?.set(roles);
|
||||
|
||||
register_globals(registry)?;
|
||||
|
||||
Ok(Self {
|
||||
// system
|
||||
#[cfg(any(unix, windows))]
|
||||
load_avg: register(GaugeVec::new(
|
||||
Opts::new("load_avg", "System load average"),
|
||||
&["over"]
|
||||
)?, registry)?,
|
||||
|
||||
// process
|
||||
memory_usage_bytes: register(Gauge::new(
|
||||
"memory_usage_bytes", "Node memory (resident set size) usage",
|
||||
)?, registry)?,
|
||||
|
||||
cpu_usage_percentage: register(Gauge::new(
|
||||
"cpu_usage_percentage", "Node CPU usage",
|
||||
)?, registry)?,
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
netstat: register(GaugeVec::new(
|
||||
Opts::new("netstat_tcp", "Current TCP connections "),
|
||||
&["status"]
|
||||
)?, registry)?,
|
||||
|
||||
threads: register(Gauge::new(
|
||||
"threads", "Number of threads used by the process",
|
||||
)?, registry)?,
|
||||
|
||||
open_files: register(GaugeVec::new(
|
||||
Opts::new("open_file_handles", "Open file handlers held by the process"),
|
||||
&["fd_type"]
|
||||
)?, registry)?,
|
||||
|
||||
// --- internal
|
||||
|
||||
// generic internals
|
||||
|
||||
block_height: register(GaugeVec::new(
|
||||
Opts::new("block_height", "Block height info of the chain"),
|
||||
&["status"]
|
||||
)?, registry)?,
|
||||
|
||||
number_leaves: register(Gauge::new(
|
||||
"number_leaves", "Number of known chain leaves (aka forks)",
|
||||
)?, registry)?,
|
||||
|
||||
ready_transactions_number: register(Gauge::new(
|
||||
"ready_transactions_number", "Number of transactions in the ready queue",
|
||||
)?, registry)?,
|
||||
|
||||
// I/ O
|
||||
|
||||
network_per_sec_bytes: register(GaugeVec::new(
|
||||
Opts::new("network_per_sec_bytes", "Networking bytes per second"),
|
||||
&["direction"]
|
||||
)?, registry)?,
|
||||
database_cache: register(Gauge::new(
|
||||
"database_cache_bytes", "RocksDB cache size in bytes",
|
||||
)?, registry)?,
|
||||
state_cache: register(Gauge::new(
|
||||
"state_cache_bytes", "State cache size in bytes",
|
||||
)?, registry)?,
|
||||
state_db: register(GaugeVec::new(
|
||||
Opts::new("state_db_cache_bytes", "State DB cache in bytes"),
|
||||
&["subtype"]
|
||||
)?, registry)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
#[derive(Default)]
|
||||
struct ConnectionsCount {
|
||||
listen: u64,
|
||||
established: u64,
|
||||
starting: u64,
|
||||
closing: u64,
|
||||
closed: u64,
|
||||
other: u64
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct FdCounter {
|
||||
paths: u64,
|
||||
sockets: u64,
|
||||
net: u64,
|
||||
pipes: u64,
|
||||
anon_inode: u64,
|
||||
mem: u64,
|
||||
other: u64,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ProcessInfo {
|
||||
cpu_usage: f64,
|
||||
memory: u64,
|
||||
threads: Option<u64>,
|
||||
open_fd: Option<FdCounter>,
|
||||
}
|
||||
|
||||
pub struct MetricsService {
|
||||
metrics: Option<PrometheusMetrics>,
|
||||
#[cfg(any(windows, unix))]
|
||||
system: System,
|
||||
pid: Option<i32>,
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl MetricsService {
|
||||
fn inner_new(metrics: Option<PrometheusMetrics>) -> Self {
|
||||
let process = procfs::process::Process::myself()
|
||||
.expect("Procfs doesn't fail on unix. qed");
|
||||
|
||||
Self {
|
||||
metrics,
|
||||
system: System::new(),
|
||||
pid: Some(process.pid),
|
||||
}
|
||||
}
|
||||
fn process_info(&mut self) -> ProcessInfo {
|
||||
let pid = self.pid.clone().expect("unix always has a pid. qed");
|
||||
let mut info = self._process_info_for(&pid);
|
||||
let process = procfs::process::Process::new(pid).expect("Our process exists. qed.");
|
||||
info.threads = process.stat().ok().map(|s|
|
||||
u64::try_from(s.num_threads).expect("There are no negative thread counts. qed"));
|
||||
info.open_fd = process.fd().ok().map(|i|
|
||||
i.into_iter().fold(FdCounter::default(), |mut f, info| {
|
||||
match info.target {
|
||||
procfs::process::FDTarget::Path(_) => f.paths += 1,
|
||||
procfs::process::FDTarget::Socket(_) => f.sockets += 1,
|
||||
procfs::process::FDTarget::Net(_) => f.net += 1,
|
||||
procfs::process::FDTarget::Pipe(_) => f.pipes += 1,
|
||||
procfs::process::FDTarget::AnonInode(_) => f.anon_inode += 1,
|
||||
procfs::process::FDTarget::MemFD(_) => f.mem += 1,
|
||||
procfs::process::FDTarget::Other(_,_) => f.other += 1,
|
||||
};
|
||||
f
|
||||
})
|
||||
);
|
||||
info
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
#[cfg(windows)]
|
||||
impl MetricsService {
|
||||
fn inner_new(metrics: Option<PrometheusMetrics>) -> Self {
|
||||
Self {
|
||||
metrics,
|
||||
system: System(),
|
||||
pid: get_current_pid().ok()
|
||||
}
|
||||
}
|
||||
|
||||
fn process_info(&mut self) -> ProcessInfo {
|
||||
self.pid.map(|pid| self._process_info_for(pid)).or_else(ProcessInfo::default)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(unix, windows)))]
|
||||
impl MetricsService {
|
||||
fn inner_new(metrics: Option<PrometheusMetrics>) -> Self {
|
||||
Self {
|
||||
metrics,
|
||||
pid: None
|
||||
}
|
||||
}
|
||||
|
||||
fn process_info(&mut self) -> ProcessInfo {
|
||||
ProcessInfo::default()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl MetricsService {
|
||||
|
||||
pub fn with_prometheus(registry: &Registry, name: &str, version: &str, roles: u64)
|
||||
-> Result<Self, PrometheusError>
|
||||
{
|
||||
PrometheusMetrics::setup(registry, name, version, roles).map(|p| {
|
||||
Self::inner_new(Some(p))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self::inner_new(None)
|
||||
}
|
||||
|
||||
#[cfg(any(windows, unix))]
|
||||
fn _process_info_for(&mut self, pid: &i32) -> ProcessInfo {
|
||||
let mut info = ProcessInfo::default();
|
||||
if self.system.refresh_process(*pid) {
|
||||
let prc = self.system.get_process(*pid)
|
||||
.expect("Above refresh_process succeeds, this must be Some(), qed");
|
||||
info.cpu_usage = prc.cpu_usage().into();
|
||||
info.memory = prc.memory();
|
||||
}
|
||||
info
|
||||
}
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
fn connections_info(&self) -> Option<ConnectionsCount> {
|
||||
self.pid.as_ref().and_then(|pid| {
|
||||
let af_flags = AddressFamilyFlags::IPV4 | AddressFamilyFlags::IPV6;
|
||||
let proto_flags = ProtocolFlags::TCP;
|
||||
let netstat_pid = *pid as u32;
|
||||
|
||||
iterate_sockets_info(af_flags, proto_flags).ok().map(|iter|
|
||||
iter.filter_map(|r|
|
||||
r.ok().and_then(|s| {
|
||||
if s.associated_pids.contains(&netstat_pid) {
|
||||
match s.protocol_socket_info {
|
||||
ProtocolSocketInfo::Tcp(info) => Some(info.state),
|
||||
_ => None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
).fold(ConnectionsCount::default(), |mut counter, socket_state| {
|
||||
match socket_state {
|
||||
TcpState::Listen => counter.listen += 1,
|
||||
TcpState::Established => counter.established += 1,
|
||||
TcpState::Closed => counter.closed += 1,
|
||||
TcpState::SynSent | TcpState::SynReceived => counter.starting += 1,
|
||||
TcpState::FinWait1 | TcpState::FinWait2 | TcpState::CloseWait
|
||||
| TcpState::Closing | TcpState::LastAck => counter.closing += 1,
|
||||
_ => counter.other += 1
|
||||
}
|
||||
|
||||
counter
|
||||
})
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tick<T: Block>(
|
||||
&mut self,
|
||||
info: &ClientInfo<T>,
|
||||
txpool_status: &PoolStatus,
|
||||
net_status: &NetworkStatus<T>
|
||||
) {
|
||||
|
||||
let best_number = info.chain.best_number.saturated_into::<u64>();
|
||||
let best_hash = info.chain.best_hash;
|
||||
let num_peers = net_status.num_connected_peers;
|
||||
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
|
||||
let bandwidth_download = net_status.average_download_per_sec;
|
||||
let bandwidth_upload = net_status.average_upload_per_sec;
|
||||
let best_seen_block = net_status.best_seen_block
|
||||
.map(|num: NumberFor<T>| num.unique_saturated_into() as u64);
|
||||
let process_info = self.process_info();
|
||||
|
||||
telemetry!(
|
||||
SUBSTRATE_INFO;
|
||||
"system.interval";
|
||||
"peers" => num_peers,
|
||||
"height" => best_number,
|
||||
"best" => ?best_hash,
|
||||
"txcount" => txpool_status.ready,
|
||||
"cpu" => process_info.cpu_usage,
|
||||
"memory" => process_info.memory,
|
||||
"finalized_height" => finalized_number,
|
||||
"finalized_hash" => ?info.chain.finalized_hash,
|
||||
"bandwidth_download" => bandwidth_download,
|
||||
"bandwidth_upload" => bandwidth_upload,
|
||||
"used_state_cache_size" => info.usage.as_ref()
|
||||
.map(|usage| usage.memory.state_cache.as_bytes())
|
||||
.unwrap_or(0),
|
||||
"used_db_cache_size" => info.usage.as_ref()
|
||||
.map(|usage| usage.memory.database_cache.as_bytes())
|
||||
.unwrap_or(0),
|
||||
"disk_read_per_sec" => info.usage.as_ref()
|
||||
.map(|usage| usage.io.bytes_read)
|
||||
.unwrap_or(0),
|
||||
"disk_write_per_sec" => info.usage.as_ref()
|
||||
.map(|usage| usage.io.bytes_written)
|
||||
.unwrap_or(0),
|
||||
);
|
||||
|
||||
if let Some(metrics) = self.metrics.as_ref() {
|
||||
metrics.cpu_usage_percentage.set(process_info.cpu_usage as f64);
|
||||
// `sysinfo::Process::memory` returns memory usage in KiB and not bytes.
|
||||
metrics.memory_usage_bytes.set(process_info.memory * 1024);
|
||||
|
||||
if let Some(threads) = process_info.threads {
|
||||
metrics.threads.set(threads);
|
||||
}
|
||||
|
||||
if let Some(fd_info) = process_info.open_fd {
|
||||
metrics.open_files.with_label_values(&["paths"]).set(fd_info.paths);
|
||||
metrics.open_files.with_label_values(&["mem"]).set(fd_info.mem);
|
||||
metrics.open_files.with_label_values(&["sockets"]).set(fd_info.sockets);
|
||||
metrics.open_files.with_label_values(&["net"]).set(fd_info.net);
|
||||
metrics.open_files.with_label_values(&["pipe"]).set(fd_info.pipes);
|
||||
metrics.open_files.with_label_values(&["anon_inode"]).set(fd_info.anon_inode);
|
||||
metrics.open_files.with_label_values(&["other"]).set(fd_info.other);
|
||||
}
|
||||
|
||||
|
||||
metrics.network_per_sec_bytes.with_label_values(&["download"]).set(net_status.average_download_per_sec);
|
||||
metrics.network_per_sec_bytes.with_label_values(&["upload"]).set(net_status.average_upload_per_sec);
|
||||
|
||||
metrics.block_height.with_label_values(&["finalized"]).set(finalized_number);
|
||||
metrics.block_height.with_label_values(&["best"]).set(best_number);
|
||||
if let Ok(leaves) = u64::try_from(info.chain.number_leaves) {
|
||||
metrics.number_leaves.set(leaves);
|
||||
}
|
||||
|
||||
metrics.ready_transactions_number.set(txpool_status.ready as u64);
|
||||
|
||||
if let Some(best_seen_block) = best_seen_block {
|
||||
metrics.block_height.with_label_values(&["sync_target"]).set(best_seen_block);
|
||||
}
|
||||
|
||||
if let Some(info) = info.usage.as_ref() {
|
||||
metrics.database_cache.set(info.memory.database_cache.as_bytes() as u64);
|
||||
metrics.state_cache.set(info.memory.state_cache.as_bytes() as u64);
|
||||
|
||||
metrics.state_db.with_label_values(&["non_canonical"]).set(info.memory.state_db.non_canonical.as_bytes() as u64);
|
||||
if let Some(pruning) = info.memory.state_db.pruning {
|
||||
metrics.state_db.with_label_values(&["pruning"]).set(pruning.as_bytes() as u64);
|
||||
}
|
||||
metrics.state_db.with_label_values(&["pinned"]).set(info.memory.state_db.pinned.as_bytes() as u64);
|
||||
}
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
{
|
||||
let load = self.system.get_load_average();
|
||||
metrics.load_avg.with_label_values(&["1min"]).set(load.one);
|
||||
metrics.load_avg.with_label_values(&["5min"]).set(load.five);
|
||||
metrics.load_avg.with_label_values(&["15min"]).set(load.fifteen);
|
||||
|
||||
if let Some(conns) = self.connections_info() {
|
||||
metrics.netstat.with_label_values(&["listen"]).set(conns.listen);
|
||||
metrics.netstat.with_label_values(&["established"]).set(conns.established);
|
||||
metrics.netstat.with_label_values(&["starting"]).set(conns.starting);
|
||||
metrics.netstat.with_label_values(&["closing"]).set(conns.closing);
|
||||
metrics.netstat.with_label_values(&["closed"]).set(conns.closed);
|
||||
metrics.netstat.with_label_values(&["other"]).set(conns.other);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,11 +14,12 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::{Stream, stream::futures_unordered::FuturesUnordered, channel::mpsc};
|
||||
use futures::{Stream, stream::futures_unordered::FuturesUnordered};
|
||||
use std::time::Duration;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Poll, Context};
|
||||
use futures_timer::Delay;
|
||||
use sp_utils::mpsc::TracingUnboundedSender;
|
||||
|
||||
/// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
|
||||
/// period elapses, we push an element on the sender.
|
||||
@@ -31,7 +32,7 @@ pub struct StatusSinks<T> {
|
||||
struct YieldAfter<T> {
|
||||
delay: Delay,
|
||||
interval: Duration,
|
||||
sender: Option<mpsc::UnboundedSender<T>>,
|
||||
sender: Option<TracingUnboundedSender<T>>,
|
||||
}
|
||||
|
||||
impl<T> StatusSinks<T> {
|
||||
@@ -45,7 +46,7 @@ impl<T> StatusSinks<T> {
|
||||
/// Adds a sender to the collection.
|
||||
///
|
||||
/// The `interval` is the time period between two pushes on the sender.
|
||||
pub fn push(&mut self, interval: Duration, sender: mpsc::UnboundedSender<T>) {
|
||||
pub fn push(&mut self, interval: Duration, sender: TracingUnboundedSender<T>) {
|
||||
self.entries.push(YieldAfter {
|
||||
delay: Delay::new(interval),
|
||||
interval,
|
||||
@@ -88,7 +89,7 @@ impl<T> StatusSinks<T> {
|
||||
}
|
||||
|
||||
impl<T> futures::Future for YieldAfter<T> {
|
||||
type Output = (mpsc::UnboundedSender<T>, Duration);
|
||||
type Output = (TracingUnboundedSender<T>, Duration);
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
@@ -22,17 +22,18 @@ use exit_future::Signal;
|
||||
use log::{debug, error};
|
||||
use futures::{
|
||||
Future, FutureExt, Stream,
|
||||
future::select, channel::mpsc,
|
||||
future::select,
|
||||
compat::*,
|
||||
task::{Spawn, FutureObj, SpawnError},
|
||||
};
|
||||
use sc_client_api::CloneableSpawn;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
|
||||
/// Type alias for service task executor (usually runtime).
|
||||
pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>;
|
||||
|
||||
/// Type alias for the task scheduler.
|
||||
pub type TaskScheduler = mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>;
|
||||
pub type TaskScheduler = TracingUnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>;
|
||||
|
||||
/// Helper struct to setup background tasks execution for service.
|
||||
pub struct TaskManagerBuilder {
|
||||
@@ -44,14 +45,14 @@ pub struct TaskManagerBuilder {
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: TaskScheduler,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
to_spawn_rx: TracingUnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
}
|
||||
|
||||
impl TaskManagerBuilder {
|
||||
/// New asynchronous task manager setup.
|
||||
pub fn new() -> Self {
|
||||
let (signal, on_exit) = exit_future::signal();
|
||||
let (to_spawn_tx, to_spawn_rx) = mpsc::unbounded();
|
||||
let (to_spawn_tx, to_spawn_rx) = tracing_unbounded("mpsc_task_manager");
|
||||
Self {
|
||||
on_exit,
|
||||
signal: Some(signal),
|
||||
@@ -144,7 +145,7 @@ pub struct TaskManager {
|
||||
/// Sender for futures that must be spawned as background tasks.
|
||||
to_spawn_tx: TaskScheduler,
|
||||
/// Receiver for futures that must be spawned as background tasks.
|
||||
to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
to_spawn_rx: TracingUnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>,
|
||||
/// How to spawn background tasks.
|
||||
executor: ServiceTaskExecutor,
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ use std::{
|
||||
result,
|
||||
};
|
||||
use log::{info, trace, warn};
|
||||
use futures::channel::mpsc;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use codec::{Encode, Decode};
|
||||
use hash_db::Prefix;
|
||||
@@ -78,6 +77,7 @@ pub use sc_client_api::{
|
||||
notifications::{StorageNotifications, StorageEventStream},
|
||||
CallExecutor, ExecutorProvider, ProofProvider, CloneableSpawn,
|
||||
};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
|
||||
use sp_blockchain::Error;
|
||||
use prometheus_endpoint::Registry;
|
||||
|
||||
@@ -93,8 +93,8 @@ pub struct Client<B, E, Block, RA> where Block: BlockT {
|
||||
backend: Arc<B>,
|
||||
executor: E,
|
||||
storage_notifications: Mutex<StorageNotifications<Block>>,
|
||||
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>,
|
||||
finality_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<FinalityNotification<Block>>>>,
|
||||
import_notification_sinks: Mutex<Vec<TracingUnboundedSender<BlockImportNotification<Block>>>>,
|
||||
finality_notification_sinks: Mutex<Vec<TracingUnboundedSender<FinalityNotification<Block>>>>,
|
||||
// holds the block hash currently being imported. TODO: replace this with block queue
|
||||
importing_block: RwLock<Option<Block::Hash>>,
|
||||
block_rules: BlockRules<Block>,
|
||||
@@ -1764,13 +1764,13 @@ where
|
||||
{
|
||||
/// Get block import event stream.
|
||||
fn import_notification_stream(&self) -> ImportNotifications<Block> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream");
|
||||
self.import_notification_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
|
||||
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream");
|
||||
self.finality_notification_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
|
||||
@@ -304,6 +304,7 @@ impl<Block: BlockT> HeaderBackend<Block> for Blockchain<Block> {
|
||||
genesis_hash: storage.genesis_hash,
|
||||
finalized_hash: storage.finalized_hash,
|
||||
finalized_number: storage.finalized_number,
|
||||
number_leaves: storage.leaves.count()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -195,6 +195,11 @@ impl<H, N> LeafSet<H, N> where
|
||||
self.storage.iter().flat_map(|(_, hashes)| hashes.iter()).cloned().collect()
|
||||
}
|
||||
|
||||
/// Number of known leaves
|
||||
pub fn count(&self) -> usize {
|
||||
self.storage.len()
|
||||
}
|
||||
|
||||
/// Write the leaf list to the database transaction.
|
||||
pub fn prepare_transaction(&mut self, tx: &mut DBTransaction, column: u32, prefix: &[u8]) {
|
||||
let mut buf = prefix.to_vec();
|
||||
|
||||
@@ -19,6 +19,7 @@ wasm-timer = "0.2"
|
||||
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
|
||||
sp-api = { version = "2.0.0-alpha.5", path = "../../primitives/api" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
|
||||
sc-transaction-graph = { version = "2.0.0-alpha.5", path = "./graph" }
|
||||
sp-transaction-pool = { version = "2.0.0-alpha.5", path = "../../primitives/transaction-pool" }
|
||||
sc-client-api = { version = "2.0.0-alpha.5", path = "../api" }
|
||||
|
||||
@@ -16,6 +16,7 @@ parking_lot = "0.10.0"
|
||||
serde = { version = "1.0.101", features = ["derive"] }
|
||||
wasm-timer = "0.2"
|
||||
sp-blockchain = { version = "2.0.0-alpha.5", path = "../../../primitives/blockchain" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../../primitives/utils" }
|
||||
sp-core = { version = "2.0.0-alpha.5", path = "../../../primitives/core" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../../primitives/runtime" }
|
||||
sp-transaction-pool = { version = "2.0.0-alpha.5", path = "../../../primitives/transaction-pool" }
|
||||
|
||||
@@ -24,10 +24,7 @@ use crate::base_pool as base;
|
||||
use crate::watcher::Watcher;
|
||||
use serde::Serialize;
|
||||
|
||||
use futures::{
|
||||
Future, FutureExt,
|
||||
channel::mpsc,
|
||||
};
|
||||
use futures::{Future, FutureExt};
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{self, SaturatedConversion},
|
||||
@@ -37,12 +34,13 @@ use sp_runtime::{
|
||||
};
|
||||
use sp_transaction_pool::error;
|
||||
use wasm_timer::Instant;
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
|
||||
use crate::validated_pool::ValidatedPool;
|
||||
pub use crate::validated_pool::ValidatedTransaction;
|
||||
|
||||
/// Modification notification event stream type;
|
||||
pub type EventStream<H> = mpsc::UnboundedReceiver<H>;
|
||||
pub type EventStream<H> = TracingUnboundedReceiver<H>;
|
||||
|
||||
/// Extrinsic hash type for a pool.
|
||||
pub type ExHash<A> = <A as ChainApi>::Hash;
|
||||
|
||||
@@ -27,7 +27,6 @@ use crate::watcher::Watcher;
|
||||
use serde::Serialize;
|
||||
use log::{debug, warn};
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
@@ -36,6 +35,7 @@ use sp_runtime::{
|
||||
};
|
||||
use sp_transaction_pool::{error, PoolStatus};
|
||||
use wasm_timer::Instant;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
|
||||
|
||||
use crate::base_pool::PruneStatus;
|
||||
use crate::pool::{EventStream, Options, ChainApi, ExHash, ExtrinsicFor, TransactionFor};
|
||||
@@ -95,7 +95,7 @@ pub struct ValidatedPool<B: ChainApi> {
|
||||
ExHash<B>,
|
||||
ExtrinsicFor<B>,
|
||||
>>,
|
||||
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<ExHash<B>>>>,
|
||||
import_notification_sinks: Mutex<Vec<TracingUnboundedSender<ExHash<B>>>>,
|
||||
rotator: PoolRotator<ExHash<B>>,
|
||||
}
|
||||
|
||||
@@ -504,7 +504,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
/// Consumers of this stream should use the `ready` method to actually get the
|
||||
/// pending transactions in the right order.
|
||||
pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
let (sink, stream) = tracing_unbounded("mpsc_import_notifications");
|
||||
self.import_notification_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
|
||||
@@ -16,18 +16,16 @@
|
||||
|
||||
//! Extrinsics status updates.
|
||||
|
||||
use futures::{
|
||||
Stream,
|
||||
channel::mpsc,
|
||||
};
|
||||
use futures::Stream;
|
||||
use sp_transaction_pool::TransactionStatus;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
|
||||
/// Extrinsic watcher.
|
||||
///
|
||||
/// Represents a stream of status updates for particular extrinsic.
|
||||
#[derive(Debug)]
|
||||
pub struct Watcher<H, BH> {
|
||||
receiver: mpsc::UnboundedReceiver<TransactionStatus<H, BH>>,
|
||||
receiver: TracingUnboundedReceiver<TransactionStatus<H, BH>>,
|
||||
hash: H,
|
||||
}
|
||||
|
||||
@@ -48,7 +46,7 @@ impl<H, BH> Watcher<H, BH> {
|
||||
/// Sender part of the watcher. Exposed only for testing purposes.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender<H, BH> {
|
||||
receivers: Vec<mpsc::UnboundedSender<TransactionStatus<H, BH>>>,
|
||||
receivers: Vec<TracingUnboundedSender<TransactionStatus<H, BH>>>,
|
||||
is_finalized: bool,
|
||||
}
|
||||
|
||||
@@ -64,7 +62,7 @@ impl<H, BH> Default for Sender<H, BH> {
|
||||
impl<H: Clone, BH: Clone> Sender<H, BH> {
|
||||
/// Add a new watcher to this sender object.
|
||||
pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
|
||||
let (tx, receiver) = mpsc::unbounded();
|
||||
let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher");
|
||||
self.receivers.push(tx);
|
||||
Watcher {
|
||||
receiver,
|
||||
|
||||
@@ -22,8 +22,9 @@ use sc_transaction_graph::{ChainApi, Pool, ExHash, NumberFor, ValidatedTransacti
|
||||
use sp_runtime::traits::{Zero, SaturatedConversion};
|
||||
use sp_runtime::generic::BlockId;
|
||||
use sp_runtime::transaction_validity::TransactionValidityError;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::prelude::*;
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(not(test))]
|
||||
@@ -202,7 +203,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
/// transactions from the pool.
|
||||
pub async fn run<R: intervalier::IntoStream>(
|
||||
mut self,
|
||||
from_queue: mpsc::UnboundedReceiver<WorkerPayload<Api>>,
|
||||
from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
|
||||
interval: R,
|
||||
) where R: Send, R::Guard: Send
|
||||
{
|
||||
@@ -252,7 +253,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
pub struct RevalidationQueue<Api: ChainApi> {
|
||||
pool: Arc<Pool<Api>>,
|
||||
api: Arc<Api>,
|
||||
background: Option<mpsc::UnboundedSender<WorkerPayload<Api>>>,
|
||||
background: Option<TracingUnboundedSender<WorkerPayload<Api>>>,
|
||||
}
|
||||
|
||||
impl<Api: ChainApi> RevalidationQueue<Api>
|
||||
@@ -275,7 +276,7 @@ where
|
||||
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>)
|
||||
where R: Send + 'static, R::Guard: Send
|
||||
{
|
||||
let (to_worker, from_queue) = mpsc::unbounded();
|
||||
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue");
|
||||
|
||||
let worker = RevalidationWorker::new(api.clone(), pool.clone());
|
||||
|
||||
|
||||
@@ -254,6 +254,8 @@ pub struct Info<Block: BlockT> {
|
||||
pub finalized_hash: Block::Hash,
|
||||
/// Last finalized block number.
|
||||
pub finalized_number: <<Block as BlockT>::Header as HeaderT>::Number,
|
||||
/// Number of concurrent leave forks.
|
||||
pub number_leaves: usize
|
||||
}
|
||||
|
||||
/// Block status.
|
||||
|
||||
@@ -23,6 +23,7 @@ futures-diagnose = "1.0"
|
||||
sp-std = { version = "2.0.0-alpha.5", path = "../../std" }
|
||||
sp-version = { version = "2.0.0-alpha.5", path = "../../version" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", path = "../../runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", path = "../../utils" }
|
||||
codec = { package = "parity-scale-codec", version = "1.3.0", features = ["derive"] }
|
||||
parking_lot = "0.10.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
||||
@@ -15,10 +15,11 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc};
|
||||
use futures::{prelude::*, channel::mpsc, task::Context, task::Poll};
|
||||
use futures::{prelude::*, task::Context, task::Poll};
|
||||
use futures_timer::Delay;
|
||||
use parking_lot::{Mutex, Condvar};
|
||||
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
|
||||
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
|
||||
|
||||
use crate::block_import::BlockOrigin;
|
||||
use crate::import_queue::{
|
||||
@@ -32,7 +33,7 @@ use crate::import_queue::{
|
||||
/// task, with plugable verification.
|
||||
pub struct BasicQueue<B: BlockT, Transaction> {
|
||||
/// Channel to send messages to the background task.
|
||||
sender: mpsc::UnboundedSender<ToWorkerMsg<B>>,
|
||||
sender: TracingUnboundedSender<ToWorkerMsg<B>>,
|
||||
/// Results coming from the worker task.
|
||||
result_port: BufferedLinkReceiver<B>,
|
||||
/// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in
|
||||
@@ -195,8 +196,8 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
|
||||
block_import: BoxBlockImport<B, Transaction>,
|
||||
justification_import: Option<BoxJustificationImport<B>>,
|
||||
finality_proof_import: Option<BoxFinalityProofImport<B>>,
|
||||
) -> (impl Future<Output = ()> + Send, mpsc::UnboundedSender<ToWorkerMsg<B>>) {
|
||||
let (sender, mut port) = mpsc::unbounded();
|
||||
) -> (impl Future<Output = ()> + Send, TracingUnboundedSender<ToWorkerMsg<B>>) {
|
||||
let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker");
|
||||
|
||||
let mut worker = BlockImportWorker {
|
||||
result_sender,
|
||||
|
||||
@@ -37,8 +37,9 @@
|
||||
//! ```
|
||||
//!
|
||||
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use futures::prelude::*;
|
||||
use sp_runtime::traits::{Block as BlockT, NumberFor};
|
||||
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
|
||||
use std::{pin::Pin, task::Context, task::Poll};
|
||||
use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError};
|
||||
|
||||
@@ -46,7 +47,7 @@ use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError};
|
||||
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
|
||||
/// them to another link.
|
||||
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
|
||||
let tx = BufferedLinkSender { tx };
|
||||
let rx = BufferedLinkReceiver { rx };
|
||||
(tx, rx)
|
||||
@@ -54,7 +55,7 @@ pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceive
|
||||
|
||||
/// See [`buffered_link`].
|
||||
pub struct BufferedLinkSender<B: BlockT> {
|
||||
tx: mpsc::UnboundedSender<BlockImportWorkerMsg<B>>,
|
||||
tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> BufferedLinkSender<B> {
|
||||
@@ -125,7 +126,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
|
||||
|
||||
/// See [`buffered_link`].
|
||||
pub struct BufferedLinkReceiver<B: BlockT> {
|
||||
rx: mpsc::UnboundedReceiver<BlockImportWorkerMsg<B>>,
|
||||
rx: TracingUnboundedReceiver<BlockImportWorkerMsg<B>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> BufferedLinkReceiver<B> {
|
||||
|
||||
@@ -18,6 +18,7 @@ log = { version = "0.4.8", optional = true }
|
||||
serde = { version = "1.0.101", features = ["derive"], optional = true}
|
||||
sp-api = { version = "2.0.0-alpha.5", default-features = false, path = "../api" }
|
||||
sp-runtime = { version = "2.0.0-alpha.5", default-features = false, path = "../runtime" }
|
||||
sp-utils = { version = "2.0.0-alpha.5", default-features = false, path = "../utils" }
|
||||
|
||||
[features]
|
||||
default = [ "std" ]
|
||||
|
||||
@@ -22,11 +22,9 @@ use std::{
|
||||
sync::Arc,
|
||||
pin::Pin,
|
||||
};
|
||||
use futures::{
|
||||
Future, Stream,
|
||||
channel::mpsc,
|
||||
};
|
||||
use futures::{Future, Stream,};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sp_utils::mpsc;
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block as BlockT, Member, NumberFor},
|
||||
@@ -132,7 +130,7 @@ pub enum TransactionStatus<Hash, BlockHash> {
|
||||
pub type TransactionStatusStream<Hash, BlockHash> = dyn Stream<Item=TransactionStatus<Hash, BlockHash>> + Send + Unpin;
|
||||
|
||||
/// The import notification event stream.
|
||||
pub type ImportNotificationStream<H> = mpsc::UnboundedReceiver<H>;
|
||||
pub type ImportNotificationStream<H> = mpsc::TracingUnboundedReceiver<H>;
|
||||
|
||||
/// Transaction hash type for a pool.
|
||||
pub type TxHash<P> = <P as TransactionPool>::Hash;
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "sp-utils"
|
||||
version = "2.0.0-alpha.5"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
edition = "2018"
|
||||
license = "GPL-3.0"
|
||||
homepage = "https://substrate.dev"
|
||||
repository = "https://github.com/paritytech/substrate/"
|
||||
description = "I/O for Substrate runtimes"
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.4"
|
||||
futures-core = "0.3.4"
|
||||
lazy_static = "1.4.0"
|
||||
prometheus = "0.8.0"
|
||||
|
||||
[features]
|
||||
default = ["metered"]
|
||||
metered = []
|
||||
@@ -0,0 +1,20 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Utilities Primitives for Substrate
|
||||
|
||||
pub mod metrics;
|
||||
pub mod mpsc;
|
||||
@@ -0,0 +1,58 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Metering primitives and globals
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use prometheus::{
|
||||
Registry, Error as PrometheusError,
|
||||
core::{ AtomicU64, GenericGauge, GenericCounter },
|
||||
};
|
||||
|
||||
#[cfg(features = "metered")]
|
||||
use prometheus::{core::GenericGaugeVec, Opts};
|
||||
|
||||
|
||||
lazy_static! {
|
||||
pub static ref TOKIO_THREADS_TOTAL: GenericCounter<AtomicU64> = GenericCounter::new(
|
||||
"tokio_threads_total", "Total number of threads created"
|
||||
).expect("Creating of statics doesn't fail. qed");
|
||||
|
||||
pub static ref TOKIO_THREADS_ALIVE: GenericGauge<AtomicU64> = GenericGauge::new(
|
||||
"tokio_threads_alive", "Number of threads alive right now"
|
||||
).expect("Creating of statics doesn't fail. qed");
|
||||
}
|
||||
|
||||
#[cfg(features = "metered")]
|
||||
lazy_static! {
|
||||
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericGaugeVec<AtomicU64> = GenericGaugeVec::new(
|
||||
Opts::new("unbounded_channel_len", "Items in each mpsc::unbounded instance"),
|
||||
&["entity", "action"] // 'name of channel, send|received|dropped
|
||||
).expect("Creating of statics doesn't fail. qed");
|
||||
|
||||
}
|
||||
|
||||
|
||||
/// Register the statics to report to registry
|
||||
pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
|
||||
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
|
||||
registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;
|
||||
|
||||
#[cfg(features = "metered")]
|
||||
registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,232 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Features to meter unbounded channels
|
||||
|
||||
#[cfg(not(features = "metered"))]
|
||||
mod inner {
|
||||
// just aliased, non performance implications
|
||||
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
|
||||
pub type TracingUnboundedSender<T> = UnboundedSender<T>;
|
||||
pub type TracingUnboundedReceiver<T> = UnboundedReceiver<T>;
|
||||
|
||||
/// Alias `mpsc::unbounded`
|
||||
pub fn tracing_unbounded<T>(_key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
|
||||
mpsc::unbounded()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(features = "metered")]
|
||||
mod inner {
|
||||
//tracing implementation
|
||||
use futures::channel::mpsc::{self,
|
||||
UnboundedReceiver, UnboundedSender,
|
||||
TryRecvError, TrySendError, SendError
|
||||
};
|
||||
use futures::{sink::Sink, task::{Poll, Context}, stream::Stream};
|
||||
use std::pin::Pin;
|
||||
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
|
||||
|
||||
/// Wrapper Type around `UnboundedSender` that increases the global
|
||||
/// measure when a message is added
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TracingUnboundedSender<T>(&'static str, UnboundedSender<T>);
|
||||
|
||||
/// Wrapper Type around `UnboundedReceiver` that decreases the global
|
||||
/// measure when a message is polled
|
||||
#[derive(Debug)]
|
||||
pub struct TracingUnboundedReceiver<T>(&'static str, UnboundedReceiver<T>);
|
||||
|
||||
/// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via
|
||||
/// `UNBOUNDED_CHANNELS_COUNTER`
|
||||
pub fn tracing_unbounded<T>(key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
|
||||
let (s, r) = mpsc::unbounded();
|
||||
(TracingUnboundedSender(key.clone(), s), TracingUnboundedReceiver(key,r))
|
||||
}
|
||||
|
||||
impl<T> TracingUnboundedSender<T> {
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn poll_ready(&self, ctx: &mut Context) -> Poll<Result<(), SendError>> {
|
||||
self.1.poll_ready(ctx)
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn is_closed(&self) -> bool {
|
||||
self.1.is_closed()
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn close_channel(&self) {
|
||||
self.1.close_channel()
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn disconnect(&mut self) {
|
||||
self.1.disconnect()
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
|
||||
self.1.start_send(msg)
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
|
||||
self.1.unbounded_send(msg).map(|s|{
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).incr();
|
||||
s
|
||||
})
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedSender
|
||||
pub fn same_receiver(&self, other: &UnboundedSender<T>) -> bool {
|
||||
self.1.same_receiver(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TracingUnboundedReceiver<T> {
|
||||
|
||||
fn consume(&mut self) {
|
||||
// consume all items, make sure to reflect the updated count
|
||||
let mut count = 0;
|
||||
while let Ok(Some(..)) = self.try_next() {
|
||||
count += 1;
|
||||
}
|
||||
|
||||
// and discount the messages
|
||||
if count > 0 {
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).incr_by(count);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedReceiver
|
||||
/// that consumes all messages first and updates the counter
|
||||
pub fn close(&mut self) {
|
||||
self.consume();
|
||||
self.1.close()
|
||||
}
|
||||
|
||||
/// Proxy function to mpsc::UnboundedReceiver
|
||||
/// that discounts the messages taken out
|
||||
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
|
||||
self.1.try_next().map(|s| {
|
||||
if s.is_some() {
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).incr();
|
||||
}
|
||||
s
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for TracingUnboundedReceiver<T> {
|
||||
fn drop(&mut self) {
|
||||
self.consume();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for TracingUnboundedReceiver<T> {}
|
||||
|
||||
impl<T> Stream for TracingUnboundedReceiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<T>> {
|
||||
let s = self.get_mut();
|
||||
match Pin::new(&mut s.1).poll_next(cx) {
|
||||
Poll::Ready(msg) => {
|
||||
if msg.is_some() {
|
||||
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).incr();
|
||||
}
|
||||
Poll::Ready(msg)
|
||||
}
|
||||
Poll::Pending => {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T> Sink<T> for TracingUnboundedSender<T> {
|
||||
type Error = SendError;
|
||||
|
||||
fn poll_ready(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
TracingUnboundedSender::poll_ready(&*self, cx)
|
||||
}
|
||||
|
||||
fn start_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
msg: T,
|
||||
) -> Result<(), Self::Error> {
|
||||
TracingUnboundedSender::start_send(&mut *self, msg)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
self.disconnect();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sink<T> for &TracingUnboundedSender<T> {
|
||||
type Error = SendError;
|
||||
|
||||
fn poll_ready(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
TracingUnboundedSender::poll_ready(*self, cx)
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
|
||||
self.unbounded_send(msg)
|
||||
.map_err(TrySendError::into_send_error)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
self.close_channel();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub use inner::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
@@ -10,7 +10,7 @@ repository = "https://github.com/paritytech/substrate/"
|
||||
|
||||
[dependencies]
|
||||
log = "0.4.8"
|
||||
prometheus = "0.7"
|
||||
prometheus = "0.8"
|
||||
futures-util = { version = "0.3.1", default-features = false, features = ["io"] }
|
||||
derive_more = "0.99"
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use futures_util::{FutureExt, future::Future};
|
||||
pub use prometheus::{
|
||||
self,
|
||||
Registry, Error as PrometheusError, Opts,
|
||||
Histogram, HistogramOpts, HistogramVec,
|
||||
core::{
|
||||
|
||||
Reference in New Issue
Block a user