From a81f7f48a0b8560548033124f4e478e105537cfc Mon Sep 17 00:00:00 2001 From: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Date: Sat, 2 Mar 2019 21:35:16 +0800 Subject: [PATCH] Refactor Sync status updates into a stream of updates (#1858) * refactor sync provider * relative use of interval * typo * set propagate timeout to 2500ms * address comments * fix instant calc * update intervals --- substrate/Cargo.lock | 1 + substrate/core/cli/src/informant.rs | 23 +++++++-------- substrate/core/network/src/protocol.rs | 32 +++++++++++++++------ substrate/core/network/src/service.rs | 35 +++++++++++++++------- substrate/core/network/src/test/mod.rs | 3 ++ substrate/core/rpc/Cargo.toml | 1 + substrate/core/rpc/src/system/mod.rs | 5 ++-- substrate/core/rpc/src/system/tests.rs | 40 ++++++++++++++------------ substrate/core/service/test/src/lib.rs | 4 +-- 9 files changed, 90 insertions(+), 54 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 3b82569af5..fa169c393b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3964,6 +3964,7 @@ version = "0.1.0" dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 10.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-derive 10.0.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index 3955152691..647925bc52 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -17,44 +17,41 @@ //! Console informant. Prints sync progress and block events. Runs on the calling thread. use ansi_term::Colour; -use std::{fmt, time::{Duration, Instant}}; +use std::fmt; +use std::time; use futures::{Future, Stream}; use service::{Service, Components}; use tokio::runtime::TaskExecutor; -use tokio::timer::Interval; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use network::{SyncState, SyncProvider}; use client::{backend::Backend, BlockchainEvents}; use substrate_telemetry::*; -use log::{debug, info, warn}; +use log::{info, warn}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Header, As}; -const TIMER_INTERVAL_MS: u64 = 5000; - /// Spawn informant on the event loop pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExecutor) where C: Components, { - let interval = Interval::new(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS)); - let network = service.network(); let client = service.client(); let txpool = service.transaction_pool(); let mut last_number = None; + let mut last_update = time::Instant::now(); let mut sys = System::new(); let self_pid = get_current_pid(); - let display_notifications = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { - let sync_status = network.status(); + let display_notifications = network.status().for_each(move |sync_status| { if let Ok(info) = client.info() { let best_number: u64 = info.chain.best_number.as_(); let best_hash = info.chain.best_hash; let num_peers = sync_status.num_peers; - let speed = move || speed(best_number, last_number); + let speed = move || speed(best_number, last_number, last_update); + last_update = time::Instant::now(); let (status, target) = match (sync_status.sync.state, sync_status.sync.best_seen_block) { (SyncState::Idle, _) => ("Idle".into(), "".into()), (SyncState::Downloading, None) => (format!("Syncing{}", speed()), "".into()), @@ -153,9 +150,11 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe handle.spawn(exit.until(informant_work).map(|_| ())); } -fn speed(best_number: u64, last_number: Option) -> String { +fn speed(best_number: u64, last_number: Option, last_update: time::Instant) -> String { + let since_last_millis = last_update.elapsed().as_secs() * 1000; + let since_last_subsec_millis = last_update.elapsed().subsec_millis() as u64; let speed = match last_number { - Some(num) => (best_number.saturating_sub(num) * 10_000 / TIMER_INTERVAL_MS) as f64, + Some(num) => (best_number.saturating_sub(num) * 10_000 / (since_last_millis + since_last_subsec_millis)) as f64, None => 0.0 }; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 47bfde1bfb..237e06751e 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -15,6 +15,8 @@ // along with Substrate. If not, see . use crossbeam_channel::{self as channel, Receiver, Sender, select}; +use futures::sync::mpsc; +use parking_lot::Mutex; use network_libp2p::{NodeIndex, PeerId, Severity}; use primitives::storage::StorageKey; use runtime_primitives::generic::BlockId; @@ -40,8 +42,12 @@ use client::light::fetcher::ChangesProof; use crate::{error, util::LruHashSet}; const REQUEST_TIMEOUT_SEC: u64 = 40; -const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1000); -const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(5000); +/// Interval at which we perform time based maintenance +const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); +/// Interval at which we propagate exstrinsics; +const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); +/// Interval at which we send status updates on the SyncProvider status stream. +const STATUS_INTERVAL: time::Duration = time::Duration::from_millis(5000); /// Current protocol version. pub(crate) const CURRENT_VERSION: u32 = 2; @@ -57,6 +63,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { + status_sinks: Arc>>>>, network_chan: NetworkChan, port: Receiver>, from_network_port: Receiver>, @@ -210,8 +217,8 @@ pub enum ProtocolMsg> { BlocksProcessed(Vec, bool), /// Tell protocol to restart sync. RestartSync, - /// Ask the protocol for its status. - Status(Sender>), + /// Propagate status updates. + Status, /// Tell protocol to propagate extrinsics. PropagateExtrinsics, /// Tell protocol that a block was imported (sent by the import-queue). @@ -262,6 +269,7 @@ enum Incoming> { impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new( + status_sinks: Arc>>>>, is_offline: Arc, is_major_syncing: Arc, connected_peers: Arc>>>, @@ -281,6 +289,7 @@ impl, H: ExHashT> Protocol { .name("Protocol".into()) .spawn(move || { let mut protocol = Protocol { + status_sinks, network_chan, from_network_port, port, @@ -300,7 +309,8 @@ impl, H: ExHashT> Protocol { }; let tick_timeout = channel::tick(TICK_TIMEOUT); let propagate_timeout = channel::tick(PROPAGATE_TIMEOUT); - while protocol.run(&tick_timeout, &propagate_timeout) { + let status_interval = channel::tick(STATUS_INTERVAL); + while protocol.run(&tick_timeout, &propagate_timeout, &status_interval) { // Running until all senders have been dropped... } }) @@ -312,6 +322,7 @@ impl, H: ExHashT> Protocol { &mut self, tick_timeout: &Receiver, propagate_timeout: &Receiver, + status_interval: &Receiver, ) -> bool { let msg = select! { recv(self.port) -> event => { @@ -338,6 +349,9 @@ impl, H: ExHashT> Protocol { recv(propagate_timeout) -> _ => { Incoming::FromClient(ProtocolMsg::PropagateExtrinsics) }, + recv(status_interval) -> _ => { + Incoming::FromClient(ProtocolMsg::Status) + }, }; self.handle_msg(msg) } @@ -351,7 +365,7 @@ impl, H: ExHashT> Protocol { fn handle_client_msg(&mut self, msg: ProtocolMsg) -> bool { match msg { - ProtocolMsg::Status(sender) => self.status(sender), + ProtocolMsg::Status => self.on_status(), ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header), ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { @@ -428,8 +442,8 @@ impl, H: ExHashT> Protocol { None } - /// Returns protocol status - fn status(&mut self, sender: Sender>) { + /// Propagates protocol statuses. + fn on_status(&mut self) { let status = ProtocolStatus { sync: self.sync.status(), num_peers: self.context_data.peers.values().count(), @@ -440,7 +454,7 @@ impl, H: ExHashT> Protocol { .filter(|p| p.block_request.is_some()) .count(), }; - let _ = sender.send(status); + self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); } fn on_custom_message(&mut self, who: NodeIndex, message: Message) { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 23eb1229ab..993af60129 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::{io, thread}; use log::{warn, debug, error, trace, info}; -use futures::{Async, Future, Stream, stream, sync::oneshot}; +use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; use network_libp2p::{ProtocolId, NetworkConfiguration, NodeIndex, ErrorKind, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; @@ -42,14 +42,17 @@ pub use network_libp2p::PeerId; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; + /// Sync status pub trait SyncProvider: Send + Sync { - /// Get sync status - fn status(&self) -> ProtocolStatus; + /// Get a stream of sync statuses. + fn status(&self) -> mpsc::UnboundedReceiver>; /// Get network state. fn network_state(&self) -> NetworkState; /// Get currently connected peers fn peers(&self) -> Vec<(NodeIndex, PeerInfo)>; + /// Are we in the process of downloading the chain? + fn is_major_syncing(&self) -> bool; } /// Minimum Requirements for a Hash within Networking @@ -121,6 +124,8 @@ impl> Link for NetworkLink { /// Substrate network service. Handles network IO and manages connectivity. pub struct Service> { + /// Sinks to propagate status updates. + status_sinks: Arc>>>>, /// Are we connected to any peer? is_offline: Arc, /// Are we actively catching up with the chain? @@ -145,11 +150,13 @@ impl> Service { import_queue: Box>, ) -> Result<(Arc>, NetworkChan), Error> { let (network_chan, network_port) = network_channel(protocol_id); + let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( + status_sinks.clone(), is_offline.clone(), is_major_syncing.clone(), peers.clone(), @@ -171,6 +178,7 @@ impl> Service { )?; let service = Arc::new(Service { + status_sinks, is_offline, is_major_syncing, peers, @@ -260,11 +268,17 @@ impl> Service { .protocol_sender .send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); } + + /// Are we in the process of downloading the chain? + /// Used by both SyncProvider and SyncOracle. + fn is_major_syncing(&self) -> bool { + self.is_major_syncing.load(Ordering::Relaxed) + } } impl> ::consensus::SyncOracle for Service { fn is_major_syncing(&self) -> bool { - self.is_major_syncing.load(Ordering::Relaxed) + self.is_major_syncing() } fn is_offline(&self) -> bool { self.is_offline.load(Ordering::Relaxed) @@ -283,13 +297,14 @@ impl> Drop for Service { } impl> SyncProvider for Service { + fn is_major_syncing(&self) -> bool { + self.is_major_syncing() + } /// Get sync status - fn status(&self) -> ProtocolStatus { - let (sender, port) = channel::unbounded(); - let _ = self.protocol_sender.send(ProtocolMsg::Status(sender)); - port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, - or the ProtocolMsg::Stop message is received, - 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") + fn status(&self) -> mpsc::UnboundedReceiver> { + let (sink, stream) = mpsc::unbounded(); + self.status_sinks.lock().push(sink); + stream } fn network_state(&self) -> NetworkState { diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 3adf917a5c..c7d9484e2b 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -558,11 +558,14 @@ pub trait TestNetFactory: Sized { let (network_sender, network_port) = network_channel(ProtocolId::default()); let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import)); + let status_sinks = Arc::new(Mutex::new(Vec::new())); let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let specialization = self::SpecializationFactory::create(); let peers: Arc>>> = Arc::new(Default::default()); + let (protocol_sender, network_to_protocol_sender) = Protocol::new( + status_sinks, is_offline.clone(), is_major_syncing.clone(), peers.clone(), diff --git a/substrate/core/rpc/Cargo.toml b/substrate/core/rpc/Cargo.toml index d85a9c018c..47edca65dc 100644 --- a/substrate/core/rpc/Cargo.toml +++ b/substrate/core/rpc/Cargo.toml @@ -27,6 +27,7 @@ tokio = "0.1.7" [dev-dependencies] assert_matches = "1.1" +futures = "0.1.17" test_client = { package = "substrate-test-client", path = "../test-client" } consensus = { package = "substrate-consensus-common", path = "../consensus/common" } rustc-hex = "2.0" diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index 57abae0019..6819d23593 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -109,10 +109,9 @@ impl SystemApi::Number> for Sy } fn system_health(&self) -> Result { - let status = self.sync.status(); Ok(Health { - peers: status.num_peers, - is_syncing: status.sync.is_major_syncing(), + peers: self.sync.peers().len(), + is_syncing: self.sync.is_major_syncing(), should_have_peers: self.should_have_peers, }) } diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index d1ca9339dc..319b5bb85a 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -16,10 +16,11 @@ use super::*; -use network::{self, SyncState, SyncStatus, ProtocolStatus, NodeIndex, PeerId, PeerInfo as NetworkPeerInfo}; +use network::{self, ProtocolStatus, NodeIndex, PeerId, PeerInfo as NetworkPeerInfo}; use network::config::Roles; use test_client::runtime::Block; use assert_matches::assert_matches; +use futures::sync::mpsc; struct Status { pub peers: usize, @@ -40,16 +41,9 @@ impl Default for Status { } impl network::SyncProvider for Status { - fn status(&self) -> ProtocolStatus { - ProtocolStatus { - sync: SyncStatus { - state: if self.is_syncing { SyncState::Downloading } else { SyncState::Idle }, - best_seen_block: None, - num_peers: self.peers as u32, - }, - num_peers: self.peers, - num_active_peers: 0, - } + fn status(&self) -> mpsc::UnboundedReceiver> { + let (_sink, stream) = mpsc::unbounded(); + stream } fn network_state(&self) -> network::NetworkState { @@ -67,13 +61,23 @@ impl network::SyncProvider for Status { } fn peers(&self) -> Vec<(NodeIndex, NetworkPeerInfo)> { - vec![(1, NetworkPeerInfo { - peer_id: self.peer_id.clone(), - roles: Roles::FULL, - protocol_version: 1, - best_hash: Default::default(), - best_number: 1 - })] + let mut peers = vec![]; + for _peer in 0..self.peers { + peers.push( + (1, NetworkPeerInfo { + peer_id: self.peer_id.clone(), + roles: Roles::FULL, + protocol_version: 1, + best_hash: Default::default(), + best_number: 1 + }) + ); + } + peers + } + + fn is_major_syncing(&self) -> bool { + self.is_syncing } } diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 795fa05bc7..26ba01697c 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -181,7 +181,7 @@ pub fn connectivity(spec: FactoryChainSpec) { service.network().add_reserved_peer(first_address.clone()).expect("Error adding reserved peer"); } network.run_until_all_full(|_index, service| - service.network().status().num_peers == NUM_NODES as usize - 1 + service.network().peers().len() == NUM_NODES as usize - 1 ); network.runtime }; @@ -201,7 +201,7 @@ pub fn connectivity(spec: FactoryChainSpec) { address = service.network().node_id().expect("No node address"); } network.run_until_all_full(|_index, service| { - service.network().status().num_peers == NUM_NODES as usize - 1 + service.network().peers().len() == NUM_NODES as usize - 1 }); } temp.close().expect("Error removing temp dir");