Fix RPC tests for machines with a high number of cores (#6021)

This commit is contained in:
Bastian Köcher
2020-05-14 11:08:39 +02:00
committed by GitHub
parent 05fac811b2
commit 73389df860
7 changed files with 102 additions and 78 deletions
+13 -11
View File
@@ -30,7 +30,7 @@ use substrate_test_runtime_client::{
DefaultTestClientBuilderExt, TestClientBuilderExt, Backend, Client,
};
use sc_transaction_pool::{BasicPool, FullChainApi};
use tokio::runtime;
use futures::{executor, compat::Future01CompatExt};
fn uxt(sender: AccountKeyring, nonce: u64) -> Extrinsic {
let tx = Transfer {
@@ -48,7 +48,6 @@ type FullTransactionPool = BasicPool<
>;
struct TestSetup {
pub runtime: runtime::Runtime,
pub client: Arc<Client<Backend>>,
pub keystore: BareCryptoStorePtr,
pub pool: Arc<FullTransactionPool>,
@@ -68,7 +67,6 @@ impl Default for TestSetup {
None,
).0);
TestSetup {
runtime: runtime::Runtime::new().expect("Failed to create runtime in test setup"),
client,
keystore,
pool,
@@ -81,7 +79,7 @@ impl TestSetup {
Author {
client: self.client.clone(),
pool: self.pool.clone(),
subscriptions: Subscriptions::new(Arc::new(self.runtime.executor())),
subscriptions: Subscriptions::new(Arc::new(crate::testing::TaskExecutor)),
keystore: self.keystore.clone(),
deny_unsafe: DenyUnsafe::No,
}
@@ -121,16 +119,20 @@ fn submit_rich_transaction_should_not_cause_error() {
#[test]
fn should_watch_extrinsic() {
//given
let mut setup = TestSetup::default();
let setup = TestSetup::default();
let p = setup.author();
let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
// when
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into());
p.watch_extrinsic(
Default::default(),
subscriber,
uxt(AccountKeyring::Alice, 0).encode().into(),
);
// then
assert_eq!(setup.runtime.block_on(id_rx), Ok(Ok(1.into())));
assert_eq!(executor::block_on(id_rx.compat()), Ok(Ok(1.into())));
// check notifications
let replacement = {
let tx = Transfer {
@@ -142,14 +144,14 @@ fn should_watch_extrinsic() {
tx.into_signed_tx()
};
AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap();
let (res, data) = setup.runtime.block_on(data.into_future()).unwrap();
let (res, data) = executor::block_on(data.into_future().compat()).unwrap();
assert_eq!(
res,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into())
);
let h = blake2_256(&replacement.encode());
assert_eq!(
setup.runtime.block_on(data.into_future()).unwrap().0,
executor::block_on(data.into_future().compat()).unwrap().0,
Some(format!(r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":1}}}}"#, HexDisplay::from(&h)))
);
}
@@ -157,7 +159,7 @@ fn should_watch_extrinsic() {
#[test]
fn should_return_watch_validation_error() {
//given
let mut setup = TestSetup::default();
let setup = TestSetup::default();
let p = setup.author();
let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
@@ -166,7 +168,7 @@ fn should_return_watch_validation_error() {
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());
// then
let res = setup.runtime.block_on(id_rx).unwrap();
let res = executor::block_on(id_rx.compat()).unwrap();
assert!(res.is_err(), "Expected the transaction to be rejected as invalid.");
}
+21 -37
View File
@@ -23,14 +23,13 @@ use substrate_test_runtime_client::{
};
use sp_rpc::list::ListOrValue;
use sc_block_builder::BlockBuilderProvider;
use futures::{executor, compat::{Future01CompatExt, Stream01CompatExt}};
use crate::testing::TaskExecutor;
#[test]
fn should_return_header() {
let core = tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
api.header(Some(client.genesis_hash()).into()).wait(),
@@ -61,11 +60,8 @@ fn should_return_header() {
#[test]
fn should_return_a_block() {
let core = tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_hash = block.hash();
@@ -115,11 +111,8 @@ fn should_return_a_block() {
#[test]
fn should_return_block_hash() {
let core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
api.block_hash(None.into()),
@@ -162,11 +155,8 @@ fn should_return_block_hash() {
#[test]
fn should_return_finalized_hash() {
let core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
api.finalized_head(),
@@ -192,76 +182,70 @@ fn should_return_finalized_hash() {
#[test]
fn should_notify_about_latest_block() {
let mut core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_all_heads(Default::default(), subscriber);
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
}
// assert initial head sent.
let (notification, next) = core.block_on(transport.into_future()).unwrap();
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = core.block_on(next.into_future()).unwrap();
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
}
#[test]
fn should_notify_about_best_block() {
let mut core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_new_heads(Default::default(), subscriber);
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
}
// assert initial head sent.
let (notification, next) = core.block_on(transport.into_future()).unwrap();
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = core.block_on(next.into_future()).unwrap();
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
assert_eq!(executor::block_on(Stream01CompatExt::compat(next).into_future()).0, None);
}
#[test]
fn should_notify_about_finalized_block() {
let mut core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_finalized_heads(Default::default(), subscriber);
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
@@ -269,11 +253,11 @@ fn should_notify_about_finalized_block() {
}
// assert initial head sent.
let (notification, next) = core.block_on(transport.into_future()).unwrap();
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = core.block_on(next.into_future()).unwrap();
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
}
+2
View File
@@ -31,3 +31,5 @@ pub mod chain;
pub mod offchain;
pub mod state;
pub mod system;
#[cfg(test)]
mod testing;
+22 -30
View File
@@ -31,6 +31,8 @@ use substrate_test_runtime_client::{
runtime,
};
use sp_runtime::generic::BlockId;
use crate::testing::TaskExecutor;
use futures::{executor, compat::Future01CompatExt};
const STORAGE_KEY: &[u8] = b"child";
@@ -46,13 +48,12 @@ fn should_return_storage() {
const CHILD_VALUE: &[u8] = b"hello world !";
let child_info = ChildInfo::new_default(STORAGE_KEY);
let mut core = tokio::runtime::Runtime::new().unwrap();
let client = TestClientBuilder::new()
.add_extra_storage(KEY.to_vec(), VALUE.to_vec())
.add_extra_child_storage(&child_info, KEY.to_vec(), CHILD_VALUE.to_vec())
.build();
let genesis_hash = client.genesis_hash();
let (client, child) = new_full(Arc::new(client), Subscriptions::new(Arc::new(core.executor())));
let (client, child) = new_full(Arc::new(client), Subscriptions::new(Arc::new(TaskExecutor)));
let key = StorageKey(KEY.to_vec());
assert_eq!(
@@ -70,9 +71,10 @@ fn should_return_storage() {
VALUE.len(),
);
assert_eq!(
core.block_on(
executor::block_on(
child.storage(prefixed_storage_key(), key, Some(genesis_hash).into())
.map(|x| x.map(|x| x.0.len()))
.compat(),
).unwrap().unwrap() as usize,
CHILD_VALUE.len(),
);
@@ -82,12 +84,11 @@ fn should_return_storage() {
#[test]
fn should_return_child_storage() {
let child_info = ChildInfo::new_default(STORAGE_KEY);
let core = tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(substrate_test_runtime_client::TestClientBuilder::new()
.add_child_storage(&child_info, "key", vec![42_u8])
.build());
let genesis_hash = client.genesis_hash();
let (_client, child) = new_full(client, Subscriptions::new(Arc::new(core.executor())));
let (_client, child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
let child_key = prefixed_storage_key();
let key = StorageKey(b"key".to_vec());
@@ -120,10 +121,9 @@ fn should_return_child_storage() {
#[test]
fn should_call_contract() {
let core = tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(substrate_test_runtime_client::new());
let genesis_hash = client.genesis_hash();
let (client, _child) = new_full(client, Subscriptions::new(Arc::new(core.executor())));
let (client, _child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(),
@@ -133,18 +133,16 @@ fn should_call_contract() {
#[test]
fn should_notify_about_storage_changes() {
let mut core = tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -158,21 +156,19 @@ fn should_notify_about_storage_changes() {
}
// assert notification sent to transport
let (notification, next) = core.block_on(transport.into_future()).unwrap();
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
}
#[test]
fn should_send_initial_storage_changes_and_notifications() {
let mut core = tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into()));
@@ -181,7 +177,7 @@ fn should_send_initial_storage_changes_and_notifications() {
]).into());
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -195,20 +191,19 @@ fn should_send_initial_storage_changes_and_notifications() {
}
// assert initial values sent to transport
let (notification, next) = core.block_on(transport.into_future()).unwrap();
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = core.block_on(next.into_future()).unwrap();
let (notification, next) = executor::block_on(next.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
}
#[test]
fn should_query_storage() {
fn run_tests(mut client: Arc<TestClient>, has_changes_trie_config: bool) {
let core = tokio::runtime::Runtime::new().unwrap();
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(core.executor())));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let mut add_block = |nonce| {
let mut builder = client.new_block(Default::default()).unwrap();
@@ -424,10 +419,8 @@ fn should_split_ranges() {
#[test]
fn should_return_runtime_version() {
let core = tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(core.executor())));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\
\"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",3],\
@@ -446,24 +439,23 @@ fn should_return_runtime_version() {
#[test]
fn should_notify_on_runtime_version_initially() {
let mut core = tokio::runtime::Runtime::new().unwrap();
let (subscriber, id, transport) = Subscriber::new_test("test");
{
let client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(core.executor())));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_runtime_version(Default::default(), subscriber);
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
}
// assert initial version sent.
let (notification, next) = core.block_on(transport.into_future()).unwrap();
let (notification, next) = executor::block_on(transport.into_future().compat()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
assert_eq!(executor::block_on(next.into_future().compat()).unwrap().0, None);
}
#[test]
+42
View File
@@ -0,0 +1,42 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Testing utils used by the RPC tests.
use rpc::futures::future as future01;
use futures::{executor, compat::Future01CompatExt, FutureExt};
// Executor shared by all tests.
//
// This shared executor is used to prevent `Too many open files` errors
// on systems with a lot of cores.
lazy_static::lazy_static! {
static ref EXECUTOR: executor::ThreadPool = executor::ThreadPool::new()
.expect("Failed to create thread pool executor for tests");
}
type Boxed01Future01 = Box<dyn future01::Future<Item = (), Error = ()> + Send + 'static>;
pub struct TaskExecutor;
impl future01::Executor<Boxed01Future01> for TaskExecutor {
fn execute(
&self,
future: Boxed01Future01,
) -> std::result::Result<(), future01::ExecuteError<Boxed01Future01>>{
EXECUTOR.spawn_ok(future.compat().map(drop));
Ok(())
}
}