diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index a6aca5edb0..7c8ec3ac1e 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -22,7 +22,7 @@ use std::time; use futures::{Future, Stream}; use service::{Service, Components}; use tokio::runtime::TaskExecutor; -use network::{SyncState, SyncProvider}; +use network::SyncState; use client::{backend::Backend, BlockchainEvents}; use log::{info, warn}; diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 2ef682f33f..105b021315 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -43,7 +43,7 @@ pub mod test; pub use chain::{Client as ClientHandle, FinalityProofProvider}; pub use service::{ NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork, - NetworkMsg, SyncProvider, ExHashT, ReportHandle, + NetworkMsg, ExHashT, ReportHandle, }; pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret}; pub use protocol::{ProtocolStatus, PeerInfo, Context, consensus_gossip, message, specialization}; diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 7009310a8d..59618aa5b4 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -46,7 +46,7 @@ use crate::protocol::specialization::NetworkSpecialization; mod tests; -/// Interval at which we send status updates on the SyncProvider status stream. +/// Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); /// Interval at which we update the `peers` field on the main thread. const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500); @@ -56,23 +56,6 @@ pub use libp2p::PeerId; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; -/// Sync status -pub trait SyncProvider: Send + Sync { - /// Get a stream of sync statuses. - fn status(&self) -> mpsc::UnboundedReceiver>; - /// Get network state. - fn network_state(&self) -> NetworkState; - - /// Get currently connected peers. - /// - /// > **Warning**: This method can return outdated information and should only ever be used - /// > when obtaining outdated information is acceptable. - fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo)>; - - /// Are we in the process of downloading the chain? - fn is_major_syncing(&self) -> bool; -} - /// Minimum Requirements for a Hash within Networking pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static @@ -382,35 +365,19 @@ impl> NetworkService { } /// Are we in the process of downloading the chain? - /// Used by both SyncProvider and SyncOracle. - fn is_major_syncing(&self) -> bool { + pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) } -} - -impl> ::consensus::SyncOracle for NetworkService { - fn is_major_syncing(&self) -> bool { - self.is_major_syncing() - } - - fn is_offline(&self) -> bool { - self.is_offline.load(Ordering::Relaxed) - } -} - -impl> SyncProvider for NetworkService { - fn is_major_syncing(&self) -> bool { - self.is_major_syncing() - } /// Get sync status - fn status(&self) -> mpsc::UnboundedReceiver> { + pub fn status(&self) -> mpsc::UnboundedReceiver> { let (sink, stream) = mpsc::unbounded(); self.status_sinks.lock().push(sink); stream } - fn network_state(&self) -> NetworkState { + /// Get network state. + pub fn network_state(&self) -> NetworkState { let mut swarm = self.network.lock(); let open = swarm.user_protocol().open_peers().cloned().collect::>(); @@ -465,12 +432,26 @@ impl> SyncProvider for Netwo } } - fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo)> { + /// Get currently connected peers. + /// + /// > **Warning**: This method can return outdated information and should only ever be used + /// > when obtaining outdated information is acceptable. + pub fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo)> { let peers = (*self.peers.read()).clone(); peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() } } +impl> ::consensus::SyncOracle for NetworkService { + fn is_major_syncing(&self) -> bool { + self.is_major_syncing() + } + + fn is_offline(&self) -> bool { + self.is_offline.load(Ordering::Relaxed) + } +} + /// Trait for managing network pub trait ManageNetwork { /// Set to allow unreserved peers to connect diff --git a/substrate/core/rpc/Cargo.toml b/substrate/core/rpc/Cargo.toml index 6ef9af2a0a..e72f62e500 100644 --- a/substrate/core/rpc/Cargo.toml +++ b/substrate/core/rpc/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] derive_more = "0.14.0" +futures = "0.1" jsonrpc-core = "12.0.0" jsonrpc-core-client = "12.0.0" jsonrpc-pubsub = "12.0.0" diff --git a/substrate/core/rpc/src/helpers.rs b/substrate/core/rpc/src/helpers.rs index e579c743ac..ccfde6afb5 100644 --- a/substrate/core/rpc/src/helpers.rs +++ b/substrate/core/rpc/src/helpers.rs @@ -14,6 +14,21 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use futures::{prelude::*, sync::oneshot}; + +/// Wraps around `oneshot::Receiver` and adjusts the error type to produce an internal error if the +/// sender gets dropped. +pub struct Receiver(pub oneshot::Receiver); + +impl Future for Receiver { + type Item = T; + type Error = jsonrpc_core::Error; + + fn poll(&mut self) -> Poll { + self.0.poll().map_err(|_| jsonrpc_core::Error::internal_error()) + } +} + /// Unwraps the trailing parameter or falls back with the closure result. pub fn unwrap_or_else(or_else: F, optional: Option) -> Result where F: FnOnce() -> Result, diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index d6f2b75515..d0578590ae 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -22,7 +22,8 @@ pub mod helpers; #[cfg(test)] mod tests; -use std::sync::Arc; +use crate::helpers::Receiver; +use futures::sync::{mpsc, oneshot}; use jsonrpc_derive::rpc; use network; use runtime_primitives::traits::{self, Header as HeaderT}; @@ -56,39 +57,49 @@ pub trait SystemApi { /// Node is considered healthy if it is: /// - connected to some peers (unless running in dev mode) /// - not performing a major sync - #[rpc(name = "system_health")] - fn system_health(&self) -> Result; + #[rpc(name = "system_health", returns = "Health")] + fn system_health(&self) -> Receiver; /// Returns currently connected peers - #[rpc(name = "system_peers")] - fn system_peers(&self) -> Result>>; + #[rpc(name = "system_peers", returns = "Vec>")] + fn system_peers(&self) -> Receiver>>; /// Returns current state of the network. /// /// **Warning**: This API is not stable. // TODO: make this stable and move structs https://github.com/paritytech/substrate/issues/1890 - #[rpc(name = "system_networkState")] - fn system_network_state(&self) -> Result; + #[rpc(name = "system_networkState", returns = "network::NetworkState")] + fn system_network_state(&self) -> Receiver; } /// System API implementation pub struct System { info: SystemInfo, - sync: Arc>, - should_have_peers: bool, + send_back: mpsc::UnboundedSender>, +} + +/// Request to be processed. +pub enum Request { + /// Must return the health of the network. + Health(oneshot::Sender), + /// Must return information about the peers we are connected to. + Peers(oneshot::Sender::Number>>>), + /// Must return the state of the network. + NetworkState(oneshot::Sender), } impl System { - /// Creates new `System` given the `SystemInfo`. + /// Creates new `System`. + /// + /// The `send_back` will be used to transmit some of the requests. The user is responsible for + /// reading from that channel and answering the requests. pub fn new( info: SystemInfo, - sync: Arc>, - should_have_peers: bool, + send_back: mpsc::UnboundedSender> ) -> Self { System { info, - should_have_peers, - sync, + send_back, } } } @@ -110,25 +121,21 @@ impl SystemApi::Number> for Sy Ok(self.info.properties.clone()) } - fn system_health(&self) -> Result { - Ok(Health { - peers: self.sync.peers_debug_info().len(), - is_syncing: self.sync.is_major_syncing(), - should_have_peers: self.should_have_peers, - }) + fn system_health(&self) -> Receiver { + let (tx, rx) = oneshot::channel(); + let _ = self.send_back.unbounded_send(Request::Health(tx)); + Receiver(rx) } - fn system_peers(&self) -> Result::Number>>> { - Ok(self.sync.peers_debug_info().into_iter().map(|(peer_id, p)| PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - }).collect()) + fn system_peers(&self) -> Receiver::Number>>> { + let (tx, rx) = oneshot::channel(); + let _ = self.send_back.unbounded_send(Request::Peers(tx)); + Receiver(rx) } - fn system_network_state(&self) -> Result { - Ok(self.sync.network_state()) + fn system_network_state(&self) -> Receiver { + let (tx, rx) = oneshot::channel(); + let _ = self.send_back.unbounded_send(Request::NetworkState(tx)); + Receiver(rx) } } diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index 14cd421fd1..2dc4139da3 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -16,11 +16,12 @@ use super::*; -use network::{self, ProtocolStatus, PeerId, PeerInfo as NetworkPeerInfo}; +use network::{self, PeerId}; use network::config::Roles; use test_client::runtime::Block; use assert_matches::assert_matches; -use futures::sync::mpsc; +use futures::{prelude::*, sync::mpsc}; +use std::thread; struct Status { pub peers: usize, @@ -40,55 +41,61 @@ impl Default for Status { } } -impl network::SyncProvider for Status { - fn status(&self) -> mpsc::UnboundedReceiver> { - let (_sink, stream) = mpsc::unbounded(); - stream - } - - fn network_state(&self) -> network::NetworkState { - network::NetworkState { - peer_id: String::new(), - listened_addresses: Default::default(), - external_addresses: Default::default(), - connected_peers: Default::default(), - not_connected_peers: Default::default(), - average_download_per_sec: 0, - average_upload_per_sec: 0, - peerset: serde_json::Value::Null, - } - } - - fn peers_debug_info(&self) -> Vec<(PeerId, NetworkPeerInfo)> { - let mut peers = vec![]; - for _peer in 0..self.peers { - peers.push( - (self.peer_id.clone(), NetworkPeerInfo { - roles: Roles::FULL, - protocol_version: 1, - best_hash: Default::default(), - best_number: 1 - }) - ); - } - peers - } - - fn is_major_syncing(&self) -> bool { - self.is_syncing - } -} - - fn api>>(sync: T) -> System { let status = sync.into().unwrap_or_default(); let should_have_peers = !status.is_dev; + let (tx, rx) = mpsc::unbounded(); + thread::spawn(move || { + tokio::run(rx.for_each(move |request| { + match request { + Request::Health(sender) => { + let _ = sender.send(Health { + peers: status.peers, + is_syncing: status.is_syncing, + should_have_peers, + }); + }, + Request::Peers(sender) => { + let mut peers = vec![]; + for _peer in 0..status.peers { + peers.push(PeerInfo { + peer_id: status.peer_id.to_base58(), + roles: format!("{:?}", Roles::FULL), + protocol_version: 1, + best_hash: Default::default(), + best_number: 1, + }); + } + let _ = sender.send(peers); + } + Request::NetworkState(sender) => { + let _ = sender.send(network::NetworkState { + peer_id: String::new(), + listened_addresses: Default::default(), + external_addresses: Default::default(), + connected_peers: Default::default(), + not_connected_peers: Default::default(), + average_download_per_sec: 0, + average_upload_per_sec: 0, + peerset: serde_json::Value::Null, + }); + } + }; + + Ok(()) + })) + }); System::new(SystemInfo { impl_name: "testclient".into(), impl_version: "0.2.0".into(), chain_name: "testchain".into(), properties: Default::default(), - }, Arc::new(status), should_have_peers) + }, tx) +} + +fn wait_receiver(rx: Receiver) -> T { + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(rx).unwrap() } #[test] @@ -126,7 +133,7 @@ fn system_properties_works() { #[test] fn system_health() { assert_matches!( - api(None).system_health().unwrap(), + wait_receiver(api(None).system_health()), Health { peers: 0, is_syncing: false, @@ -135,12 +142,12 @@ fn system_health() { ); assert_matches!( - api(Status { + wait_receiver(api(Status { peer_id: PeerId::random(), peers: 5, is_syncing: true, is_dev: true, - }).system_health().unwrap(), + }).system_health()), Health { peers: 5, is_syncing: true, @@ -149,12 +156,12 @@ fn system_health() { ); assert_eq!( - api(Status { + wait_receiver(api(Status { peer_id: PeerId::random(), peers: 5, is_syncing: false, is_dev: false, - }).system_health().unwrap(), + }).system_health()), Health { peers: 5, is_syncing: false, @@ -163,12 +170,12 @@ fn system_health() { ); assert_eq!( - api(Status { + wait_receiver(api(Status { peer_id: PeerId::random(), peers: 0, is_syncing: false, is_dev: true, - }).system_health().unwrap(), + }).system_health()), Health { peers: 0, is_syncing: false, @@ -181,12 +188,12 @@ fn system_health() { fn system_peers() { let peer_id = PeerId::random(); assert_eq!( - api(Status { + wait_receiver(api(Status { peer_id: peer_id.clone(), peers: 1, is_syncing: false, is_dev: true, - }).system_peers().unwrap(), + }).system_peers()), vec![PeerInfo { peer_id: peer_id.to_base58(), roles: "FULL".into(), @@ -200,7 +207,7 @@ fn system_peers() { #[test] fn system_network_state() { assert_eq!( - api(None).system_network_state().unwrap(), + wait_receiver(api(None).system_network_state()), network::NetworkState { peer_id: String::new(), listened_addresses: Default::default(), diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index c095bdb7f4..85ab9feb99 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -34,6 +34,7 @@ use crate::config::Configuration; use primitives::{Blake2Hasher, H256}; use rpc::{self, apis::system::SystemInfo}; use parking_lot::Mutex; +use futures::sync::mpsc; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. @@ -139,8 +140,7 @@ pub trait StartRPC { fn start_rpc( client: Arc>, - network: Arc>>, - should_have_peers: bool, + system_send_back: mpsc::UnboundedSender>>, system_info: SystemInfo, rpc_http: Option, rpc_ws: Option, @@ -159,8 +159,7 @@ impl StartRPC for C where fn start_rpc( client: Arc>, - network: Arc>>, - should_have_peers: bool, + system_send_back: mpsc::UnboundedSender>>, rpc_system_info: SystemInfo, rpc_http: Option, rpc_ws: Option, @@ -178,7 +177,7 @@ impl StartRPC for C where client.clone(), transaction_pool.clone(), subscriptions ); let system = rpc::apis::system::System::new( - rpc_system_info.clone(), network.clone(), should_have_peers + rpc_system_info.clone(), system_send_back.clone() ); rpc::rpc_handler::, ComponentExHash, _, _, _, _>( state, diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 1a3c00e4e7..644341f91a 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -41,7 +41,6 @@ use primitives::Pair; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Header, SaturatedConversion}; use substrate_executor::NativeExecutor; -use network::SyncProvider; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; @@ -384,10 +383,10 @@ impl Service { impl_version: config.impl_version.into(), properties: config.chain_spec.properties(), }; + let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); let rpc = Components::RuntimeServices::start_rpc( client.clone(), - network.clone(), - has_bootnodes, + system_rpc_tx, system_info, config.rpc_http, config.rpc_ws, @@ -396,6 +395,11 @@ impl Service { task_executor.clone(), transaction_pool.clone(), )?; + task_executor.spawn(build_system_rpc_handler::( + network.clone(), + system_rpc_rx, + has_bootnodes + )); let telemetry_connection_sinks: Arc>>> = Default::default(); @@ -611,6 +615,39 @@ impl network::TransactionPool, ComponentBlock< } } +/// Builds a never-ending `Future` that answers the RPC requests coming on the receiver. +fn build_system_rpc_handler( + network: Arc>, + rx: mpsc::UnboundedReceiver>>, + should_have_peers: bool, +) -> impl Future { + rx.for_each(move |request| { + match request { + rpc::apis::system::Request::Health(sender) => { + let _ = sender.send(rpc::apis::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.is_major_syncing(), + should_have_peers, + }); + }, + rpc::apis::system::Request::Peers(sender) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| rpc::apis::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + }).collect()); + } + rpc::apis::system::Request::NetworkState(sender) => { + let _ = sender.send(network.network_state()); + } + }; + + Ok(()) + }) +} + /// Constructs a service factory with the given name that implements the `ServiceFactory` trait. /// The required parameters are required to be given in the exact order. Some parameters are followed /// by `{}` blocks. These blocks are required and used to initialize the given parameter. diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index e007641255..d3d9677bb2 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -34,7 +34,7 @@ use service::{ Roles, FactoryExtrinsic, }; -use network::{multiaddr, Multiaddr, SyncProvider, ManageNetwork}; +use network::{multiaddr, Multiaddr, ManageNetwork}; use network::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode}; use sr_primitives::generic::BlockId; use consensus::{ImportBlock, BlockImport};