Handle RPC requests in the substrate-service (#2866)

* Rework RPC queries

* Remove SyncProvider trait

* Fix RPC tests
This commit is contained in:
Pierre Krieger
2019-06-15 15:44:04 +02:00
committed by Gavin Wood
parent c48aebe897
commit f7bd56d2a8
10 changed files with 180 additions and 133 deletions
+37 -30
View File
@@ -22,7 +22,8 @@ pub mod helpers;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use crate::helpers::Receiver;
use futures::sync::{mpsc, oneshot};
use jsonrpc_derive::rpc;
use network;
use runtime_primitives::traits::{self, Header as HeaderT};
@@ -56,39 +57,49 @@ pub trait SystemApi<Hash, Number> {
/// Node is considered healthy if it is:
/// - connected to some peers (unless running in dev mode)
/// - not performing a major sync
#[rpc(name = "system_health")]
fn system_health(&self) -> Result<Health>;
#[rpc(name = "system_health", returns = "Health")]
fn system_health(&self) -> Receiver<Health>;
/// Returns currently connected peers
#[rpc(name = "system_peers")]
fn system_peers(&self) -> Result<Vec<PeerInfo<Hash, Number>>>;
#[rpc(name = "system_peers", returns = "Vec<PeerInfo<Hash, Number>>")]
fn system_peers(&self) -> Receiver<Vec<PeerInfo<Hash, Number>>>;
/// Returns current state of the network.
///
/// **Warning**: This API is not stable.
// TODO: make this stable and move structs https://github.com/paritytech/substrate/issues/1890
#[rpc(name = "system_networkState")]
fn system_network_state(&self) -> Result<network::NetworkState>;
#[rpc(name = "system_networkState", returns = "network::NetworkState")]
fn system_network_state(&self) -> Receiver<network::NetworkState>;
}
/// System API implementation
pub struct System<B: traits::Block> {
info: SystemInfo,
sync: Arc<dyn network::SyncProvider<B>>,
should_have_peers: bool,
send_back: mpsc::UnboundedSender<Request<B>>,
}
/// Request to be processed.
pub enum Request<B: traits::Block> {
/// Must return the health of the network.
Health(oneshot::Sender<Health>),
/// Must return information about the peers we are connected to.
Peers(oneshot::Sender<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>>),
/// Must return the state of the network.
NetworkState(oneshot::Sender<network::NetworkState>),
}
impl<B: traits::Block> System<B> {
/// Creates new `System` given the `SystemInfo`.
/// Creates new `System`.
///
/// The `send_back` will be used to transmit some of the requests. The user is responsible for
/// reading from that channel and answering the requests.
pub fn new(
info: SystemInfo,
sync: Arc<dyn network::SyncProvider<B>>,
should_have_peers: bool,
send_back: mpsc::UnboundedSender<Request<B>>
) -> Self {
System {
info,
should_have_peers,
sync,
send_back,
}
}
}
@@ -110,25 +121,21 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
Ok(self.info.properties.clone())
}
fn system_health(&self) -> Result<Health> {
Ok(Health {
peers: self.sync.peers_debug_info().len(),
is_syncing: self.sync.is_major_syncing(),
should_have_peers: self.should_have_peers,
})
fn system_health(&self) -> Receiver<Health> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::Health(tx));
Receiver(rx)
}
fn system_peers(&self) -> Result<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>> {
Ok(self.sync.peers_debug_info().into_iter().map(|(peer_id, p)| PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}).collect())
fn system_peers(&self) -> Receiver<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::Peers(tx));
Receiver(rx)
}
fn system_network_state(&self) -> Result<network::NetworkState> {
Ok(self.sync.network_state())
fn system_network_state(&self) -> Receiver<network::NetworkState> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::NetworkState(tx));
Receiver(rx)
}
}
+60 -53
View File
@@ -16,11 +16,12 @@
use super::*;
use network::{self, ProtocolStatus, PeerId, PeerInfo as NetworkPeerInfo};
use network::{self, PeerId};
use network::config::Roles;
use test_client::runtime::Block;
use assert_matches::assert_matches;
use futures::sync::mpsc;
use futures::{prelude::*, sync::mpsc};
use std::thread;
struct Status {
pub peers: usize,
@@ -40,55 +41,61 @@ impl Default for Status {
}
}
impl network::SyncProvider<Block> for Status {
fn status(&self) -> mpsc::UnboundedReceiver<ProtocolStatus<Block>> {
let (_sink, stream) = mpsc::unbounded();
stream
}
fn network_state(&self) -> network::NetworkState {
network::NetworkState {
peer_id: String::new(),
listened_addresses: Default::default(),
external_addresses: Default::default(),
connected_peers: Default::default(),
not_connected_peers: Default::default(),
average_download_per_sec: 0,
average_upload_per_sec: 0,
peerset: serde_json::Value::Null,
}
}
fn peers_debug_info(&self) -> Vec<(PeerId, NetworkPeerInfo<Block>)> {
let mut peers = vec![];
for _peer in 0..self.peers {
peers.push(
(self.peer_id.clone(), NetworkPeerInfo {
roles: Roles::FULL,
protocol_version: 1,
best_hash: Default::default(),
best_number: 1
})
);
}
peers
}
fn is_major_syncing(&self) -> bool {
self.is_syncing
}
}
fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
let status = sync.into().unwrap_or_default();
let should_have_peers = !status.is_dev;
let (tx, rx) = mpsc::unbounded();
thread::spawn(move || {
tokio::run(rx.for_each(move |request| {
match request {
Request::Health(sender) => {
let _ = sender.send(Health {
peers: status.peers,
is_syncing: status.is_syncing,
should_have_peers,
});
},
Request::Peers(sender) => {
let mut peers = vec![];
for _peer in 0..status.peers {
peers.push(PeerInfo {
peer_id: status.peer_id.to_base58(),
roles: format!("{:?}", Roles::FULL),
protocol_version: 1,
best_hash: Default::default(),
best_number: 1,
});
}
let _ = sender.send(peers);
}
Request::NetworkState(sender) => {
let _ = sender.send(network::NetworkState {
peer_id: String::new(),
listened_addresses: Default::default(),
external_addresses: Default::default(),
connected_peers: Default::default(),
not_connected_peers: Default::default(),
average_download_per_sec: 0,
average_upload_per_sec: 0,
peerset: serde_json::Value::Null,
});
}
};
Ok(())
}))
});
System::new(SystemInfo {
impl_name: "testclient".into(),
impl_version: "0.2.0".into(),
chain_name: "testchain".into(),
properties: Default::default(),
}, Arc::new(status), should_have_peers)
}, tx)
}
fn wait_receiver<T>(rx: Receiver<T>) -> T {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
runtime.block_on(rx).unwrap()
}
#[test]
@@ -126,7 +133,7 @@ fn system_properties_works() {
#[test]
fn system_health() {
assert_matches!(
api(None).system_health().unwrap(),
wait_receiver(api(None).system_health()),
Health {
peers: 0,
is_syncing: false,
@@ -135,12 +142,12 @@ fn system_health() {
);
assert_matches!(
api(Status {
wait_receiver(api(Status {
peer_id: PeerId::random(),
peers: 5,
is_syncing: true,
is_dev: true,
}).system_health().unwrap(),
}).system_health()),
Health {
peers: 5,
is_syncing: true,
@@ -149,12 +156,12 @@ fn system_health() {
);
assert_eq!(
api(Status {
wait_receiver(api(Status {
peer_id: PeerId::random(),
peers: 5,
is_syncing: false,
is_dev: false,
}).system_health().unwrap(),
}).system_health()),
Health {
peers: 5,
is_syncing: false,
@@ -163,12 +170,12 @@ fn system_health() {
);
assert_eq!(
api(Status {
wait_receiver(api(Status {
peer_id: PeerId::random(),
peers: 0,
is_syncing: false,
is_dev: true,
}).system_health().unwrap(),
}).system_health()),
Health {
peers: 0,
is_syncing: false,
@@ -181,12 +188,12 @@ fn system_health() {
fn system_peers() {
let peer_id = PeerId::random();
assert_eq!(
api(Status {
wait_receiver(api(Status {
peer_id: peer_id.clone(),
peers: 1,
is_syncing: false,
is_dev: true,
}).system_peers().unwrap(),
}).system_peers()),
vec![PeerInfo {
peer_id: peer_id.to_base58(),
roles: "FULL".into(),
@@ -200,7 +207,7 @@ fn system_peers() {
#[test]
fn system_network_state() {
assert_eq!(
api(None).system_network_state().unwrap(),
wait_receiver(api(None).system_network_state()),
network::NetworkState {
peer_id: String::new(),
listened_addresses: Default::default(),