Move the network status reporting to the service (#2916)

* Move the network status reporting to the service

* Fix tests

* Fix build
This commit is contained in:
Pierre Krieger
2019-06-21 23:13:11 +02:00
committed by Gavin Wood
parent 01fcdc2b1a
commit 437a6bc6b1
10 changed files with 167 additions and 112 deletions
+2 -2
View File
@@ -184,8 +184,8 @@ pub use service::{
NetworkMsg, ExHashT, ReportHandle,
};
pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret};
pub use protocol::{ProtocolStatus, PeerInfo, Context, consensus_gossip, message, specialization};
pub use protocol::sync::{Status as SyncStatus, SyncState};
pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, multiaddr, build_multiaddr};
pub use libp2p::{identity, PeerId, core::PublicKey};
+24 -28
View File
@@ -33,7 +33,7 @@ use message::generic::{Message as GenericMessage, ConsensusMessage};
use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use on_demand::{OnDemandCore, OnDemandNetwork, RequestData};
use specialization::NetworkSpecialization;
use sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState};
use sync::{ChainSync, Context as SyncContext, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::Roles;
use rustc_hex::ToHex;
@@ -115,17 +115,6 @@ struct HandshakingPeer {
timestamp: time::Instant,
}
/// Syncing status and statistics
#[derive(Clone)]
pub struct ProtocolStatus<B: BlockT> {
/// Sync status.
pub sync: SyncStatus<B>,
/// Total number of connected peers
pub num_peers: usize,
/// Total number of active peers.
pub num_active_peers: usize,
}
/// Peer information
#[derive(Debug, Clone)]
struct Peer<B: BlockT, H: ExHashT> {
@@ -415,26 +404,33 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
})
}
/// Returns an object representing the status of the protocol.
pub fn status(&self) -> ProtocolStatus<B> {
ProtocolStatus {
sync: self.sync.status(),
num_peers: self.context_data.peers.values().count(),
num_active_peers: self
.context_data
.peers
.values()
.filter(|p| p.block_request.is_some())
.count(),
}
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.context_data.peers.values().count()
}
pub fn is_major_syncing(&self) -> bool {
self.sync.status().is_major_syncing()
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.context_data
.peers
.values()
.filter(|p| p.block_request.is_some())
.count()
}
pub fn is_offline(&self) -> bool {
self.sync.status().is_offline()
/// Current global sync state.
pub fn sync_state(&self) -> SyncState {
self.sync.status().state
}
/// Target sync block number.
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.sync.status().best_seen_block
}
/// Number of peers participating in syncing.
pub fn num_sync_peers(&self) -> u32 {
self.sync.status().num_peers
}
/// Starts a new data demand request.
@@ -194,22 +194,6 @@ pub struct Status<B: BlockT> {
pub num_peers: u32,
}
impl<B: BlockT> Status<B> {
/// Whether the synchronization status is doing major downloading work or
/// is near the head of the chain.
pub fn is_major_syncing(&self) -> bool {
match self.state {
SyncState::Idle => false,
SyncState::Downloading => true,
}
}
/// Are we all alone?
pub fn is_offline(&self) -> bool {
self.num_peers == 0
}
}
impl<B: BlockT> ChainSync<B> {
/// Create a new instance. Pass the initial known state of the chain.
pub(crate) fn new(role: Roles, info: &ClientInfo<B>) -> Self {
+42 -36
View File
@@ -39,13 +39,12 @@ use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as Gos
use crate::protocol::message::Message;
use crate::protocol::on_demand::RequestData;
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer};
use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut};
use crate::protocol::{PeerInfo, NetworkOut};
use crate::protocol::sync::SyncState;
use crate::config::Params;
use crate::error::Error;
use crate::protocol::specialization::NetworkSpecialization;
/// Interval at which we send status updates on the status stream.
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
/// Interval at which we update the `peers` field on the main thread.
const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500);
@@ -90,8 +89,6 @@ impl ReportHandle {
/// Substrate network service. Handles network IO and manages connectivity.
pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Sinks to propagate status updates.
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
/// Are we connected to any peer?
is_offline: Arc<AtomicBool>,
/// Are we actively catching up with the chain?
@@ -122,7 +119,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
) -> Result<NetworkWorker<B, S, H>, Error> {
let (network_chan, network_port) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let status_sinks = Arc::new(Mutex::new(Vec::new()));
// Start in off-line mode, since we're not connected to any nodes yet.
let is_offline = Arc::new(AtomicBool::new(true));
@@ -148,7 +144,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
};
let service = Arc::new(NetworkService {
status_sinks: status_sinks.clone(),
bandwidth,
is_offline: is_offline.clone(),
is_major_syncing: is_major_syncing.clone(),
@@ -172,13 +167,46 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
finality_proof_provider: params.finality_proof_provider,
network_port,
protocol_rx,
status_sinks,
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
status_interval: tokio_timer::Interval::new_interval(STATUS_INTERVAL),
connected_peers_interval: tokio_timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL),
})
}
/// Returns the downloaded bytes per second averaged over the past few seconds.
pub fn average_download_per_sec(&self) -> u64 {
self.service.bandwidth.average_download_per_sec()
}
/// Returns the uploaded bytes per second averaged over the past few seconds.
pub fn average_upload_per_sec(&self) -> u64 {
self.service.bandwidth.average_upload_per_sec()
}
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.protocol.num_connected_peers()
}
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.protocol.num_active_peers()
}
/// Current global sync state.
pub fn sync_state(&self) -> SyncState {
self.protocol.sync_state()
}
/// Target sync block number.
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.protocol.best_seen_block()
}
/// Number of peers participating in syncing.
pub fn num_sync_peers(&self) -> u32 {
self.protocol.num_sync_peers()
}
/// Return a `NetworkService` that can be shared through the code base and can be used to
/// manipulate the worker.
pub fn service(&self) -> &Arc<NetworkService<B, S>> {
@@ -187,16 +215,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
/// Returns the downloaded bytes per second averaged over the past few seconds.
pub fn average_download_per_sec(&self) -> u64 {
self.bandwidth.average_download_per_sec()
}
/// Returns the uploaded bytes per second averaged over the past few seconds.
pub fn average_upload_per_sec(&self) -> u64 {
self.bandwidth.average_upload_per_sec()
}
/// Returns the network identity of the node.
pub fn local_peer_id(&self) -> PeerId {
Swarm::<B>::local_peer_id(&*self.network.lock()).clone()
@@ -287,13 +305,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
self.is_major_syncing.load(Ordering::Relaxed)
}
/// Get sync status
pub fn status(&self) -> mpsc::UnboundedReceiver<ProtocolStatus<B>> {
let (sink, stream) = mpsc::unbounded();
self.status_sinks.lock().push(sink);
stream
}
/// Get network state.
pub fn network_state(&self) -> NetworkState {
let mut swarm = self.network.lock();
@@ -497,12 +508,9 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>,
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
peerset: PeersetHandle,
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Interval at which we send status updates on the `status_sinks`.
status_interval: tokio_timer::Interval,
/// Interval at which we update the `connected_peers` Arc.
connected_peers_interval: tokio_timer::Interval,
}
@@ -580,11 +588,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
}
}
while let Ok(Async::Ready(_)) = self.status_interval.poll() {
let status = self.protocol.status();
self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
}
{
let mut network_service = self.network_service.lock();
let mut link = NetworkLink {
@@ -742,8 +745,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
}
}
self.is_offline.store(self.protocol.is_offline(), Ordering::Relaxed);
self.is_major_syncing.store(self.protocol.is_major_syncing(), Ordering::Relaxed);
self.is_offline.store(self.protocol.num_connected_peers() == 0, Ordering::Relaxed);
self.is_major_syncing.store(match self.protocol.sync_state() {
SyncState::Idle => false,
SyncState::Downloading => true,
}, Ordering::Relaxed);
Ok(Async::NotReady)
}
+17 -11
View File
@@ -46,7 +46,8 @@ use crate::message::Message;
use libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use primitives::{H256, Blake2Hasher};
use crate::protocol::{Context, Protocol, ProtocolConfig, ProtocolStatus, CustomMessageOutcome, NetworkOut};
use crate::SyncState;
use crate::protocol::{Context, Protocol, ProtocolConfig, CustomMessageOutcome, NetworkOut};
use runtime_primitives::generic::{BlockId, OpaqueDigestItemId};
use runtime_primitives::traits::{Block as BlockT, Header, NumberFor};
use runtime_primitives::{Justification, ConsensusEngineId};
@@ -365,7 +366,8 @@ pub struct Peer<D, S: NetworkSpecialization<Block>> {
/// instantiation paths or field names is too much hassle, hence
/// we allow it to be unused.
#[cfg_attr(not(test), allow(unused))]
protocol_status: Arc<RwLock<ProtocolStatus<Block>>>,
/// `(is_offline, is_major_syncing, num_peers)`
protocol_status: Arc<RwLock<(bool, bool, usize)>>,
import_queue: Arc<Mutex<Box<BasicQueue<Block>>>>,
pub data: D,
best_hash: Mutex<Option<H256>>,
@@ -509,7 +511,7 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> {
impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
fn new(
protocol_status: Arc<RwLock<ProtocolStatus<Block>>>,
protocol_status: Arc<RwLock<(bool, bool, usize)>>,
client: PeersClient,
import_queue: Arc<Mutex<Box<BasicQueue<Block>>>>,
use_tokio: bool,
@@ -560,19 +562,19 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// SyncOracle: are we connected to any peer?
#[cfg(test)]
fn is_offline(&self) -> bool {
self.protocol_status.read().sync.is_offline()
self.protocol_status.read().0
}
/// SyncOracle: are we in the process of catching-up with the chain?
#[cfg(test)]
fn is_major_syncing(&self) -> bool {
self.protocol_status.read().sync.is_major_syncing()
self.protocol_status.read().1
}
/// Get protocol status.
#[cfg(test)]
fn protocol_status(&self) -> ProtocolStatus<Block> {
self.protocol_status.read().clone()
fn num_peers(&self) -> usize {
self.protocol_status.read().2
}
/// Called on connection to other indicated peer.
@@ -873,7 +875,7 @@ pub trait TestNetFactory: Sized {
/// Add created peer.
fn add_peer(
&mut self,
protocol_status: Arc<RwLock<ProtocolStatus<Block>>>,
protocol_status: Arc<RwLock<(bool, bool, usize)>>,
import_queue: Arc<Mutex<Box<BasicQueue<Block>>>>,
tx_pool: EmptyTransactionPool,
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<Block>>>,
@@ -1009,7 +1011,11 @@ pub trait TestNetFactory: Sized {
return Ok(Async::Ready(()))
}
*protocol_status.write() = protocol.status();
*protocol_status.write() = (
protocol.num_connected_peers() == 0,
protocol.sync_state() == SyncState::Downloading,
protocol.num_connected_peers()
);
Ok(Async::NotReady)
}));
});
@@ -1054,7 +1060,7 @@ pub trait TestNetFactory: Sized {
specialization,
).unwrap();
let protocol_status = Arc::new(RwLock::new(protocol.status()));
let protocol_status = Arc::new(RwLock::new((true, false, 0)));
self.add_peer(
protocol_status.clone(),
import_queue.clone(),
@@ -1110,7 +1116,7 @@ pub trait TestNetFactory: Sized {
specialization,
).unwrap();
let protocol_status = Arc::new(RwLock::new(protocol.status()));
let protocol_status = Arc::new(RwLock::new((true, false, 0)));
self.add_peer(
protocol_status.clone(),
import_queue.clone(),
+4 -5
View File
@@ -45,7 +45,7 @@ fn sync_peers_works() {
net.sync();
for peer in 0..3 {
// Assert peers is up to date.
assert_eq!(net.peer(peer).protocol_status.read().num_peers, 2);
assert_eq!(net.peer(peer).num_peers(), 2);
// And then disconnect.
for other in 0..3 {
if other != peer {
@@ -56,8 +56,7 @@ fn sync_peers_works() {
net.sync();
// Now peers are disconnected.
for peer in 0..3 {
let status = net.peer(peer).protocol_status.read();
assert_eq!(status.num_peers, 0);
assert_eq!(net.peer(peer).num_peers(), 0);
}
}
@@ -433,7 +432,7 @@ fn can_not_sync_from_light_peer() {
assert_eq!(net.peer(2).client.info().chain.best_number, 0);
// and that the #1 is still connected to #2
// (because #2 has not tried to fetch block data from the #1 light node)
assert_eq!(net.peer(1).protocol_status().num_peers, 2);
assert_eq!(net.peer(1).num_peers(), 2);
// and now try to fetch block data from light peer #1
// (this should result in disconnect)
@@ -450,7 +449,7 @@ fn can_not_sync_from_light_peer() {
);
net.sync();
// check that light #1 has disconnected from #2
assert_eq!(net.peer(1).protocol_status().num_peers, 1);
assert_eq!(net.peer(1).num_peers(), 1);
}
#[test]