Upgrade jsonrpc to 0.18.0 (#9547)

* Upgrade jsonrpc to 0.18.0

I think this says it all :P

* 🤦

* Fmt etc

* Fix tests

* Fix tests again...

* Better impl

* Revert "Tell dependabot to ignore jsonrpc-* updates (#9518)"

This reverts commit 6e0cd5587d.
This commit is contained in:
Bastian Köcher
2021-08-13 08:46:07 +02:00
committed by GitHub
parent 199b2883af
commit c44aba89e6
65 changed files with 1422 additions and 1291 deletions
+49 -46
View File
@@ -28,12 +28,10 @@ use sp_blockchain::HeaderBackend;
use codec::{Decode, Encode};
use futures::{
compat::Compat,
future::{ready, FutureExt, TryFutureExt},
StreamExt as _,
future::{FutureExt, TryFutureExt},
SinkExt, StreamExt as _,
};
use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
use rpc::futures::{future::result, Future, Sink};
use sc_rpc_api::DenyUnsafe;
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
@@ -42,7 +40,7 @@ use sc_transaction_pool_api::{
use sp_api::ProvideRuntimeApi;
use sp_core::Bytes;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::generic;
use sp_runtime::{generic, traits::Block as BlockT};
use sp_session::SessionKeys;
use self::error::{Error, FutureResult, Result};
@@ -88,6 +86,8 @@ where
P: TransactionPool + Sync + Send + 'static,
Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
Client::Api: SessionKeys<P::Block>,
P::Hash: Unpin,
<P::Block as BlockT>::Hash: Unpin,
{
type Metadata = crate::Metadata;
@@ -135,19 +135,18 @@ where
fn submit_extrinsic(&self, ext: Bytes) -> FutureResult<TxHash<P>> {
let xt = match Decode::decode(&mut &ext[..]) {
Ok(xt) => xt,
Err(err) => return Box::new(result(Err(err.into()))),
Err(err) => return async move { Err(err.into()) }.boxed(),
};
let best_block_hash = self.client.info().best_hash;
Box::new(
self.pool
.submit_one(&generic::BlockId::hash(best_block_hash), TX_SOURCE, xt)
.compat()
.map_err(|e| {
e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
}),
)
self.pool
.submit_one(&generic::BlockId::hash(best_block_hash), TX_SOURCE, xt)
.map_err(|e| {
e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
})
.boxed()
}
fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
@@ -185,46 +184,50 @@ where
subscriber: Subscriber<TransactionStatus<TxHash<P>, BlockHash<P>>>,
xt: Bytes,
) {
let submit = || -> Result<_> {
let best_block_hash = self.client.info().best_hash;
let dxt = TransactionFor::<P>::decode(&mut &xt[..]).map_err(error::Error::from)?;
Ok(self
.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), TX_SOURCE, dxt)
.map_err(|e| {
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
}))
let best_block_hash = self.client.info().best_hash;
let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(error::Error::from) {
Ok(tx) => tx,
Err(err) => {
warn!("Failed to submit extrinsic: {}", err);
// reject the subscriber (ignore errors - we don't care if subscriber is no longer
// there).
let _ = subscriber.reject(err.into());
return
},
};
let submit = self
.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), TX_SOURCE, dxt)
.map_err(|e| {
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
});
let subscriptions = self.subscriptions.clone();
let future = ready(submit())
.and_then(|res| res)
// convert the watcher into a `Stream`
.map(|res| res.map(|stream| stream.map(|v| Ok::<_, ()>(Ok(v)))))
// now handle the import result,
// start a new subscription
.map(move |result| match result {
Ok(watcher) => {
subscriptions.add(subscriber, move |sink| {
sink.sink_map_err(|e| log::debug!("Subscription sink failed: {:?}", e))
.send_all(Compat::new(watcher))
.map(|_| ())
});
},
let future = async move {
let tx_stream = match submit.await {
Ok(s) => s,
Err(err) => {
warn!("Failed to submit extrinsic: {}", err);
// reject the subscriber (ignore errors - we don't care if subscriber is no
// longer there).
let _ = subscriber.reject(err.into());
return
},
});
};
let res = self
.subscriptions
.executor()
.execute(Box::new(Compat::new(future.map(|_| Ok(())))));
subscriptions.add(subscriber, move |sink| {
tx_stream
.map(|v| Ok(Ok(v)))
.forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)))
.map(drop)
});
};
let res = self.subscriptions.executor().spawn_obj(future.boxed().into());
if res.is_err() {
warn!("Error spawning subscription RPC task.");
}
+14 -15
View File
@@ -20,8 +20,7 @@ use super::*;
use assert_matches::assert_matches;
use codec::Encode;
use futures::{compat::Future01CompatExt, executor};
use rpc::futures::Stream as _;
use futures::executor;
use sc_transaction_pool::{BasicPool, FullChainApi};
use sp_core::{
blake2_256,
@@ -86,10 +85,10 @@ fn submit_transaction_should_not_cause_error() {
let h: H256 = blake2_256(&xt).into();
assert_matches!(
AuthorApi::submit_extrinsic(&p, xt.clone().into()).wait(),
executor::block_on(AuthorApi::submit_extrinsic(&p, xt.clone().into())),
Ok(h2) if h == h2
);
assert!(AuthorApi::submit_extrinsic(&p, xt.into()).wait().is_err());
assert!(executor::block_on(AuthorApi::submit_extrinsic(&p, xt.into())).is_err());
}
#[test]
@@ -99,10 +98,10 @@ fn submit_rich_transaction_should_not_cause_error() {
let h: H256 = blake2_256(&xt).into();
assert_matches!(
AuthorApi::submit_extrinsic(&p, xt.clone().into()).wait(),
executor::block_on(AuthorApi::submit_extrinsic(&p, xt.clone().into())),
Ok(h2) if h == h2
);
assert!(AuthorApi::submit_extrinsic(&p, xt.into()).wait().is_err());
assert!(executor::block_on(AuthorApi::submit_extrinsic(&p, xt.into())).is_err());
}
#[test]
@@ -120,7 +119,7 @@ fn should_watch_extrinsic() {
uxt(AccountKeyring::Alice, 0).encode().into(),
);
let id = executor::block_on(id_rx.compat()).unwrap().unwrap();
let id = executor::block_on(id_rx).unwrap().unwrap();
assert_matches!(id, SubscriptionId::String(_));
let id = match id {
@@ -138,8 +137,8 @@ fn should_watch_extrinsic() {
};
tx.into_signed_tx()
};
AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap();
let (res, data) = executor::block_on(data.into_future().compat()).unwrap();
executor::block_on(AuthorApi::submit_extrinsic(&p, replacement.encode().into())).unwrap();
let (res, data) = executor::block_on(data.into_future());
let expected = Some(format!(
r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":"ready","subscription":"{}"}}}}"#,
@@ -154,7 +153,7 @@ fn should_watch_extrinsic() {
id,
));
let res = executor::block_on(data.into_future().compat()).unwrap().0;
let res = executor::block_on(data.into_future()).0;
assert_eq!(res, expected);
}
@@ -174,7 +173,7 @@ fn should_return_watch_validation_error() {
);
// then
let res = executor::block_on(id_rx.compat()).unwrap();
let res = executor::block_on(id_rx).unwrap();
assert!(res.is_err(), "Expected the transaction to be rejected as invalid.");
}
@@ -183,7 +182,7 @@ fn should_return_pending_extrinsics() {
let p = TestSetup::default().author();
let ex = uxt(AccountKeyring::Alice, 0);
AuthorApi::submit_extrinsic(&p, ex.encode().into()).wait().unwrap();
executor::block_on(AuthorApi::submit_extrinsic(&p, ex.encode().into())).unwrap();
assert_matches!(
p.pending_extrinsics(),
Ok(ref expected) if *expected == vec![Bytes(ex.encode())]
@@ -196,11 +195,11 @@ fn should_remove_extrinsics() {
let p = setup.author();
let ex1 = uxt(AccountKeyring::Alice, 0);
p.submit_extrinsic(ex1.encode().into()).wait().unwrap();
executor::block_on(p.submit_extrinsic(ex1.encode().into())).unwrap();
let ex2 = uxt(AccountKeyring::Alice, 1);
p.submit_extrinsic(ex2.encode().into()).wait().unwrap();
executor::block_on(p.submit_extrinsic(ex2.encode().into())).unwrap();
let ex3 = uxt(AccountKeyring::Bob, 0);
let hash3 = p.submit_extrinsic(ex3.encode().into()).wait().unwrap();
let hash3 = executor::block_on(p.submit_extrinsic(ex3.encode().into())).unwrap();
assert_eq!(setup.pool.status().ready, 3);
// now remove all 3
+9 -13
View File
@@ -18,19 +18,16 @@
//! Blockchain API backend for full nodes.
use super::{client_err, error::FutureResult, ChainBackend};
use futures::FutureExt;
use jsonrpc_pubsub::manager::SubscriptionManager;
use rpc::futures::future::result;
use std::sync::Arc;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
use sp_runtime::{
generic::{BlockId, SignedBlock},
traits::Block as BlockT,
};
use super::{client_err, error::FutureResult, ChainBackend};
use sp_blockchain::HeaderBackend;
use std::marker::PhantomData;
use std::{marker::PhantomData, sync::Arc};
/// Blockchain API backend for full nodes. Reads all the data from local database.
pub struct FullChain<Block: BlockT, Client> {
@@ -52,6 +49,7 @@ impl<Block: BlockT, Client> FullChain<Block, Client> {
impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
fn client(&self) -> &Arc<Client> {
@@ -63,14 +61,12 @@ where
}
fn header(&self, hash: Option<Block::Hash>) -> FutureResult<Option<Block::Header>> {
Box::new(result(
self.client.header(BlockId::Hash(self.unwrap_or_best(hash))).map_err(client_err),
))
let res = self.client.header(BlockId::Hash(self.unwrap_or_best(hash))).map_err(client_err);
async move { res }.boxed()
}
fn block(&self, hash: Option<Block::Hash>) -> FutureResult<Option<SignedBlock<Block>>> {
Box::new(result(
self.client.block(&BlockId::Hash(self.unwrap_or_best(hash))).map_err(client_err),
))
let res = self.client.block(&BlockId::Hash(self.unwrap_or_best(hash))).map_err(client_err);
async move { res }.boxed()
}
}
+24 -24
View File
@@ -20,7 +20,6 @@
use futures::{future::ready, FutureExt, TryFutureExt};
use jsonrpc_pubsub::manager::SubscriptionManager;
use rpc::futures::future::{result, Either, Future};
use std::sync::Arc;
use sc_client_api::light::{Fetcher, RemoteBlockchain, RemoteBodyRequest};
@@ -61,6 +60,7 @@ impl<Block: BlockT, Client, F: Fetcher<Block>> LightChain<Block, Client, F> {
impl<Block, Client, F> ChainBackend<Client, Block> for LightChain<Block, Client, F>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: BlockchainEvents<Block> + HeaderBackend<Block> + Send + Sync + 'static,
F: Fetcher<Block> + Send + Sync + 'static,
{
@@ -82,33 +82,33 @@ where
BlockId::Hash(hash),
);
Box::new(
maybe_header
.then(move |result| ready(result.map_err(client_err)))
.boxed()
.compat(),
)
maybe_header.then(move |result| ready(result.map_err(client_err))).boxed()
}
fn block(&self, hash: Option<Block::Hash>) -> FutureResult<Option<SignedBlock<Block>>> {
let fetcher = self.fetcher.clone();
let block = self.header(hash).and_then(move |header| match header {
Some(header) => Either::A(
fetcher
.remote_body(RemoteBodyRequest {
header: header.clone(),
retry_count: Default::default(),
})
.boxed()
.compat()
.map(move |body| {
Some(SignedBlock { block: Block::new(header, body), justifications: None })
})
.map_err(client_err),
),
None => Either::B(result(Ok(None))),
});
self.header(hash)
.and_then(move |header| async move {
match header {
Some(header) => {
let body = fetcher
.remote_body(RemoteBodyRequest {
header: header.clone(),
retry_count: Default::default(),
})
.await;
Box::new(block)
body.map(|body| {
Some(SignedBlock {
block: Block::new(header, body),
justifications: None,
})
})
.map_err(client_err)
},
None => Ok(None),
}
})
.boxed()
}
}
+17 -15
View File
@@ -27,7 +27,7 @@ mod tests;
use futures::{future, StreamExt, TryStreamExt};
use log::warn;
use rpc::{
futures::{stream, Future, Sink, Stream},
futures::{stream, FutureExt, SinkExt, Stream},
Result as RpcResult,
};
use std::sync::Arc;
@@ -53,6 +53,7 @@ use sp_blockchain::HeaderBackend;
trait ChainBackend<Client, Block: BlockT>: Send + Sync + 'static
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
/// Get client reference.
@@ -120,8 +121,7 @@ where
|| {
self.client()
.import_notification_stream()
.map(|notification| Ok::<_, ()>(notification.header))
.compat()
.map(|notification| Ok::<_, rpc::Error>(notification.header))
},
)
}
@@ -150,8 +150,7 @@ where
self.client()
.import_notification_stream()
.filter(|notification| future::ready(notification.is_new_best))
.map(|notification| Ok::<_, ()>(notification.header))
.compat()
.map(|notification| Ok::<_, rpc::Error>(notification.header))
},
)
}
@@ -179,8 +178,7 @@ where
|| {
self.client()
.finality_notification_stream()
.map(|notification| Ok::<_, ()>(notification.header))
.compat()
.map(|notification| Ok::<_, rpc::Error>(notification.header))
},
)
}
@@ -202,6 +200,7 @@ pub fn new_full<Block: BlockT, Client>(
) -> Chain<Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
Chain { backend: Box::new(self::chain_full::FullChain::new(client, subscriptions)) }
@@ -216,6 +215,7 @@ pub fn new_light<Block: BlockT, Client, F: Fetcher<Block>>(
) -> Chain<Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
F: Send + Sync + 'static,
{
@@ -238,6 +238,7 @@ impl<Block, Client> ChainApi<NumberFor<Block>, Block::Hash, Block::Header, Signe
for Chain<Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
type Metadata = crate::Metadata;
@@ -312,7 +313,7 @@ where
}
/// Subscribe to new headers.
fn subscribe_headers<Block, Client, F, G, S, ERR>(
fn subscribe_headers<Block, Client, F, G, S>(
client: &Arc<Client>,
subscriptions: &SubscriptionManager,
subscriber: Subscriber<Block::Header>,
@@ -320,27 +321,28 @@ fn subscribe_headers<Block, Client, F, G, S, ERR>(
stream: F,
) where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: HeaderBackend<Block> + 'static,
F: FnOnce() -> S,
G: FnOnce() -> Block::Hash,
ERR: ::std::fmt::Debug,
S: Stream<Item = Block::Header, Error = ERR> + Send + 'static,
S: Stream<Item = std::result::Result<Block::Header, rpc::Error>> + Send + 'static,
{
subscriptions.add(subscriber, |sink| {
// send current head right at the start.
let header = client
.header(BlockId::Hash(best_block_hash()))
.map_err(client_err)
.and_then(|header| header.ok_or_else(|| "Best header missing.".to_owned().into()))
.and_then(|header| header.ok_or_else(|| "Best header missing.".to_string().into()))
.map_err(Into::into);
// send further subscriptions
let stream = stream()
.map(|res| Ok(res))
.map_err(|e| warn!("Block notification stream error: {:?}", e));
.inspect_err(|e| warn!("Block notification stream error: {:?}", e))
.map(|res| Ok(res));
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(stream::iter_result(vec![Ok(header)]).chain(stream))
stream::iter(vec![Ok(header)])
.chain(stream)
.forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
+26 -41
View File
@@ -19,10 +19,7 @@
use super::*;
use crate::testing::TaskExecutor;
use assert_matches::assert_matches;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
executor,
};
use futures::executor;
use sc_block_builder::BlockBuilderProvider;
use sp_consensus::BlockOrigin;
use sp_rpc::list::ListOrValue;
@@ -37,7 +34,7 @@ fn should_return_header() {
let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
assert_matches!(
api.header(Some(client.genesis_hash()).into()).wait(),
executor::block_on(api.header(Some(client.genesis_hash()).into())),
Ok(Some(ref x)) if x == &Header {
parent_hash: H256::from_low_u64_be(0),
number: 0,
@@ -49,7 +46,7 @@ fn should_return_header() {
);
assert_matches!(
api.header(None.into()).wait(),
executor::block_on(api.header(None.into())),
Ok(Some(ref x)) if x == &Header {
parent_hash: H256::from_low_u64_be(0),
number: 0,
@@ -60,7 +57,10 @@ fn should_return_header() {
}
);
assert_matches!(api.header(Some(H256::from_low_u64_be(5)).into()).wait(), Ok(None));
assert_matches!(
executor::block_on(api.header(Some(H256::from_low_u64_be(5)).into())),
Ok(None)
);
}
#[test]
@@ -74,12 +74,12 @@ fn should_return_a_block() {
// Genesis block is not justified
assert_matches!(
api.block(Some(client.genesis_hash()).into()).wait(),
executor::block_on(api.block(Some(client.genesis_hash()).into())),
Ok(Some(SignedBlock { justifications: None, .. }))
);
assert_matches!(
api.block(Some(block_hash).into()).wait(),
executor::block_on(api.block(Some(block_hash).into())),
Ok(Some(ref x)) if x.block == Block {
header: Header {
parent_hash: client.genesis_hash(),
@@ -94,7 +94,7 @@ fn should_return_a_block() {
);
assert_matches!(
api.block(None.into()).wait(),
executor::block_on(api.block(None.into())),
Ok(Some(ref x)) if x.block == Block {
header: Header {
parent_hash: client.genesis_hash(),
@@ -108,7 +108,7 @@ fn should_return_a_block() {
}
);
assert_matches!(api.block(Some(H256::from_low_u64_be(5)).into()).wait(), Ok(None));
assert_matches!(executor::block_on(api.block(Some(H256::from_low_u64_be(5)).into())), Ok(None));
}
#[test]
@@ -182,7 +182,7 @@ fn should_return_finalized_hash() {
#[test]
fn should_notify_about_latest_block() {
let (subscriber, id, transport) = Subscriber::new_test("test");
let (subscriber, id, mut transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
@@ -191,25 +191,20 @@ fn should_notify_about_latest_block() {
api.subscribe_all_heads(Default::default(), subscriber);
// assert id assigned
assert!(matches!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::String(_)))));
assert!(matches!(executor::block_on(id), Ok(Ok(SubscriptionId::String(_)))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
}
// assert initial head sent.
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
// Check for the correct number of notifications
executor::block_on((&mut transport).take(2).collect::<Vec<_>>());
assert!(executor::block_on(transport.next()).is_none());
}
#[test]
fn should_notify_about_best_block() {
let (subscriber, id, transport) = Subscriber::new_test("test");
let (subscriber, id, mut transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
@@ -218,25 +213,20 @@ fn should_notify_about_best_block() {
api.subscribe_new_heads(Default::default(), subscriber);
// assert id assigned
assert!(matches!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::String(_)))));
assert!(matches!(executor::block_on(id), Ok(Ok(SubscriptionId::String(_)))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
}
// assert initial head sent.
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(executor::block_on(Stream01CompatExt::compat(next).into_future()).0, None);
// Assert that the correct number of notifications have been sent.
executor::block_on((&mut transport).take(2).collect::<Vec<_>>());
assert!(executor::block_on(transport.next()).is_none());
}
#[test]
fn should_notify_about_finalized_block() {
let (subscriber, id, transport) = Subscriber::new_test("test");
let (subscriber, id, mut transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
@@ -245,19 +235,14 @@ fn should_notify_about_finalized_block() {
api.subscribe_finalized_heads(Default::default(), subscriber);
// assert id assigned
assert!(matches!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::String(_)))));
assert!(matches!(executor::block_on(id), Ok(Ok(SubscriptionId::String(_)))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
client.finalize_block(BlockId::number(1), None).unwrap();
}
// assert initial head sent.
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
// Assert that the correct number of notifications have been sent.
executor::block_on((&mut transport).take(2).collect::<Vec<_>>());
assert!(executor::block_on(transport.next()).is_none());
}
+11 -8
View File
@@ -22,8 +22,10 @@
#![warn(missing_docs)]
use futures::{compat::Future01CompatExt, FutureExt};
use rpc::futures::future::{ExecuteError, Executor, Future};
use futures::{
task::{FutureObj, Spawn, SpawnError},
FutureExt,
};
use sp_core::traits::SpawnNamed;
use std::sync::Arc;
@@ -50,12 +52,13 @@ impl SubscriptionTaskExecutor {
}
}
impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SubscriptionTaskExecutor {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>,
) -> Result<(), ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
self.0.spawn("substrate-rpc-subscription", future.compat().map(drop).boxed());
impl Spawn for SubscriptionTaskExecutor {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.0.spawn("substrate-rpc-subscription", future.map(drop).boxed());
Ok(())
}
fn status(&self) -> Result<(), SpawnError> {
Ok(())
}
}
+14 -12
View File
@@ -24,11 +24,9 @@ mod state_light;
#[cfg(test)]
mod tests;
use futures::FutureExt;
use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
use rpc::{
futures::{future::result, Future},
Result as RpcResult,
};
use rpc::Result as RpcResult;
use std::sync::Arc;
use sc_client_api::light::{Fetcher, RemoteBlockchain};
@@ -192,6 +190,7 @@ pub fn new_full<BE, Block: BlockT, Client>(
) -> (State<Block, Client>, ChildState<Block, Client>)
where
Block: BlockT + 'static,
Block::Hash: Unpin,
BE: Backend<Block> + 'static,
Client: ExecutorProvider<Block>
+ StorageProvider<Block, BE>
@@ -227,6 +226,7 @@ pub fn new_light<BE, Block: BlockT, Client, F: Fetcher<Block>>(
) -> (State<Block, Client>, ChildState<Block, Client>)
where
Block: BlockT + 'static,
Block::Hash: Unpin,
BE: Backend<Block> + 'static,
Client: ExecutorProvider<Block>
+ StorageProvider<Block, BE>
@@ -287,7 +287,7 @@ where
block: Option<Block::Hash>,
) -> FutureResult<Vec<(StorageKey, StorageData)>> {
if let Err(err) = self.deny_unsafe.check_if_safe() {
return Box::new(result(Err(err.into())))
return async move { Err(err.into()) }.boxed()
}
self.backend.storage_pairs(block, key_prefix)
@@ -301,10 +301,10 @@ where
block: Option<Block::Hash>,
) -> FutureResult<Vec<StorageKey>> {
if count > STORAGE_KEYS_PAGED_MAX_COUNT {
return Box::new(result(Err(Error::InvalidCount {
value: count,
max: STORAGE_KEYS_PAGED_MAX_COUNT,
})))
return async move {
Err(Error::InvalidCount { value: count, max: STORAGE_KEYS_PAGED_MAX_COUNT })
}
.boxed()
}
self.backend.storage_keys_paged(block, prefix, count, start_key)
}
@@ -344,7 +344,7 @@ where
to: Option<Block::Hash>,
) -> FutureResult<Vec<StorageChangeSet<Block::Hash>>> {
if let Err(err) = self.deny_unsafe.check_if_safe() {
return Box::new(result(Err(err.into())))
return async move { Err(err.into()) }.boxed()
}
self.backend.query_storage(from, to, keys)
@@ -415,7 +415,7 @@ where
storage_keys: Option<String>,
) -> FutureResult<sp_rpc::tracing::TraceBlockResponse> {
if let Err(err) = self.deny_unsafe.check_if_safe() {
return Box::new(result(Err(err.into())))
return async move { Err(err.into()) }.boxed()
}
self.backend.trace_block(block, targets, storage_keys)
@@ -478,7 +478,9 @@ where
storage_key: PrefixedStorageKey,
key: StorageKey,
) -> FutureResult<Option<u64>> {
Box::new(self.storage(block, storage_key, key).map(|x| x.map(|x| x.0.len() as u64)))
self.storage(block, storage_key, key)
.map(|x| x.map(|r| r.map(|v| v.0.len() as u64)))
.boxed()
}
}
+191 -192
View File
@@ -18,13 +18,10 @@
//! State API backend for full nodes.
use futures::{future, StreamExt as _, TryStreamExt as _};
use futures::{future, stream, FutureExt, SinkExt, StreamExt};
use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
use log::warn;
use rpc::{
futures::{future::result, stream, Future, Sink, Stream},
Result as RpcResult,
};
use rpc::Result as RpcResult;
use std::{
collections::{BTreeMap, HashMap},
ops::Range,
@@ -263,6 +260,7 @@ where
impl<BE, Block, Client> StateBackend<Block, Client> for FullState<BE, Block, Client>
where
Block: BlockT + 'static,
Block::Hash: Unpin,
BE: Backend<Block> + 'static,
Client: ExecutorProvider<Block>
+ StorageProvider<Block, BE>
@@ -299,7 +297,7 @@ where
.map(Into::into)
})
.map_err(client_err);
Box::new(result(r))
async move { r }.boxed()
}
fn storage_keys(
@@ -307,11 +305,11 @@ where
block: Option<Block::Hash>,
prefix: StorageKey,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| self.client.storage_keys(&BlockId::Hash(block), &prefix))
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| self.client.storage_keys(&BlockId::Hash(block), &prefix))
.map_err(client_err);
async move { r }.boxed()
}
fn storage_pairs(
@@ -319,11 +317,11 @@ where
block: Option<Block::Hash>,
prefix: StorageKey,
) -> FutureResult<Vec<(StorageKey, StorageData)>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| self.client.storage_pairs(&BlockId::Hash(block), &prefix))
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| self.client.storage_pairs(&BlockId::Hash(block), &prefix))
.map_err(client_err);
async move { r }.boxed()
}
fn storage_keys_paged(
@@ -333,18 +331,18 @@ where
count: u32,
start_key: Option<StorageKey>,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
self.client.storage_keys_iter(
&BlockId::Hash(block),
prefix.as_ref(),
start_key.as_ref(),
)
})
.map(|iter| iter.take(count as usize).collect())
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
self.client.storage_keys_iter(
&BlockId::Hash(block),
prefix.as_ref(),
start_key.as_ref(),
)
})
.map(|iter| iter.take(count as usize).collect())
.map_err(client_err);
async move { r }.boxed()
}
fn storage(
@@ -352,11 +350,11 @@ where
block: Option<Block::Hash>,
key: StorageKey,
) -> FutureResult<Option<StorageData>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| self.client.storage(&BlockId::Hash(block), &key))
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| self.client.storage(&BlockId::Hash(block), &key))
.map_err(client_err);
async move { r }.boxed()
}
fn storage_size(
@@ -366,28 +364,28 @@ where
) -> FutureResult<Option<u64>> {
let block = match self.block_or_best(block) {
Ok(b) => b,
Err(e) => return Box::new(result(Err(client_err(e)))),
Err(e) => return async move { Err(client_err(e)) }.boxed(),
};
match self.client.storage(&BlockId::Hash(block), &key) {
Ok(Some(d)) => return Box::new(result(Ok(Some(d.0.len() as u64)))),
Err(e) => return Box::new(result(Err(client_err(e)))),
Ok(Some(d)) => return async move { Ok(Some(d.0.len() as u64)) }.boxed(),
Err(e) => return async move { Err(client_err(e)) }.boxed(),
Ok(None) => {},
}
Box::new(result(
self.client
.storage_pairs(&BlockId::Hash(block), &key)
.map(|kv| {
let item_sum = kv.iter().map(|(_, v)| v.0.len() as u64).sum::<u64>();
if item_sum > 0 {
Some(item_sum)
} else {
None
}
})
.map_err(client_err),
))
let r = self
.client
.storage_pairs(&BlockId::Hash(block), &key)
.map(|kv| {
let item_sum = kv.iter().map(|(_, v)| v.0.len() as u64).sum::<u64>();
if item_sum > 0 {
Some(item_sum)
} else {
None
}
})
.map_err(client_err);
async move { r }.boxed()
}
fn storage_hash(
@@ -395,29 +393,31 @@ where
block: Option<Block::Hash>,
key: StorageKey,
) -> FutureResult<Option<Block::Hash>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| self.client.storage_hash(&BlockId::Hash(block), &key))
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| self.client.storage_hash(&BlockId::Hash(block), &key))
.map_err(client_err);
async move { r }.boxed()
}
fn metadata(&self, block: Option<Block::Hash>) -> FutureResult<Bytes> {
Box::new(result(self.block_or_best(block).map_err(client_err).and_then(|block| {
let r = self.block_or_best(block).map_err(client_err).and_then(|block| {
self.client
.runtime_api()
.metadata(&BlockId::Hash(block))
.map(Into::into)
.map_err(|e| Error::Client(Box::new(e)))
})))
});
async move { r }.boxed()
}
fn runtime_version(&self, block: Option<Block::Hash>) -> FutureResult<RuntimeVersion> {
Box::new(result(self.block_or_best(block).map_err(client_err).and_then(|block| {
let r = self.block_or_best(block).map_err(client_err).and_then(|block| {
self.client
.runtime_version_at(&BlockId::Hash(block))
.map_err(|e| Error::Client(Box::new(e)))
})))
});
async move { r }.boxed()
}
fn query_storage(
@@ -434,7 +434,9 @@ where
self.query_storage_filtered(&range, &keys, &last_values, &mut changes)?;
Ok(changes)
};
Box::new(result(call_fn()))
let r = call_fn();
async move { r }.boxed()
}
fn query_storage_at(
@@ -451,19 +453,16 @@ where
block: Option<Block::Hash>,
keys: Vec<StorageKey>,
) -> FutureResult<ReadProof<Block::Hash>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
self.client
.read_proof(
&BlockId::Hash(block),
&mut keys.iter().map(|key| key.0.as_ref()),
)
.map(|proof| proof.iter_nodes().map(|node| node.into()).collect())
.map(|proof| ReadProof { at: block, proof })
})
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
self.client
.read_proof(&BlockId::Hash(block), &mut keys.iter().map(|key| key.0.as_ref()))
.map(|proof| proof.iter_nodes().map(|node| node.into()).collect())
.map(|proof| ReadProof { at: block, proof })
})
.map_err(client_err);
async move { r }.boxed()
}
fn subscribe_runtime_version(
@@ -483,29 +482,34 @@ where
};
self.subscriptions.add(subscriber, |sink| {
let version = self.runtime_version(None.into()).map_err(Into::into).wait();
let version = self
.block_or_best(None)
.and_then(|block| {
self.client.runtime_version_at(&BlockId::Hash(block)).map_err(Into::into)
})
.map_err(client_err)
.map_err(Into::into);
let client = self.client.clone();
let mut previous_version = version.clone();
let stream = stream
.filter_map(move |_| {
let info = client.info();
let version = client
.runtime_version_at(&BlockId::hash(info.best_hash))
.map_err(|e| Error::Client(Box::new(e)))
.map_err(Into::into);
if previous_version != version {
previous_version = version.clone();
future::ready(Some(Ok::<_, ()>(version)))
} else {
future::ready(None)
}
})
.compat();
let stream = stream.filter_map(move |_| {
let info = client.info();
let version = client
.runtime_version_at(&BlockId::hash(info.best_hash))
.map_err(|e| Error::Client(Box::new(e)))
.map_err(Into::into);
if previous_version != version {
previous_version = version.clone();
future::ready(Some(Ok::<_, ()>(version)))
} else {
future::ready(None)
}
});
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(stream::iter_result(vec![Ok(version)]).chain(stream))
stream::iter(vec![Ok(version)])
.chain(stream)
.forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
@@ -538,16 +542,14 @@ where
};
// initial values
let initial = stream::iter_result(
let initial = stream::iter(
keys.map(|keys| {
let block = self.client.info().best_hash;
let changes = keys
.into_iter()
.map(|key| {
StateBackend::storage(self, Some(block.clone()).into(), key.clone())
.map(|val| (key.clone(), val))
.wait()
.unwrap_or_else(|_| (key, None))
let v = self.client.storage(&BlockId::Hash(block), &key).ok().flatten();
(key, v)
})
.collect();
vec![Ok(Ok(StorageChangeSet { block, changes }))]
@@ -556,26 +558,19 @@ where
);
self.subscriptions.add(subscriber, |sink| {
let stream = stream
.map(|(block, changes)| {
Ok::<_, ()>(Ok(StorageChangeSet {
block,
changes: changes
.iter()
.filter_map(|(o_sk, k, v)| {
if o_sk.is_none() {
Some((k.clone(), v.cloned()))
} else {
None
}
})
.collect(),
}))
})
.compat();
let stream = stream.map(|(block, changes)| {
Ok(Ok::<_, rpc::Error>(StorageChangeSet {
block,
changes: changes
.iter()
.filter_map(|(o_sk, k, v)| o_sk.is_none().then(|| (k.clone(), v.cloned())))
.collect(),
}))
});
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(initial.chain(stream))
initial
.chain(stream)
.forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
@@ -602,11 +597,10 @@ where
storage_keys,
self.rpc_max_payload,
);
Box::new(result(
block_executor
.trace_block()
.map_err(|e| invalid_block::<Block>(block, None, e.to_string())),
))
let r = block_executor
.trace_block()
.map_err(|e| invalid_block::<Block>(block, None, e.to_string()));
async move { r }.boxed()
}
}
@@ -634,25 +628,26 @@ where
storage_key: PrefixedStorageKey,
keys: Vec<StorageKey>,
) -> FutureResult<ReadProof<Block::Hash>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client
.read_child_proof(
&BlockId::Hash(block),
&child_info,
&mut keys.iter().map(|key| key.0.as_ref()),
)
.map(|proof| proof.iter_nodes().map(|node| node.into()).collect())
.map(|proof| ReadProof { at: block, proof })
})
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client
.read_child_proof(
&BlockId::Hash(block),
&child_info,
&mut keys.iter().map(|key| key.0.as_ref()),
)
.map(|proof| proof.iter_nodes().map(|node| node.into()).collect())
.map(|proof| ReadProof { at: block, proof })
})
.map_err(client_err);
async move { r }.boxed()
}
fn storage_keys(
@@ -661,18 +656,19 @@ where
storage_key: PrefixedStorageKey,
prefix: StorageKey,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage_keys(&BlockId::Hash(block), &child_info, &prefix)
})
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage_keys(&BlockId::Hash(block), &child_info, &prefix)
})
.map_err(client_err);
async move { r }.boxed()
}
fn storage_keys_paged(
@@ -683,24 +679,25 @@ where
count: u32,
start_key: Option<StorageKey>,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage_keys_iter(
&BlockId::Hash(block),
child_info,
prefix.as_ref(),
start_key.as_ref(),
)
})
.map(|iter| iter.take(count as usize).collect())
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage_keys_iter(
&BlockId::Hash(block),
child_info,
prefix.as_ref(),
start_key.as_ref(),
)
})
.map(|iter| iter.take(count as usize).collect())
.map_err(client_err);
async move { r }.boxed()
}
fn storage(
@@ -709,18 +706,19 @@ where
storage_key: PrefixedStorageKey,
key: StorageKey,
) -> FutureResult<Option<StorageData>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage(&BlockId::Hash(block), &child_info, &key)
})
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage(&BlockId::Hash(block), &child_info, &key)
})
.map_err(client_err);
async move { r }.boxed()
}
fn storage_hash(
@@ -729,18 +727,19 @@ where
storage_key: PrefixedStorageKey,
key: StorageKey,
) -> FutureResult<Option<Block::Hash>> {
Box::new(result(
self.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage_hash(&BlockId::Hash(block), &child_info, &key)
})
.map_err(client_err),
))
let r = self
.block_or_best(block)
.and_then(|block| {
let child_info = match ChildType::from_prefixed_key(&storage_key) {
Some((ChildType::ParentKeyId, storage_key)) =>
ChildInfo::new_default(storage_key),
None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
};
self.client.child_storage_hash(&BlockId::Hash(block), &child_info, &key)
})
.map_err(client_err);
async move { r }.boxed()
}
}
+90 -143
View File
@@ -22,20 +22,13 @@ use codec::Decode;
use futures::{
channel::oneshot::{channel, Sender},
future::{ready, Either},
FutureExt, StreamExt as _, TryFutureExt, TryStreamExt as _,
Future, FutureExt, SinkExt, Stream, StreamExt as _, TryFutureExt, TryStreamExt as _,
};
use hash_db::Hasher;
use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
use log::warn;
use parking_lot::Mutex;
use rpc::{
futures::{
future::{result, Future},
stream::Stream,
Sink,
},
Result as RpcResult,
};
use rpc::Result as RpcResult;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
@@ -171,6 +164,7 @@ where
impl<Block, F, Client> StateBackend<Block, Client> for LightState<Block, F, Client>
where
Block: BlockT,
Block::Hash: Unpin,
Client: BlockchainEvents<Block> + HeaderBackend<Block> + Send + Sync + 'static,
F: Fetcher<Block> + 'static,
{
@@ -180,17 +174,14 @@ where
method: String,
call_data: Bytes,
) -> FutureResult<Bytes> {
Box::new(
call(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
method,
call_data,
)
.boxed()
.compat(),
call(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
method,
call_data,
)
.boxed()
}
fn storage_keys(
@@ -198,7 +189,7 @@ where
_block: Option<Block::Hash>,
_prefix: StorageKey,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage_pairs(
@@ -206,7 +197,7 @@ where
_block: Option<Block::Hash>,
_prefix: StorageKey,
) -> FutureResult<Vec<(StorageKey, StorageData)>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage_keys_paged(
@@ -216,11 +207,11 @@ where
_count: u32,
_start_key: Option<StorageKey>,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage_size(&self, _: Option<Block::Hash>, _: StorageKey) -> FutureResult<Option<u64>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage(
@@ -228,21 +219,18 @@ where
block: Option<Block::Hash>,
key: StorageKey,
) -> FutureResult<Option<StorageData>> {
Box::new(
storage(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
vec![key.0.clone()],
)
.boxed()
.compat()
.map(move |mut values| {
values
.remove(&key)
.expect("successful request has entries for all requested keys; qed")
}),
storage(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
vec![key.0.clone()],
)
.map_ok(move |mut values| {
values
.remove(&key)
.expect("successful request has entries for all requested keys; qed")
})
.boxed()
}
fn storage_hash(
@@ -250,38 +238,28 @@ where
block: Option<Block::Hash>,
key: StorageKey,
) -> FutureResult<Option<Block::Hash>> {
Box::new(StateBackend::storage(self, block, key).and_then(|maybe_storage| {
result(Ok(maybe_storage.map(|storage| HashFor::<Block>::hash(&storage.0))))
}))
let res = StateBackend::storage(self, block, key);
async move { res.await.map(|r| r.map(|s| HashFor::<Block>::hash(&s.0))) }.boxed()
}
fn metadata(&self, block: Option<Block::Hash>) -> FutureResult<Bytes> {
let metadata =
self.call(block, "Metadata_metadata".into(), Bytes(Vec::new()))
.and_then(|metadata| {
OpaqueMetadata::decode(&mut &metadata.0[..]).map(Into::into).map_err(
|decode_err| {
client_err(ClientError::CallResultDecode(
"Unable to decode metadata",
decode_err,
))
},
)
});
Box::new(metadata)
self.call(block, "Metadata_metadata".into(), Bytes(Vec::new()))
.and_then(|metadata| async move {
OpaqueMetadata::decode(&mut &metadata.0[..])
.map(Into::into)
.map_err(|decode_err| {
client_err(ClientError::CallResultDecode(
"Unable to decode metadata",
decode_err,
))
})
})
.boxed()
}
fn runtime_version(&self, block: Option<Block::Hash>) -> FutureResult<RuntimeVersion> {
Box::new(
runtime_version(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
)
runtime_version(&*self.remote_blockchain, self.fetcher.clone(), self.block_or_best(block))
.boxed()
.compat(),
)
}
fn query_storage(
@@ -290,7 +268,7 @@ where
_to: Option<Block::Hash>,
_keys: Vec<StorageKey>,
) -> FutureResult<Vec<StorageChangeSet<Block::Hash>>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn query_storage_at(
@@ -298,7 +276,7 @@ where
_keys: Vec<StorageKey>,
_at: Option<Block::Hash>,
) -> FutureResult<Vec<StorageChangeSet<Block::Hash>>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn read_proof(
@@ -306,7 +284,7 @@ where
_block: Option<Block::Hash>,
_keys: Vec<StorageKey>,
) -> FutureResult<ReadProof<Block::Hash>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn subscribe_storage(
@@ -334,10 +312,7 @@ where
let changes_stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
storage_subscriptions.clone(),
self.client
.import_notification_stream()
.map(|notification| Ok::<_, ()>(notification.hash))
.compat(),
self.client.import_notification_stream().map(|notification| notification.hash),
display_error(
storage(&*remote_blockchain, fetcher.clone(), initial_block, initial_keys)
.map(move |r| r.map(|r| (initial_block, r))),
@@ -365,21 +340,17 @@ where
.as_ref()
.map(|old_value| **old_value != new_value)
.unwrap_or(true);
match value_differs {
true => Some(StorageChangeSet {
block,
changes: new_value
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
}),
false => None,
}
value_differs.then(|| StorageChangeSet {
block,
changes: new_value.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
})
},
);
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(changes_stream.map(|changes| Ok(changes)))
changes_stream
.map_ok(Ok)
.forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
@@ -441,10 +412,7 @@ where
let versions_stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
version_subscriptions,
self.client
.import_notification_stream()
.map(|notification| Ok::<_, ()>(notification.hash))
.compat(),
self.client.import_notification_stream().map(|notification| notification.hash),
display_error(
runtime_version(&*remote_blockchain, fetcher.clone(), initial_block)
.map(move |r| r.map(|r| (initial_block, r))),
@@ -455,15 +423,14 @@ where
.as_ref()
.map(|old_version| *old_version != new_version)
.unwrap_or(true);
match version_differs {
true => Some(new_version.clone()),
false => None,
}
version_differs.then(|| new_version.clone())
},
);
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(versions_stream.map(|version| Ok(version)))
versions_stream
.map_ok(Ok)
.forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
@@ -483,7 +450,7 @@ where
_targets: Option<String>,
_storage_keys: Option<String>,
) -> FutureResult<sp_rpc::tracing::TraceBlockResponse> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
}
@@ -499,7 +466,7 @@ where
_storage_key: PrefixedStorageKey,
_keys: Vec<StorageKey>,
) -> FutureResult<ReadProof<Block::Hash>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage_keys(
@@ -508,7 +475,7 @@ where
_storage_key: PrefixedStorageKey,
_prefix: StorageKey,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage_keys_paged(
@@ -519,7 +486,7 @@ where
_count: u32,
_start_key: Option<StorageKey>,
) -> FutureResult<Vec<StorageKey>> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
async move { Err(client_err(ClientError::NotAvailableOnLightClient)) }.boxed()
}
fn storage(
@@ -560,7 +527,7 @@ where
}
});
Box::new(child_storage.boxed().compat())
child_storage.boxed()
}
fn storage_hash(
@@ -569,11 +536,9 @@ where
storage_key: PrefixedStorageKey,
key: StorageKey,
) -> FutureResult<Option<Block::Hash>> {
Box::new(ChildStateBackend::storage(self, block, storage_key, key).and_then(
|maybe_storage| {
result(Ok(maybe_storage.map(|storage| HashFor::<Block>::hash(&storage.0))))
},
))
let child_storage = ChildStateBackend::storage(self, block, storage_key, key);
async move { child_storage.await.map(|r| r.map(|s| HashFor::<Block>::hash(&s.0))) }.boxed()
}
}
@@ -687,54 +652,50 @@ fn subscription_stream<
initial_request: InitialRequestFuture,
issue_request: IssueRequest,
compare_values: CompareValues,
) -> impl Stream<Item = N, Error = ()>
) -> impl Stream<Item = std::result::Result<N, ()>>
where
Block: BlockT,
Requests: 'static + SharedRequests<Block::Hash, V>,
FutureBlocksStream: Stream<Item = Block::Hash, Error = ()>,
FutureBlocksStream: Stream<Item = Block::Hash>,
V: Send + 'static + Clone,
InitialRequestFuture:
std::future::Future<Output = Result<(Block::Hash, V), ()>> + Send + 'static,
InitialRequestFuture: Future<Output = Result<(Block::Hash, V), ()>> + Send + 'static,
IssueRequest: 'static + Fn(Block::Hash) -> IssueRequestFuture,
IssueRequestFuture: std::future::Future<Output = Result<V, Error>> + Send + 'static,
IssueRequestFuture: Future<Output = Result<V, Error>> + Send + 'static,
CompareValues: Fn(Block::Hash, Option<&V>, &V) -> Option<N>,
{
// we need to send initial value first, then we'll only be sending if value has changed
let previous_value = Arc::new(Mutex::new(None));
// prepare 'stream' of initial values
let initial_value_stream = ignore_error(initial_request).boxed().compat().into_stream();
let initial_value_stream = initial_request.into_stream();
// prepare stream of future values
//
// we do not want to stop stream if single request fails
// (the warning should have been already issued by the request issuer)
let future_values_stream = future_blocks_stream.and_then(move |block| {
ignore_error(
let future_values_stream = future_blocks_stream
.then(move |block| {
maybe_share_remote_request::<Block, _, _, _, _>(
shared_requests.clone(),
block,
&issue_request,
)
.map(move |r| r.map(|v| (block, v))),
)
.boxed()
.compat()
});
.map(move |r| r.map(|v| (block, v)))
})
.filter(|r| ready(r.is_ok()));
// now let's return changed values for selected blocks
initial_value_stream
.chain(future_values_stream)
.filter_map(move |block_and_new_value| {
block_and_new_value.and_then(|(block, new_value)| {
let mut previous_value = previous_value.lock();
compare_values(block, previous_value.as_ref(), &new_value).map(
|notification_value| {
*previous_value = Some(new_value);
notification_value
},
)
})
.try_filter_map(move |(block, new_value)| {
let mut previous_value = previous_value.lock();
let res = compare_values(block, previous_value.as_ref(), &new_value).map(
|notification_value| {
*previous_value = Some(new_value);
notification_value
},
);
async move { Ok(res) }
})
.map_err(|_| ())
}
@@ -789,24 +750,10 @@ where
})
}
/// Convert successful future result into Ok(Some(result)) and error into Ok(None),
/// displaying warning.
fn ignore_error<F, T>(future: F) -> impl std::future::Future<Output = Result<Option<T>, ()>>
where
F: std::future::Future<Output = Result<T, ()>>,
{
future.then(|result| {
ready(match result {
Ok(result) => Ok(Some(result)),
Err(()) => Ok(None),
})
})
}
#[cfg(test)]
mod tests {
use super::*;
use rpc::futures::stream::futures_ordered;
use futures::{executor, stream};
use sp_core::H256;
use substrate_test_runtime_client::runtime::Block;
@@ -814,7 +761,7 @@ mod tests {
fn subscription_stream_works() {
let stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
SimpleSubscriptions::default(),
futures_ordered(vec![result(Ok(H256::from([2; 32]))), result(Ok(H256::from([3; 32])))]),
stream::iter(vec![H256::from([2; 32]), H256::from([3; 32])]),
ready(Ok((H256::from([1; 32]), 100))),
|block| match block[0] {
2 => ready(Ok(100)),
@@ -827,14 +774,14 @@ mod tests {
},
);
assert_eq!(stream.collect().wait(), Ok(vec![100, 200]));
assert_eq!(executor::block_on(stream.collect::<Vec<_>>()), vec![Ok(100), Ok(200)]);
}
#[test]
fn subscription_stream_ignores_failed_requests() {
let stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
SimpleSubscriptions::default(),
futures_ordered(vec![result(Ok(H256::from([2; 32]))), result(Ok(H256::from([3; 32])))]),
stream::iter(vec![H256::from([2; 32]), H256::from([3; 32])]),
ready(Ok((H256::from([1; 32]), 100))),
|block| match block[0] {
2 => ready(Err(client_err(ClientError::NotAvailableOnLightClient))),
@@ -847,7 +794,7 @@ mod tests {
},
);
assert_eq!(stream.collect().wait(), Ok(vec![100, 200]));
assert_eq!(executor::block_on(stream.collect::<Vec<_>>()), vec![Ok(100), Ok(200)]);
}
#[test]
+49 -57
View File
@@ -18,11 +18,9 @@
use self::error::Error;
use super::{state_full::split_range, *};
use crate::testing::TaskExecutor;
use assert_matches::assert_matches;
use futures::{compat::Future01CompatExt, executor};
use futures01::stream::Stream;
use futures::{executor, StreamExt};
use sc_block_builder::BlockBuilderProvider;
use sc_rpc_api::DenyUnsafe;
use sp_consensus::BlockOrigin;
@@ -63,37 +61,33 @@ fn should_return_storage() {
let key = StorageKey(KEY.to_vec());
assert_eq!(
client
.storage(key.clone(), Some(genesis_hash).into())
.wait()
executor::block_on(client.storage(key.clone(), Some(genesis_hash).into()))
.map(|x| x.map(|x| x.0.len()))
.unwrap()
.unwrap() as usize,
VALUE.len(),
);
assert_matches!(
client
.storage_hash(key.clone(), Some(genesis_hash).into())
.wait()
executor::block_on(client.storage_hash(key.clone(), Some(genesis_hash).into()))
.map(|x| x.is_some()),
Ok(true)
);
assert_eq!(
client.storage_size(key.clone(), None).wait().unwrap().unwrap() as usize,
executor::block_on(client.storage_size(key.clone(), None)).unwrap().unwrap() as usize,
VALUE.len(),
);
assert_eq!(
client.storage_size(StorageKey(b":map".to_vec()), None).wait().unwrap().unwrap() as usize,
executor::block_on(client.storage_size(StorageKey(b":map".to_vec()), None))
.unwrap()
.unwrap() as usize,
2 + 3,
);
assert_eq!(
executor::block_on(
child
.storage(prefixed_storage_key(), key, Some(genesis_hash).into())
.map(|x| x.map(|x| x.0.len()))
.compat(),
.map(|x| x.map(|x| x.unwrap().0.len()))
)
.unwrap()
.unwrap() as usize,
CHILD_VALUE.len(),
);
@@ -114,21 +108,26 @@ fn should_return_child_storage() {
let key = StorageKey(b"key".to_vec());
assert_matches!(
child.storage(
executor::block_on(child.storage(
child_key.clone(),
key.clone(),
Some(genesis_hash).into(),
).wait(),
)),
Ok(Some(StorageData(ref d))) if d[0] == 42 && d.len() == 1
);
assert_matches!(
child
.storage_hash(child_key.clone(), key.clone(), Some(genesis_hash).into(),)
.wait()
.map(|x| x.is_some()),
executor::block_on(child.storage_hash(
child_key.clone(),
key.clone(),
Some(genesis_hash).into(),
))
.map(|x| x.is_some()),
Ok(true)
);
assert_matches!(child.storage_size(child_key.clone(), key.clone(), None).wait(), Ok(Some(1)));
assert_matches!(
executor::block_on(child.storage_size(child_key.clone(), key.clone(), None)),
Ok(Some(1))
);
}
#[test]
@@ -139,16 +138,18 @@ fn should_call_contract() {
new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)), DenyUnsafe::No, None);
assert_matches!(
client
.call("balanceOf".into(), Bytes(vec![1, 2, 3]), Some(genesis_hash).into())
.wait(),
executor::block_on(client.call(
"balanceOf".into(),
Bytes(vec![1, 2, 3]),
Some(genesis_hash).into()
)),
Err(Error::Client(_))
)
}
#[test]
fn should_notify_about_storage_changes() {
let (subscriber, id, transport) = Subscriber::new_test("test");
let (subscriber, id, mut transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
@@ -162,7 +163,7 @@ fn should_notify_about_storage_changes() {
api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned
assert!(matches!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::String(_)))));
assert!(matches!(executor::block_on(id), Ok(Ok(SubscriptionId::String(_)))));
let mut builder = client.new_block(Default::default()).unwrap();
builder
@@ -177,16 +178,14 @@ fn should_notify_about_storage_changes() {
executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
}
// assert notification sent to transport
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
// Check notification sent to transport
executor::block_on((&mut transport).take(2).collect::<Vec<_>>());
assert!(executor::block_on(transport.next()).is_none());
}
#[test]
fn should_send_initial_storage_changes_and_notifications() {
let (subscriber, id, transport) = Subscriber::new_test("test");
let (subscriber, id, mut transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
@@ -207,7 +206,7 @@ fn should_send_initial_storage_changes_and_notifications() {
);
// assert id assigned
assert!(matches!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::String(_)))));
assert!(matches!(executor::block_on(id), Ok(Ok(SubscriptionId::String(_)))));
let mut builder = client.new_block(Default::default()).unwrap();
builder
@@ -222,14 +221,9 @@ fn should_send_initial_storage_changes_and_notifications() {
executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
}
// assert initial values sent to transport
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
// Check for the correct number of notifications
executor::block_on((&mut transport).take(2).collect::<Vec<_>>());
assert!(executor::block_on(transport.next()).is_none());
}
#[test]
@@ -299,7 +293,7 @@ fn should_query_storage() {
let keys = (1..6).map(|k| StorageKey(vec![k])).collect::<Vec<_>>();
let result = api.query_storage(keys.clone(), genesis_hash, Some(block1_hash).into());
assert_eq!(result.wait().unwrap(), expected);
assert_eq!(executor::block_on(result).unwrap(), expected);
// Query all changes
let result = api.query_storage(keys.clone(), genesis_hash, None.into());
@@ -312,18 +306,18 @@ fn should_query_storage() {
(StorageKey(vec![5]), Some(StorageData(vec![1]))),
],
});
assert_eq!(result.wait().unwrap(), expected);
assert_eq!(executor::block_on(result).unwrap(), expected);
// Query changes up to block2.
let result = api.query_storage(keys.clone(), genesis_hash, Some(block2_hash));
assert_eq!(result.wait().unwrap(), expected);
assert_eq!(executor::block_on(result).unwrap(), expected);
// Inverted range.
let result = api.query_storage(keys.clone(), block1_hash, Some(genesis_hash));
assert_eq!(
result.wait().map_err(|e| e.to_string()),
executor::block_on(result).map_err(|e| e.to_string()),
Err(Error::InvalidBlockRange {
from: format!("1 ({:?})", block1_hash),
to: format!("0 ({:?})", genesis_hash),
@@ -339,7 +333,7 @@ fn should_query_storage() {
let result = api.query_storage(keys.clone(), genesis_hash, Some(random_hash1));
assert_eq!(
result.wait().map_err(|e| e.to_string()),
executor::block_on(result).map_err(|e| e.to_string()),
Err(Error::InvalidBlockRange {
from: format!("{:?}", genesis_hash),
to: format!("{:?}", Some(random_hash1)),
@@ -355,7 +349,7 @@ fn should_query_storage() {
let result = api.query_storage(keys.clone(), random_hash1, Some(genesis_hash));
assert_eq!(
result.wait().map_err(|e| e.to_string()),
executor::block_on(result).map_err(|e| e.to_string()),
Err(Error::InvalidBlockRange {
from: format!("{:?}", random_hash1),
to: format!("{:?}", Some(genesis_hash)),
@@ -371,7 +365,7 @@ fn should_query_storage() {
let result = api.query_storage(keys.clone(), random_hash1, None);
assert_eq!(
result.wait().map_err(|e| e.to_string()),
executor::block_on(result).map_err(|e| e.to_string()),
Err(Error::InvalidBlockRange {
from: format!("{:?}", random_hash1),
to: format!("{:?}", Some(block2_hash)), // Best block hash.
@@ -387,7 +381,7 @@ fn should_query_storage() {
let result = api.query_storage(keys.clone(), random_hash1, Some(random_hash2));
assert_eq!(
result.wait().map_err(|e| e.to_string()),
executor::block_on(result).map_err(|e| e.to_string()),
Err(Error::InvalidBlockRange {
from: format!("{:?}", random_hash1), // First hash not found.
to: format!("{:?}", Some(random_hash2)),
@@ -403,7 +397,7 @@ fn should_query_storage() {
let result = api.query_storage_at(keys.clone(), Some(block1_hash));
assert_eq!(
result.wait().unwrap(),
executor::block_on(result).unwrap(),
vec![StorageChangeSet {
block: block1_hash,
changes: vec![
@@ -454,7 +448,7 @@ fn should_return_runtime_version() {
[\"0xf78b278be53f454c\",2],[\"0xab3c0572291feb8b\",1],[\"0xbc9d89904f5b923f\",1]],\
\"transactionVersion\":1}";
let runtime_version = api.runtime_version(None.into()).wait().unwrap();
let runtime_version = executor::block_on(api.runtime_version(None.into())).unwrap();
let serialized = serde_json::to_string(&runtime_version).unwrap();
assert_eq!(serialized, result);
@@ -464,7 +458,7 @@ fn should_return_runtime_version() {
#[test]
fn should_notify_on_runtime_version_initially() {
let (subscriber, id, transport) = Subscriber::new_test("test");
let (subscriber, id, mut transport) = Subscriber::new_test("test");
{
let client = Arc::new(substrate_test_runtime_client::new());
@@ -478,14 +472,12 @@ fn should_notify_on_runtime_version_initially() {
api.subscribe_runtime_version(Default::default(), subscriber);
// assert id assigned
assert!(matches!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::String(_)))));
assert!(matches!(executor::block_on(id), Ok(Ok(SubscriptionId::String(_)))));
}
// assert initial version sent.
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
executor::block_on((&mut transport).take(1).collect::<Vec<_>>());
assert!(executor::block_on(transport.next()).is_none());
}
#[test]
+20 -35
View File
@@ -18,28 +18,27 @@
//! Substrate system API.
#[cfg(test)]
mod tests;
use futures::{channel::oneshot, compat::Compat, future::BoxFuture, FutureExt, TryFutureExt};
use self::error::Result;
use futures::{channel::oneshot, FutureExt};
use sc_rpc_api::{DenyUnsafe, Receiver};
use sc_tracing::logging;
use sp_runtime::traits::{self, Header as HeaderT};
use sp_utils::mpsc::TracingUnboundedSender;
use self::error::Result;
pub use self::{
gen_client::Client as SystemClient,
helpers::{Health, NodeRole, PeerInfo, SyncState, SystemInfo},
};
pub use sc_rpc_api::system::*;
#[cfg(test)]
mod tests;
/// Early exit for RPCs that require `--rpc-methods=Unsafe` to be enabled
macro_rules! bail_if_unsafe {
($value: expr) => {
if let Err(err) = $value.check_if_safe() {
return async move { Err(err.into()) }.boxed().compat()
return async move { Err(err.into()) }.boxed()
}
};
}
@@ -114,51 +113,42 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
fn system_health(&self) -> Receiver<Health> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::Health(tx));
Receiver(Compat::new(rx))
Receiver(rx)
}
fn system_local_peer_id(&self) -> Receiver<String> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::LocalPeerId(tx));
Receiver(Compat::new(rx))
Receiver(rx)
}
fn system_local_listen_addresses(&self) -> Receiver<Vec<String>> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::LocalListenAddresses(tx));
Receiver(Compat::new(rx))
Receiver(rx)
}
fn system_peers(
&self,
) -> Compat<
BoxFuture<'static, rpc::Result<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>>>,
> {
) -> rpc::BoxFuture<rpc::Result<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>>> {
bail_if_unsafe!(self.deny_unsafe);
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::Peers(tx));
async move { rx.await.map_err(|_| rpc::Error::internal_error()) }
.boxed()
.compat()
async move { rx.await.map_err(|_| rpc::Error::internal_error()) }.boxed()
}
fn system_network_state(&self) -> Compat<BoxFuture<'static, rpc::Result<rpc::Value>>> {
fn system_network_state(&self) -> rpc::BoxFuture<rpc::Result<rpc::Value>> {
bail_if_unsafe!(self.deny_unsafe);
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::NetworkState(tx));
async move { rx.await.map_err(|_| rpc::Error::internal_error()) }
.boxed()
.compat()
async move { rx.await.map_err(|_| rpc::Error::internal_error()) }.boxed()
}
fn system_add_reserved_peer(
&self,
peer: String,
) -> Compat<BoxFuture<'static, std::result::Result<(), rpc::Error>>> {
fn system_add_reserved_peer(&self, peer: String) -> rpc::BoxFuture<rpc::Result<()>> {
bail_if_unsafe!(self.deny_unsafe);
let (tx, rx) = oneshot::channel();
@@ -171,13 +161,9 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
}
}
.boxed()
.compat()
}
fn system_remove_reserved_peer(
&self,
peer: String,
) -> Compat<BoxFuture<'static, std::result::Result<(), rpc::Error>>> {
fn system_remove_reserved_peer(&self, peer: String) -> rpc::BoxFuture<rpc::Result<()>> {
bail_if_unsafe!(self.deny_unsafe);
let (tx, rx) = oneshot::channel();
@@ -190,34 +176,33 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
}
}
.boxed()
.compat()
}
fn system_reserved_peers(&self) -> Receiver<Vec<String>> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::NetworkReservedPeers(tx));
Receiver(Compat::new(rx))
Receiver(rx)
}
fn system_node_roles(&self) -> Receiver<Vec<NodeRole>> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::NodeRoles(tx));
Receiver(Compat::new(rx))
Receiver(rx)
}
fn system_sync_state(&self) -> Receiver<SyncState<<B::Header as HeaderT>::Number>> {
let (tx, rx) = oneshot::channel();
let _ = self.send_back.unbounded_send(Request::SyncState(tx));
Receiver(Compat::new(rx))
Receiver(rx)
}
fn system_add_log_filter(&self, directives: String) -> std::result::Result<(), rpc::Error> {
fn system_add_log_filter(&self, directives: String) -> rpc::Result<()> {
self.deny_unsafe.check_if_safe()?;
logging::add_directives(&directives);
logging::reload_filter().map_err(|_e| rpc::Error::internal_error())
}
fn system_reset_log_filter(&self) -> std::result::Result<(), rpc::Error> {
fn system_reset_log_filter(&self) -> rpc::Result<()> {
self.deny_unsafe.check_if_safe()?;
logging::reset_log_filter().map_err(|_e| rpc::Error::internal_error())
}
+8 -14
View File
@@ -19,7 +19,7 @@
use super::*;
use assert_matches::assert_matches;
use futures::prelude::*;
use futures::{executor, prelude::*};
use sc_network::{self, config::Role, PeerId};
use sp_utils::mpsc::tracing_unbounded;
use std::{
@@ -139,8 +139,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
}
fn wait_receiver<T>(rx: Receiver<T>) -> T {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
runtime.block_on(rx).unwrap()
futures::executor::block_on(rx).unwrap()
}
#[test]
@@ -223,12 +222,10 @@ fn system_local_listen_addresses_works() {
#[test]
fn system_peers() {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let peer_id = PeerId::random();
let req = api(Status { peer_id: peer_id.clone(), peers: 1, is_syncing: false, is_dev: true })
.system_peers();
let res = runtime.block_on(req).unwrap();
let res = executor::block_on(req).unwrap();
assert_eq!(
res,
@@ -243,9 +240,8 @@ fn system_peers() {
#[test]
fn system_network_state() {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let req = api(None).system_network_state();
let res = runtime.block_on(req).unwrap();
let res = executor::block_on(req).unwrap();
assert_eq!(
serde_json::from_value::<sc_network::network_state::NetworkState>(res).unwrap(),
@@ -278,12 +274,11 @@ fn system_network_add_reserved() {
let good_peer_id =
"/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV";
let bad_peer_id = "/ip4/198.51.100.19/tcp/30333";
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let good_fut = api(None).system_add_reserved_peer(good_peer_id.into());
let bad_fut = api(None).system_add_reserved_peer(bad_peer_id.into());
assert_eq!(runtime.block_on(good_fut), Ok(()));
assert!(runtime.block_on(bad_fut).is_err());
assert_eq!(executor::block_on(good_fut), Ok(()));
assert!(executor::block_on(bad_fut).is_err());
}
#[test]
@@ -291,12 +286,11 @@ fn system_network_remove_reserved() {
let good_peer_id = "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV";
let bad_peer_id =
"/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV";
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let good_fut = api(None).system_remove_reserved_peer(good_peer_id.into());
let bad_fut = api(None).system_remove_reserved_peer(bad_peer_id.into());
assert_eq!(runtime.block_on(good_fut), Ok(()));
assert!(runtime.block_on(bad_fut).is_err());
assert_eq!(executor::block_on(good_fut), Ok(()));
assert!(executor::block_on(bad_fut).is_err());
}
#[test]
+11 -10
View File
@@ -18,8 +18,10 @@
//! Testing utils used by the RPC tests.
use futures::{compat::Future01CompatExt, executor, FutureExt};
use rpc::futures::future as future01;
use futures::{
executor,
task::{FutureObj, Spawn, SpawnError},
};
// Executor shared by all tests.
//
@@ -30,16 +32,15 @@ lazy_static::lazy_static! {
.expect("Failed to create thread pool executor for tests");
}
type Boxed01Future01 = Box<dyn future01::Future<Item = (), Error = ()> + Send + 'static>;
/// Executor for use in testing
pub struct TaskExecutor;
impl future01::Executor<Boxed01Future01> for TaskExecutor {
fn execute(
&self,
future: Boxed01Future01,
) -> std::result::Result<(), future01::ExecuteError<Boxed01Future01>> {
EXECUTOR.spawn_ok(future.compat().map(drop));
impl Spawn for TaskExecutor {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
EXECUTOR.spawn_ok(future);
Ok(())
}
fn status(&self) -> Result<(), SpawnError> {
Ok(())
}
}